Apache Kafka Logs:ログデータの構造と管理に関する包括的なガイド?

公開: 2021-12-24

データは、10年前と同じではありません。 データを使用可能な情報に処理することは、想像以上に困難です。 Linkedinによって作成されたが現在はオープンソース開発に変更されたApacheKafkaは、データを管理し、それらを有効に活用するための優れた無料ツールです。 今日のデジタル世界では、データは非常に重要です。それが私たちの現実の認識を推進しているデータです。

Apache Kafkaは、この複雑な問題を解決するために正確に作成されました。 これは、ユーザーが単一のソース(プロデューサー)または複数のソース(プロデューサー)から受信したデータを保存、分析、および読み取ることができるリアルタイム(10ミリ秒未満)のデータストリーミングソフトウェアです。 これは基本的に、可能な限り最速の時間枠で意味のあるチャネルにデータを配布するのに役立ちます。 例-クリケットの試合では、リアルタイムのスコアをチェックしてこの情報をチャンネルに渡すソース(プロデューサー)がいます。 チャネルはブローカーのようなもので、最も効率的な方法でエンドユーザーに情報を提供します。 Apache Kafkaは、これらすべてのことが起こる媒体です。

  • Apache Kafkaは、システムレベルの大幅なダウンタイムなしで、非常に簡単に拡張できます。
  • Apache Kafkaは、複数のブローカーを使用してデータを送信するため、フォールトトレラントシステムです。つまり、1つのブローカーがオフラインになった場合、同じデータが保存されているレプリケートされたブローカーが常に存在します。
  • Kafkaに基づくアプリケーションを構築する際に、Kerberosなどのセキュリティツールを使用できます。

あなたの便宜のために、あなたはここで直接Apachekafkaを学ぶことができます

Apache Kafkaログとは何ですか?

  • Apache Kafka Logsは、ディスク上に存在するさまざまなデータセグメントのコレクションです。 さまざまなデータセグメントはすべて、フォームトピックまたは特定トピックのパーティションのいずれかの名前を持っています。
  • Apache Kafkaを使用すると、分散システムの外部ログをコミットすることでデータノードを複製することもできます。 このメカニズムにより、必要なときにいつでもデータを読み取るだけでなく、データを復元できます。

PexelsのMarkusSpiskeによる写真

Apache Kafka Logsを使用する際に覚えておくべきことはありますか?

  • 冗長データや運用目的でのみ使用されるデータのログインは避けてください。
  • 通常、モジュールの起動またはシャットダウンの最初と最後にログベースのエントリを作成することを選択できますが、特別なログを作成することもできます。 特別ログは、更新を実行するための特定のフェーズの開始時と終了時に使用できます。

Apache Kafkaでログを有効にする方法は?

これは、apachekafkaでログを開始するためにスクリプトに入力する必要があるログ構成です-

#ファイルベースとkafkaベースの両方のロギングを有効にする

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

これを実行すると、javascriptオブジェクト表記形式またはjsonでログを見つけることができるはずです。

ApacheKafkaログに関連するさまざまな操作とコマンド

Apache Kafkaでは、さまざまな操作を実装するために必要な数のコマンドを実行できます。 ロギングはバックグラウンドで行われます。

  1. セグメントのロギング–このコードを使用します

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. 新しいログインスタンスを作成します

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. 読書記録

addAbortedTransactions

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. レコードの追加

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. セグメントをクリーンアップし、オフセットマップを作成します

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. セグメントの削除

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. 新しいログファイルを作成します

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. 新しいログセグメントを開く-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

接尾辞:文字列=“”):ファイル

  1. ログを閉じる

close(): Unit

  1. セグメントの回復と再構築

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. セグメントの追加または変換:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. パーティションをロードする

parseTopicPartitionName(dir: File): TopicPartition

  1. 構成ファイルを変更します

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. 操作を切り捨てます。

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit