Apache Kafka 日誌:日誌數據結構和管理的綜合指南?

已發表: 2021-12-24

數據與十年前不同。 將數據處理成可用信息比想像的要困難得多。 Apache Kafka 由 Linkedin 創建,但現在改為開源開發,是管理數據和充分利用數據的優秀免費工具。 在當今的數字世界中,數據非常重要,正是數據推動了我們對現實的感知。

Apache Kafka 正是為了解決這個複雜的問題而設計的。 它是一個實時(小於 10 毫秒)數據流軟件,允許用戶存儲、分析和讀取從單個來源(生產者)或多個來源接收到的數據。 它本質上有助於在盡可能快的時間範圍內將數據分發到有意義的渠道。 例如-在板球比賽中,有一個來源(製作人)檢查實時比分並將此信息傳遞給頻道。 渠道就像經紀人,他們將以最有效的方式將信息提供給最終消費者。 Apache Kafka 是所有這些事情發生的媒介。

  • Apache Kafka 可以非常輕鬆地進行擴展,而無需任何顯著的系統級停機時間。
  • Apache Kafka 是一個容錯系統,因為它使用多個代理來傳輸數據,這意味著如果一個代理下線,總會有一個複制的代理存儲相同的數據。
  • 在構建基於 Kafka 的應用程序時,可以使用 Kerberos 等安全工具。

為了您的方便,您可以在此處直接獲取學習 Apache kafka

什麼是 Apache Kafka 日誌?

  • Apache Kafka 日誌是磁盤上存在的各種數據段的集合。 所有各種數據段的名稱要么是表單主題分區,要么是特定主題分區。
  • Apache Kafka 還允許我們通過提交分佈式系統的外部日誌來複製數據節點。 這種機制允許我們在需要時恢復數據,而不僅僅是讀取數據。

Pexels 上的 Markus Spiske 拍攝的照片

使用 Apache Kafka 日誌時要記住的事情?

  • 請記住避免登錄冗餘數據或僅用於操作目的的數據。
  • 通常,您可以選擇在模塊啟動或關閉的開始和結束時創建基於日誌的條目,但您也可以創建特殊日誌。 可以在特定階段的開始和結束時使用特殊日誌來執行更新。

如何在 Apache Kafka 中啟用日誌?

這是一個日誌配置,您需要將其輸入到腳本中以在 apache kafka 中啟動日誌-

# 啟用基於文件和 kafka 的日誌記錄

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

完成此操作後,您應該能夠找到 javascript 對象表示法格式或 json 格式的日誌。

與 Apache Kafka 日誌相關的各種操作和命令

在 Apache Kafka 中,您可以執行任意數量的命令來實現各種操作。 日誌記錄將在後台進行。

  1. 記錄段 - 使用此代碼

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. 創建一個新的日誌實例

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. 閱讀記錄

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. 附加記錄

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. 清理段並構建偏移地圖

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. 刪除段

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. 創建一個新的日誌文件

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. 打開一個新的日誌段-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

後綴:字符串=“”):文件

  1. 關閉日誌

close(): Unit

  1. 恢復和重建段

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. 添加或轉換段:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. 加載分區

parseTopicPartitionName(dir: File): TopicPartition

  1. 修改配置文件

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. 截斷一個操作:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit