Apache Kafka Logs: kompleksowy przewodnik po strukturze i zarządzaniu danymi dziennika?
Opublikowany: 2021-12-24Dane nie są takie same jak dziesięć lat temu. Przetwarzanie danych w użyteczne informacje jest o wiele trudniejsze niż można sobie wyobrazić. Apache Kafka, stworzony przez Linkedin, ale teraz zmieniony na programowanie open source, jest doskonałym darmowym narzędziem do zarządzania danymi i dobrego ich wykorzystania. W dzisiejszym cyfrowym świecie dane są bardzo ważne, to one kierują naszym postrzeganiem rzeczywistości.
Apache Kafka został stworzony właśnie po to, by rozwiązać ten skomplikowany problem. Jest to oprogramowanie do strumieniowego przesyłania danych w czasie rzeczywistym (mniej niż 10 ms), które umożliwia użytkownikom przechowywanie, analizowanie i odczytywanie danych otrzymanych z jednego źródła (producenta) lub z wielu. Zasadniczo pomaga w dystrybucji danych do znaczących kanałów w najszybszym możliwym czasie. Na przykład: w meczu krykieta jest źródło (producent), który sprawdza wynik w czasie rzeczywistym i przekazuje te informacje do kanałów. Kanały są jak brokerzy, którzy następnie dostarczają informacje konsumentom końcowym w najbardziej efektywny sposób. Apache Kafka to medium, w którym dzieją się te wszystkie rzeczy.
- Apache Kafka można bardzo łatwo skalować bez żadnych znaczących przestojów na poziomie systemu.
- Apache Kafka jest systemem odpornym na błędy, ponieważ używa wielu brokerów do przesyłania danych, co oznacza, że jeśli jeden broker przejdzie w tryb offline, zawsze istnieje replikowany broker, który przechowuje te same dane.
- Narzędzia zabezpieczające, takie jak Kerberos, mogą być używane podczas budowania aplikacji opartych na Kafce.
Dla Twojej wygody możesz bezpośrednio pobrać kafkę Apache tutaj
Czym są dzienniki Apache Kafka?
- Apache Kafka Logs to zbiór różnych segmentów danych znajdujących się na dysku. Wszystkie różne segmenty danych mają nazwy, które są albo partycją tematu formularza, albo partycji określonego tematu.
- Apache Kafka pozwala nam również replikować węzły danych poprzez zatwierdzenie zewnętrznego dziennika dla systemu rozproszonego. Mechanizm ten pozwala nam na przywracanie danych, a nie tylko ich odczytywanie, kiedy tylko tego potrzebujemy.

Zdjęcie: Markus Spiske z Pexels
O czym należy pamiętać podczas pracy z Apache Kafka Logs?
- Pamiętaj, aby nie logować się na zbędne dane lub dane, które służą wyłącznie do celów operacyjnych.
- Zwykle możesz wybrać tworzenie wpisów opartych na dziennikach na samym początku i na końcu uruchamiania lub zamykania modułu, ale możesz także tworzyć specjalne dzienniki. Dzienniki specjalne mogą być używane na początku i na końcu określonej fazy w celu przeprowadzenia aktualizacji.
Jak włączyć Logi w Apache Kafka?
Oto konfiguracja dziennika, którą musisz wprowadzić do swojego skryptu, aby uruchomić logi w apache kafka-
# Włącz logowanie oparte na plikach i kafce
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Gdy to zrobisz, powinieneś być w stanie znaleźć logi w formacie notacji obiektów javascript lub json.
Różne operacje i polecenia związane z Apache Kafka Logs
W Apache Kafka możesz wykonać tyle poleceń, ile chcesz, aby zaimplementować różne operacje. Logowanie odbędzie się w tle.
- Segmenty logowania – użyj tego kodu
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Utwórz nową instancję dziennika
apply(
dir:
File,config: LogConfig,

logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Czytanie zapisów
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Dołączanie rekordów
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Oczyść segmenty i zbuduj mapę offsetową
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Usuwanie segmentów
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Stwórz nowy plik
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Otwórz nowy segment dziennika-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
sufiks: String = „”): Plik
- Zamknij dziennik
close(): Unit
- Odzyskaj i odbuduj segmenty
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Dodaj lub przekonwertuj segmenty:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Załaduj partycję
parseTopicPartitionName(dir: File): TopicPartition
- Zmodyfikuj plik konfiguracyjny
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Obetnij operację:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit