Registri Apache Kafka: una guida completa alla struttura e alla gestione dei dati di registro?

Pubblicato: 2021-12-24

I dati non sono più gli stessi di dieci anni fa. L'elaborazione dei dati in informazioni utilizzabili è molto più difficile di quanto si possa immaginare. Apache Kafka, creato da Linkedin ma ora trasformato in sviluppo open source, è un ottimo strumento gratuito per gestire i dati e farne buon uso. Nel mondo digitale di oggi i dati sono molto importanti, sono i dati che guidano la nostra percezione della realtà.

Apache Kafka è stato creato proprio per risolvere questo complicato problema. È un software di streaming di dati in tempo reale (meno di 10 ms) che consente agli utenti di archiviare, analizzare e leggere i dati ricevuti da una singola fonte (produttore) o da più fonti. Aiuta essenzialmente a distribuire i dati a canali significativi nel più breve tempo possibile. Ad esempio: in una partita di cricket c'è una fonte (produttore) che controlla il punteggio in tempo reale e trasmette queste informazioni ai canali. I canali sono come broker che forniranno le informazioni ai consumatori finali nel modo più efficiente. Apache Kafka è quel mezzo in cui accadono tutte queste cose.

  • Apache Kafka può essere ridimensionato molto facilmente senza alcun significativo downtime a livello di sistema.
  • Apache Kafka è un sistema a tolleranza d'errore poiché utilizza più broker per trasmettere i dati, il che significa che se un broker va offline, c'è sempre un broker replicato che ha gli stessi dati archiviati.
  • Strumenti di sicurezza come Kerberos possono essere utilizzati durante la creazione di applicazioni basate su Kafka.

Per tua comodità, puoi ottenere direttamente l'app Learn Apache kafka qui

Cosa sono i log di Apache Kafka?

  • I registri Apache Kafka sono una raccolta di vari segmenti di dati presenti sul disco. Tutti i vari segmenti di dati hanno nomi che sono partizione argomento o argomento specifico.
  • Apache Kafka ci consente anche di replicare i nodi di dati eseguendo il commit di un log esterno per un sistema distribuito. Questo meccanismo ci consente di ripristinare i dati oltre alla semplice lettura, ogni volta che ne abbiamo bisogno.

Foto di Markus Spiske di Pexels

Cose da ricordare mentre si lavora con Apache Kafka Logs?

  • Ricordarsi di evitare di accedere a dati ridondanti o dati che devono essere utilizzati solo per scopi operativi.
  • Normalmente puoi scegliere di creare voci basate su log proprio all'inizio e alla fine dell'avvio o dello spegnimento di un modulo, ma puoi anche creare log speciali. I Log speciali possono essere utilizzati all'inizio e alla fine di una specifica fase per effettuare un aggiornamento.

Come abilitare i registri in Apache Kafka?

Ecco una configurazione del registro che devi inserire nel tuo script per avviare i registri in apache kafka-

# Abilita la registrazione basata su file e kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Dopo averlo fatto, dovresti essere in grado di trovare i log in formato di notazione oggetto javascript o json.

Varie operazioni e comandi associati ai registri di Apache Kafka

In Apache Kafka, puoi eseguire tutti i comandi che desideri per implementare varie operazioni. La registrazione avverrà in background.

  1. Segmenti di registrazione: utilizzare questo codice

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Crea una nuova istanza di log

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Leggere i record

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Record di accodamento

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Pulisci i segmenti e costruisci una mappa di offset

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Eliminazione di segmenti

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Crea un nuovo file di registro

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Apri un nuovo segmento di log-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

suffisso: String = “”): File

  1. Chiudi un registro

close(): Unit

  1. Recupera e ricostruisci segmenti

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Aggiungi o converti segmenti:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Carica una partizione

parseTopicPartitionName(dir: File): TopicPartition

  1. Modifica il file di configurazione

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Tronca un'operazione:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit