Registri Apache Kafka: una guida completa alla struttura e alla gestione dei dati di registro?
Pubblicato: 2021-12-24I dati non sono più gli stessi di dieci anni fa. L'elaborazione dei dati in informazioni utilizzabili è molto più difficile di quanto si possa immaginare. Apache Kafka, creato da Linkedin ma ora trasformato in sviluppo open source, è un ottimo strumento gratuito per gestire i dati e farne buon uso. Nel mondo digitale di oggi i dati sono molto importanti, sono i dati che guidano la nostra percezione della realtà.
Apache Kafka è stato creato proprio per risolvere questo complicato problema. È un software di streaming di dati in tempo reale (meno di 10 ms) che consente agli utenti di archiviare, analizzare e leggere i dati ricevuti da una singola fonte (produttore) o da più fonti. Aiuta essenzialmente a distribuire i dati a canali significativi nel più breve tempo possibile. Ad esempio: in una partita di cricket c'è una fonte (produttore) che controlla il punteggio in tempo reale e trasmette queste informazioni ai canali. I canali sono come broker che forniranno le informazioni ai consumatori finali nel modo più efficiente. Apache Kafka è quel mezzo in cui accadono tutte queste cose.
- Apache Kafka può essere ridimensionato molto facilmente senza alcun significativo downtime a livello di sistema.
- Apache Kafka è un sistema a tolleranza d'errore poiché utilizza più broker per trasmettere i dati, il che significa che se un broker va offline, c'è sempre un broker replicato che ha gli stessi dati archiviati.
- Strumenti di sicurezza come Kerberos possono essere utilizzati durante la creazione di applicazioni basate su Kafka.
Per tua comodità, puoi ottenere direttamente l'app Learn Apache kafka qui
Cosa sono i log di Apache Kafka?
- I registri Apache Kafka sono una raccolta di vari segmenti di dati presenti sul disco. Tutti i vari segmenti di dati hanno nomi che sono partizione argomento o argomento specifico.
- Apache Kafka ci consente anche di replicare i nodi di dati eseguendo il commit di un log esterno per un sistema distribuito. Questo meccanismo ci consente di ripristinare i dati oltre alla semplice lettura, ogni volta che ne abbiamo bisogno.

Foto di Markus Spiske di Pexels
Cose da ricordare mentre si lavora con Apache Kafka Logs?
- Ricordarsi di evitare di accedere a dati ridondanti o dati che devono essere utilizzati solo per scopi operativi.
- Normalmente puoi scegliere di creare voci basate su log proprio all'inizio e alla fine dell'avvio o dello spegnimento di un modulo, ma puoi anche creare log speciali. I Log speciali possono essere utilizzati all'inizio e alla fine di una specifica fase per effettuare un aggiornamento.
Come abilitare i registri in Apache Kafka?
Ecco una configurazione del registro che devi inserire nel tuo script per avviare i registri in apache kafka-
# Abilita la registrazione basata su file e kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Dopo averlo fatto, dovresti essere in grado di trovare i log in formato di notazione oggetto javascript o json.
Varie operazioni e comandi associati ai registri di Apache Kafka
In Apache Kafka, puoi eseguire tutti i comandi che desideri per implementare varie operazioni. La registrazione avverrà in background.
- Segmenti di registrazione: utilizzare questo codice
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Crea una nuova istanza di log
apply(
dir:
File,config: LogConfig,

logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Leggere i record
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Record di accodamento
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Pulisci i segmenti e costruisci una mappa di offset
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Eliminazione di segmenti
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Crea un nuovo file di registro
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Apri un nuovo segmento di log-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
suffisso: String = “”): File
- Chiudi un registro
close(): Unit
- Recupera e ricostruisci segmenti
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Aggiungi o converti segmenti:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Carica una partizione
parseTopicPartitionName(dir: File): TopicPartition
- Modifica il file di configurazione
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Tronca un'operazione:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit