Apache Kafka 日志:日志数据结构和管理的综合指南?

已发表: 2021-12-24

数据与十年前不同。 将数据处理成可用信息比想象的要困难得多。 Apache Kafka 由 Linkedin 创建,但现在改为开源开发,是管理数据和充分利用数据的优秀免费工具。 在当今的数字世界中,数据非常重要,正是数据推动了我们对现实的感知。

Apache Kafka 正是为了解决这个复杂的问题而设计的。 它是一个实时(小于 10 毫秒)数据流软件,允许用户存储、分析和读取从单个来源(生产者)或多个来源接收到的数据。 它本质上有助于在尽可能快的时间范围内将数据分发到有意义的渠道。 例如-在板球比赛中,有一个来源(制作人)检查实时比分并将此信息传递给频道。 渠道就像经纪人,他们将以最有效的方式将信息提供给最终消费者。 Apache Kafka 是所有这些事情发生的媒介。

  • Apache Kafka 可以非常轻松地进行扩展,而无需任何显着的系统级停机时间。
  • Apache Kafka 是一个容错系统,因为它使用多个代理来传输数据,这意味着如果一个代理下线,总会有一个复制的代理存储相同的数据。
  • 在构建基于 Kafka 的应用程序时,可以使用 Kerberos 等安全工具。

为了您的方便,您可以在此处直接获取学习 Apache kafka

什么是 Apache Kafka 日志?

  • Apache Kafka 日志是磁盘上存在的各种数据段的集合。 所有各种数据段的名称要么是表单主题分区,要么是特定主题分区。
  • Apache Kafka 还允许我们通过提交分布式系统的外部日志来复制数据节点。 这种机制允许我们在需要时恢复数据,而不仅仅是读取数据。

Pexels 上的 Markus Spiske 拍摄的照片

使用 Apache Kafka 日志时要记住的事情?

  • 请记住避免登录冗余数据或仅用于操作目的的数据。
  • 通常,您可以选择在模块启动或关闭的开始和结束时创建基于日志的条目,但您也可以创建特殊日志。 可以在特定阶段的开始和结束时使用特殊日志来执行更新。

如何在 Apache Kafka 中启用日志?

这是一个日志配置,您需要将其输入到脚本中以在 apache kafka 中启动日志-

# 启用基于文件和 kafka 的日志记录

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

完成此操作后,您应该能够找到 javascript 对象表示法格式或 json 格式的日志。

与 Apache Kafka 日志相关的各种操作和命令

在 Apache Kafka 中,您可以执行任意数量的命令来实现各种操作。 日志记录将在后台进行。

  1. 记录段 - 使用此代码

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. 创建一个新的日志实例

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. 阅读记录

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. 附加记录

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. 清理段并构建偏移地图

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. 删除段

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. 创建一个新的日志文件

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. 打开一个新的日志段-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

后缀:字符串=“”):文件

  1. 关闭日志

close(): Unit

  1. 恢复和重建段

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. 添加或转换段:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. 加载分区

parseTopicPartitionName(dir: File): TopicPartition

  1. 修改配置文件

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. 截断一个操作:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit