Logs do Apache Kafka: um guia abrangente para estrutura e gerenciamento de dados de log?

Publicados: 2021-12-24

Os dados não são os mesmos de uma década atrás. Processar dados em informações utilizáveis ​​é muito mais difícil do que se possa imaginar. O Apache Kafka, criado pelo Linkedin, mas agora alterado para desenvolvimento de código aberto, é uma excelente ferramenta gratuita para gerenciar dados e fazer bom uso deles. No mundo digital de hoje os dados são muito importantes, são os dados que estão conduzindo nossa percepção da realidade.

O Apache Kafka foi feito precisamente para resolver esse problema complicado. É um software de streaming de dados em tempo real (menos de 10ms) que permite aos usuários armazenar, analisar e ler dados recebidos de uma única fonte (produtor) ou múltipla. Essencialmente, ajuda a distribuir os dados para canais significativos no prazo mais rápido possível. Por exemplo- Em uma partida de críquete há uma fonte (produtor) que verifica a pontuação em tempo real e passa essa informação para os canais. Os canais são como corretores que fornecerão as informações aos consumidores finais da maneira mais eficiente. Apache Kafka é aquele meio onde todas essas coisas acontecem.

  • O Apache Kafka pode ser facilmente dimensionado sem qualquer tempo de inatividade significativo no nível do sistema.
  • O Apache Kafka é um sistema tolerante a falhas, pois usa vários agentes para transmitir dados, ou seja, se um agente ficar offline, sempre haverá um agente replicado que tem os mesmos dados armazenados.
  • Ferramentas de segurança como Kerberos podem ser usadas durante a criação de aplicativos baseados em Kafka.

Para sua conveniência, você pode obter diretamente o Apache kafka aqui

O que são logs do Apache Kafka?

  • Os logs do Apache Kafka são uma coleção de vários segmentos de dados presentes em seu disco. Todos os vários segmentos de dados têm nomes que são partição de tópico de formulário ou de tópico específico.
  • O Apache Kafka também nos permite replicar nós de dados confirmando um log externo para um sistema distribuído. Esse mecanismo nos permite restaurar dados além de apenas lê-los, sempre que precisarmos.

Foto de Markus Spiske do Pexels

Coisas para lembrar ao trabalhar com Apache Kafka Logs?

  • Lembre-se de evitar o login de dados redundantes ou dados que devem ser usados ​​apenas para fins operacionais.
  • Normalmente, você pode optar por criar entradas baseadas em log bem no início e no final da(s) inicialização(ões) ou encerramento(s) de um módulo, mas também pode criar logs especiais. Os logs especiais podem ser usados ​​no início e no final de uma fase específica para realizar uma atualização.

Como habilitar logs no Apache Kafka?

Aqui está uma configuração de log que você precisa inserir em seu script para iniciar os logs no apache kafka-

# Habilite o registro baseado em arquivo e kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Depois de fazer isso, você poderá encontrar os logs no formato de notação de objeto javascript ou json.

Várias operações e comandos associados aos logs do Apache Kafka

No Apache Kafka, você pode executar quantos comandos quiser para implementar várias operações. O registro ocorrerá em segundo plano.

  1. Segmentos de Log – Use este código

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Criar uma nova instância de log

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Lendo registros

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Anexando registros

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Limpe segmentos e construa um mapa de deslocamento

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Excluindo segmentos

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Criar um novo arquivo de log

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Abra um novo segmento de log-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

sufixo: String = “”): Arquivo

  1. Fechar um registro

close(): Unit

  1. Recuperar e reconstruir segmentos

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Adicione ou converta segmentos:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Carregar uma partição

parseTopicPartitionName(dir: File): TopicPartition

  1. Modifique o arquivo de configuração

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Truncar uma operação:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit