Jurnalele Apache Kafka: un ghid cuprinzător pentru structura și managementul datelor jurnalului?

Publicat: 2021-12-24

Datele nu mai sunt la fel ca acum un deceniu. Procesarea datelor în informații utilizabile este mult mai dificilă decât se poate imagina. Apache Kafka, creat de Linkedin, dar acum schimbat la dezvoltarea open source, este un instrument gratuit excelent pentru gestionarea datelor și pentru a le folosi bine. În lumea digitală de astăzi, datele sunt foarte importante, acestea sunt cele care ne conduc percepția asupra realității.

Apache Kafka a fost făcut tocmai pentru a rezolva această problemă complicată. Este un software de streaming de date în timp real (mai puțin de 10 ms) care permite utilizatorilor să stocheze, să analizeze și să citească datele primite de la o singură sursă (producător) sau mai multe. În esență, ajută la distribuirea datelor către canale semnificative în cel mai rapid interval de timp posibil. De exemplu, într-un meci de cricket există o sursă (producător) care verifică scorul în timp real și transmite aceste informații către canale. Canalele sunt ca brokerii care vor furniza apoi informațiile consumatorilor finali în cel mai eficient mod. Apache Kafka este acel mediu în care se întâmplă toate aceste lucruri.

  • Apache Kafka poate fi scalat foarte ușor, fără vreun timp de nefuncționare semnificativ la nivel de sistem.
  • Apache Kafka este un sistem tolerant la erori, deoarece folosește mai mulți brokeri pentru a transmite date, ceea ce înseamnă că, dacă un broker este offline, există întotdeauna un broker replicat care are aceleași date stocate.
  • Instrumente de securitate precum Kerberos pot fi utilizate în timp ce se construiesc aplicații bazate pe Kafka.

Pentru confortul dvs., puteți obține direct instrucțiunile Apache kafka aici

Ce sunt jurnalele Apache Kafka?

  • Jurnalele Apache Kafka sunt o colecție de diferite segmente de date prezente pe disc. Toate diferitele segmente de date au nume care sunt fie partiții de formulare, fie de subiecte specifice.
  • Apache Kafka ne permite, de asemenea, să replicăm nodurile de date prin comiterea unui jurnal extern pentru un sistem distribuit. Acest mecanism ne permite să restaurăm datele în afară de doar citirea lor, oricând avem nevoie de ele.

Fotografie de Markus Spiske de la Pexels

Lucruri de reținut când lucrați cu Apache Kafka Logs?

  • Nu uitați să evitați conectarea datelor redundante sau a datelor care trebuie utilizate numai în scopuri operaționale.
  • În mod normal, puteți alege să creați intrări bazate pe jurnal la începutul și la sfârșitul pornirii sau opririi unui modul, dar puteți crea și jurnalele speciale. Jurnalele speciale pot fi utilizate la începutul și la sfârșitul unei faze specifice pentru efectuarea unei actualizări.

Cum se activează jurnalele în Apache Kafka?

Iată o configurație de jurnal pe care trebuie să o introduceți în scriptul dvs. pentru a porni jurnalele în apache kafka-

# Activați atât înregistrarea pe fișier, cât și pe baza Kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Odată ce ați făcut acest lucru, ar trebui să puteți găsi jurnalele în format de notație obiect javascript sau json.

Diverse operațiuni și comenzi asociate cu jurnalele Apache Kafka

În Apache Kafka, puteți executa câte comenzi doriți pentru a implementa diverse operațiuni. Înregistrarea va avea loc în fundal.

  1. Segmente de înregistrare – Folosiți acest cod

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Creați o nouă instanță de jurnal

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Citirea înregistrărilor

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Adăugarea înregistrărilor

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Curățați segmentele și construiți o hartă de offset

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Ștergerea segmentelor

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Creați un nou fișier jurnal

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Deschide un nou segment de jurnal -

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

sufix: String = “”): Fișier

  1. Închideți un jurnal

close(): Unit

  1. Recuperați și reconstruiți segmente

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Adăugați sau convertiți segmente:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Încărcați o partiție

parseTopicPartitionName(dir: File): TopicPartition

  1. Modificați fișierul de configurare

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Trunchiază o operație:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit