Apache Kafka Logs: Ein umfassender Leitfaden zur Struktur und Verwaltung von Protokolldaten?

Veröffentlicht: 2021-12-24

Daten sind nicht mehr dieselben wie vor zehn Jahren. Die Verarbeitung von Daten zu nutzbaren Informationen ist viel schwieriger als man sich vorstellen kann. Apache Kafka, erstellt von Linkedin, aber jetzt auf Open-Source-Entwicklung umgestellt, ist ein hervorragendes kostenloses Tool, um Daten zu verwalten und sinnvoll zu nutzen. In der heutigen digitalen Welt sind Daten sehr wichtig, es sind die Daten, die unsere Wahrnehmung der Realität bestimmen.

Apache Kafka wurde genau dafür entwickelt, dieses komplizierte Problem zu lösen. Es handelt sich um eine Echtzeit-Daten-Streaming-Software (weniger als 10 ms), die es Benutzern ermöglicht, Daten zu speichern, zu analysieren und zu lesen, die von einer einzelnen Quelle (Produzent) oder mehreren empfangen werden. Es hilft im Wesentlichen dabei, die Daten zum schnellstmöglichen Zeitrahmen auf sinnvolle Kanäle zu verteilen. Zum Beispiel: Bei einem Cricket-Match gibt es eine Quelle (Produzent), die das Ergebnis in Echtzeit überprüft und diese Informationen an die Kanäle weiterleitet. Kanäle sind wie Makler, die die Informationen dann auf die effizienteste Weise an die Endverbraucher liefern. Apache Kafka ist das Medium, in dem all diese Dinge passieren.

  • Apache Kafka kann sehr einfach ohne nennenswerte Ausfallzeiten auf Systemebene skaliert werden.
  • Apache Kafka ist ein fehlertolerantes System, da es mehrere Broker zur Datenübertragung verwendet, d. h. wenn ein Broker offline geht, gibt es immer einen replizierten Broker, der dieselben Daten gespeichert hat.
  • Sicherheitstools wie Kerberos können beim Erstellen von Anwendungen auf Basis von Kafka verwendet werden.

Für Ihre Bequemlichkeit können Sie hier direkt Apache Kafka lernen

Was sind Apache Kafka-Protokolle?

  • Apache Kafka-Protokolle sind eine Sammlung verschiedener Datensegmente, die auf Ihrer Festplatte vorhanden sind. Alle verschiedenen Datensegmente haben Namen, die entweder Form-Topic- oder Specific-Topic-Partition sind.
  • Apache Kafka ermöglicht es uns auch, Datenknoten zu replizieren, indem wir ein externes Protokoll für ein verteiltes System festschreiben. Dieser Mechanismus ermöglicht es uns, Daten wiederherzustellen, abgesehen davon, dass wir sie nur lesen, wann immer wir sie brauchen.

Foto von Markus Spiske von Pexels

Woran Sie beim Arbeiten mit Apache Kafka Logs denken sollten?

  • Vergessen Sie nicht, redundante Daten oder Daten, die nur für betriebliche Zwecke verwendet werden sollen, anzumelden.
  • Normalerweise können Sie protokollbasierte Einträge ganz am Anfang und am Ende von Modulstarts oder -abschaltungen erstellen, aber Sie können auch spezielle Protokolle erstellen. Spezielle Protokolle können zu Beginn und am Ende einer bestimmten Phase zur Durchführung eines Updates verwendet werden.

Wie aktiviere ich Protokolle in Apache Kafka?

Hier ist eine Protokollkonfiguration, die Sie in Ihr Skript eingeben müssen, um Protokolle in Apache Kafka zu starten.

# Aktivieren Sie sowohl die datei- als auch die kafka-basierte Protokollierung

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Sobald Sie dies getan haben, sollten Sie in der Lage sein, die Protokolle im JavaScript-Objektnotationsformat oder json zu finden.

Verschiedene Operationen und Befehle im Zusammenhang mit Apache Kafka-Protokollen

In Apache Kafka können Sie beliebig viele Befehle ausführen, um verschiedene Operationen zu implementieren. Die Protokollierung erfolgt im Hintergrund.

  1. Protokollierungssegmente – Verwenden Sie diesen Code

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Erstellen Sie eine neue Protokollinstanz

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Aufzeichnungen lesen

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Datensätze anhängen

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Bereinigen Sie Segmente und erstellen Sie eine Offset-Karte

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Segmente löschen

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Erstelle eine neue Protokolldatei

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Öffnen Sie ein neues Log-Segment-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

Suffix: String = „“): Datei

  1. Schließen Sie ein Protokoll

close(): Unit

  1. Segmente wiederherstellen und neu erstellen

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Segmente hinzufügen oder konvertieren:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Laden Sie eine Partition

parseTopicPartitionName(dir: File): TopicPartition

  1. Ändern Sie die Konfigurationsdatei

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Eine Operation abschneiden:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit