Apache Kafka Logs: Ein umfassender Leitfaden zur Struktur und Verwaltung von Protokolldaten?
Veröffentlicht: 2021-12-24Daten sind nicht mehr dieselben wie vor zehn Jahren. Die Verarbeitung von Daten zu nutzbaren Informationen ist viel schwieriger als man sich vorstellen kann. Apache Kafka, erstellt von Linkedin, aber jetzt auf Open-Source-Entwicklung umgestellt, ist ein hervorragendes kostenloses Tool, um Daten zu verwalten und sinnvoll zu nutzen. In der heutigen digitalen Welt sind Daten sehr wichtig, es sind die Daten, die unsere Wahrnehmung der Realität bestimmen.
Apache Kafka wurde genau dafür entwickelt, dieses komplizierte Problem zu lösen. Es handelt sich um eine Echtzeit-Daten-Streaming-Software (weniger als 10 ms), die es Benutzern ermöglicht, Daten zu speichern, zu analysieren und zu lesen, die von einer einzelnen Quelle (Produzent) oder mehreren empfangen werden. Es hilft im Wesentlichen dabei, die Daten zum schnellstmöglichen Zeitrahmen auf sinnvolle Kanäle zu verteilen. Zum Beispiel: Bei einem Cricket-Match gibt es eine Quelle (Produzent), die das Ergebnis in Echtzeit überprüft und diese Informationen an die Kanäle weiterleitet. Kanäle sind wie Makler, die die Informationen dann auf die effizienteste Weise an die Endverbraucher liefern. Apache Kafka ist das Medium, in dem all diese Dinge passieren.
- Apache Kafka kann sehr einfach ohne nennenswerte Ausfallzeiten auf Systemebene skaliert werden.
- Apache Kafka ist ein fehlertolerantes System, da es mehrere Broker zur Datenübertragung verwendet, d. h. wenn ein Broker offline geht, gibt es immer einen replizierten Broker, der dieselben Daten gespeichert hat.
- Sicherheitstools wie Kerberos können beim Erstellen von Anwendungen auf Basis von Kafka verwendet werden.
Für Ihre Bequemlichkeit können Sie hier direkt Apache Kafka lernen
Was sind Apache Kafka-Protokolle?
- Apache Kafka-Protokolle sind eine Sammlung verschiedener Datensegmente, die auf Ihrer Festplatte vorhanden sind. Alle verschiedenen Datensegmente haben Namen, die entweder Form-Topic- oder Specific-Topic-Partition sind.
- Apache Kafka ermöglicht es uns auch, Datenknoten zu replizieren, indem wir ein externes Protokoll für ein verteiltes System festschreiben. Dieser Mechanismus ermöglicht es uns, Daten wiederherzustellen, abgesehen davon, dass wir sie nur lesen, wann immer wir sie brauchen.

Foto von Markus Spiske von Pexels
Woran Sie beim Arbeiten mit Apache Kafka Logs denken sollten?
- Vergessen Sie nicht, redundante Daten oder Daten, die nur für betriebliche Zwecke verwendet werden sollen, anzumelden.
- Normalerweise können Sie protokollbasierte Einträge ganz am Anfang und am Ende von Modulstarts oder -abschaltungen erstellen, aber Sie können auch spezielle Protokolle erstellen. Spezielle Protokolle können zu Beginn und am Ende einer bestimmten Phase zur Durchführung eines Updates verwendet werden.
Wie aktiviere ich Protokolle in Apache Kafka?
Hier ist eine Protokollkonfiguration, die Sie in Ihr Skript eingeben müssen, um Protokolle in Apache Kafka zu starten.
# Aktivieren Sie sowohl die datei- als auch die kafka-basierte Protokollierung
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Sobald Sie dies getan haben, sollten Sie in der Lage sein, die Protokolle im JavaScript-Objektnotationsformat oder json zu finden.
Verschiedene Operationen und Befehle im Zusammenhang mit Apache Kafka-Protokollen
In Apache Kafka können Sie beliebig viele Befehle ausführen, um verschiedene Operationen zu implementieren. Die Protokollierung erfolgt im Hintergrund.
- Protokollierungssegmente – Verwenden Sie diesen Code
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Erstellen Sie eine neue Protokollinstanz
apply(

dir:
File,config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Aufzeichnungen lesen
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Datensätze anhängen
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Bereinigen Sie Segmente und erstellen Sie eine Offset-Karte
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Segmente löschen
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Erstelle eine neue Protokolldatei
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Öffnen Sie ein neues Log-Segment-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
Suffix: String = „“): Datei
- Schließen Sie ein Protokoll
close(): Unit
- Segmente wiederherstellen und neu erstellen
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Segmente hinzufügen oder konvertieren:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Laden Sie eine Partition
parseTopicPartitionName(dir: File): TopicPartition
- Ändern Sie die Konfigurationsdatei
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Eine Operation abschneiden:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit