Apache Kafka Logs: kompleksowy przewodnik po strukturze i zarządzaniu danymi dziennika?

Opublikowany: 2021-12-24

Dane nie są takie same jak dziesięć lat temu. Przetwarzanie danych w użyteczne informacje jest o wiele trudniejsze niż można sobie wyobrazić. Apache Kafka, stworzony przez Linkedin, ale teraz zmieniony na programowanie open source, jest doskonałym darmowym narzędziem do zarządzania danymi i dobrego ich wykorzystania. W dzisiejszym cyfrowym świecie dane są bardzo ważne, to one kierują naszym postrzeganiem rzeczywistości.

Apache Kafka został stworzony właśnie po to, by rozwiązać ten skomplikowany problem. Jest to oprogramowanie do strumieniowego przesyłania danych w czasie rzeczywistym (mniej niż 10 ms), które umożliwia użytkownikom przechowywanie, analizowanie i odczytywanie danych otrzymanych z jednego źródła (producenta) lub z wielu. Zasadniczo pomaga w dystrybucji danych do znaczących kanałów w najszybszym możliwym czasie. Na przykład: w meczu krykieta jest źródło (producent), który sprawdza wynik w czasie rzeczywistym i przekazuje te informacje do kanałów. Kanały są jak brokerzy, którzy następnie dostarczają informacje konsumentom końcowym w najbardziej efektywny sposób. Apache Kafka to medium, w którym dzieją się te wszystkie rzeczy.

  • Apache Kafka można bardzo łatwo skalować bez żadnych znaczących przestojów na poziomie systemu.
  • Apache Kafka jest systemem odpornym na błędy, ponieważ używa wielu brokerów do przesyłania danych, co oznacza, że ​​jeśli jeden broker przejdzie w tryb offline, zawsze istnieje replikowany broker, który przechowuje te same dane.
  • Narzędzia zabezpieczające, takie jak Kerberos, mogą być używane podczas budowania aplikacji opartych na Kafce.

Dla Twojej wygody możesz bezpośrednio pobrać kafkę Apache tutaj

Czym są dzienniki Apache Kafka?

  • Apache Kafka Logs to zbiór różnych segmentów danych znajdujących się na dysku. Wszystkie różne segmenty danych mają nazwy, które są albo partycją tematu formularza, albo partycji określonego tematu.
  • Apache Kafka pozwala nam również replikować węzły danych poprzez zatwierdzenie zewnętrznego dziennika dla systemu rozproszonego. Mechanizm ten pozwala nam na przywracanie danych, a nie tylko ich odczytywanie, kiedy tylko tego potrzebujemy.

Zdjęcie: Markus Spiske z Pexels

O czym należy pamiętać podczas pracy z Apache Kafka Logs?

  • Pamiętaj, aby nie logować się na zbędne dane lub dane, które służą wyłącznie do celów operacyjnych.
  • Zwykle możesz wybrać tworzenie wpisów opartych na dziennikach na samym początku i na końcu uruchamiania lub zamykania modułu, ale możesz także tworzyć specjalne dzienniki. Dzienniki specjalne mogą być używane na początku i na końcu określonej fazy w celu przeprowadzenia aktualizacji.

Jak włączyć Logi w Apache Kafka?

Oto konfiguracja dziennika, którą musisz wprowadzić do swojego skryptu, aby uruchomić logi w apache kafka-

# Włącz logowanie oparte na plikach i kafce

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Gdy to zrobisz, powinieneś być w stanie znaleźć logi w formacie notacji obiektów javascript lub json.

Różne operacje i polecenia związane z Apache Kafka Logs

W Apache Kafka możesz wykonać tyle poleceń, ile chcesz, aby zaimplementować różne operacje. Logowanie odbędzie się w tle.

  1. Segmenty logowania – użyj tego kodu

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Utwórz nową instancję dziennika

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Czytanie zapisów

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Dołączanie rekordów

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Oczyść segmenty i zbuduj mapę offsetową

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Usuwanie segmentów

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Stwórz nowy plik

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Otwórz nowy segment dziennika-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

sufiks: String = „”): Plik

  1. Zamknij dziennik

close(): Unit

  1. Odzyskaj i odbuduj segmenty

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Dodaj lub przekonwertuj segmenty:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Załaduj partycję

parseTopicPartitionName(dir: File): TopicPartition

  1. Zmodyfikuj plik konfiguracyjny

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Obetnij operację:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit