Logs do Apache Kafka: um guia abrangente para estrutura e gerenciamento de dados de log?
Publicados: 2021-12-24Os dados não são os mesmos de uma década atrás. Processar dados em informações utilizáveis é muito mais difícil do que se possa imaginar. O Apache Kafka, criado pelo Linkedin, mas agora alterado para desenvolvimento de código aberto, é uma excelente ferramenta gratuita para gerenciar dados e fazer bom uso deles. No mundo digital de hoje os dados são muito importantes, são os dados que estão conduzindo nossa percepção da realidade.
O Apache Kafka foi feito precisamente para resolver esse problema complicado. É um software de streaming de dados em tempo real (menos de 10ms) que permite aos usuários armazenar, analisar e ler dados recebidos de uma única fonte (produtor) ou múltipla. Essencialmente, ajuda a distribuir os dados para canais significativos no prazo mais rápido possível. Por exemplo- Em uma partida de críquete há uma fonte (produtor) que verifica a pontuação em tempo real e passa essa informação para os canais. Os canais são como corretores que fornecerão as informações aos consumidores finais da maneira mais eficiente. Apache Kafka é aquele meio onde todas essas coisas acontecem.
- O Apache Kafka pode ser facilmente dimensionado sem qualquer tempo de inatividade significativo no nível do sistema.
- O Apache Kafka é um sistema tolerante a falhas, pois usa vários agentes para transmitir dados, ou seja, se um agente ficar offline, sempre haverá um agente replicado que tem os mesmos dados armazenados.
- Ferramentas de segurança como Kerberos podem ser usadas durante a criação de aplicativos baseados em Kafka.
Para sua conveniência, você pode obter diretamente o Apache kafka aqui
O que são logs do Apache Kafka?
- Os logs do Apache Kafka são uma coleção de vários segmentos de dados presentes em seu disco. Todos os vários segmentos de dados têm nomes que são partição de tópico de formulário ou de tópico específico.
- O Apache Kafka também nos permite replicar nós de dados confirmando um log externo para um sistema distribuído. Esse mecanismo nos permite restaurar dados além de apenas lê-los, sempre que precisarmos.

Foto de Markus Spiske do Pexels
Coisas para lembrar ao trabalhar com Apache Kafka Logs?
- Lembre-se de evitar o login de dados redundantes ou dados que devem ser usados apenas para fins operacionais.
- Normalmente, você pode optar por criar entradas baseadas em log bem no início e no final da(s) inicialização(ões) ou encerramento(s) de um módulo, mas também pode criar logs especiais. Os logs especiais podem ser usados no início e no final de uma fase específica para realizar uma atualização.
Como habilitar logs no Apache Kafka?
Aqui está uma configuração de log que você precisa inserir em seu script para iniciar os logs no apache kafka-
# Habilite o registro baseado em arquivo e kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Depois de fazer isso, você poderá encontrar os logs no formato de notação de objeto javascript ou json.
Várias operações e comandos associados aos logs do Apache Kafka
No Apache Kafka, você pode executar quantos comandos quiser para implementar várias operações. O registro ocorrerá em segundo plano.
- Segmentos de Log – Use este código
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Criar uma nova instância de log
apply(
dir:
File,config: LogConfig,
logStartOffset: Long,

recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Lendo registros
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Anexando registros
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Limpe segmentos e construa um mapa de deslocamento
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Excluindo segmentos
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Criar um novo arquivo de log
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Abra um novo segmento de log-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
sufixo: String = “”): Arquivo
- Fechar um registro
close(): Unit
- Recuperar e reconstruir segmentos
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Adicione ou converta segmentos:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Carregar uma partição
parseTopicPartitionName(dir: File): TopicPartition
- Modifique o arquivo de configuração
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Truncar uma operação:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit