Журналы Apache Kafka: подробное руководство по структуре данных журналов и управлению ими?

Опубликовано: 2021-12-24

Данные уже не те, что были десять лет назад. Преобразование данных в полезную информацию намного сложнее, чем можно себе представить. Apache Kafka, созданный Linkedin, но теперь переведенный на разработку с открытым исходным кодом, является отличным бесплатным инструментом для управления данными и их эффективного использования. В современном цифровом мире очень важны данные, именно они определяют наше восприятие реальности.

Apache Kafka был создан именно для решения этой сложной проблемы. Это программное обеспечение для потоковой передачи данных в режиме реального времени (менее 10 мс), которое позволяет пользователям хранить, анализировать и считывать данные, полученные из одного источника (производителя) или нескольких. По сути, это помогает распространять данные по значимым каналам в кратчайшие сроки. Например, в матче по крикету есть источник (продюсер), который проверяет счет в реальном времени и передает эту информацию по каналам. Каналы подобны брокерам, которые затем наиболее эффективным образом предоставляют информацию конечным потребителям. Apache Kafka — это среда, в которой все это происходит.

  • Apache Kafka можно очень легко масштабировать без значительного простоя на уровне системы.
  • Apache Kafka — отказоустойчивая система, поскольку она использует несколько брокеров для передачи данных, а это означает, что если один брокер отключается, всегда есть реплицированный брокер, в котором хранятся те же данные.
  • Инструменты безопасности, такие как Kerberos, можно использовать при создании приложений на основе Kafka.

Для вашего удобства вы можете напрямую изучить Apache kafka здесь

Что такое журналы Apache Kafka?

  • Журналы Apache Kafka — это набор различных сегментов данных, присутствующих на вашем диске. Все различные сегменты данных имеют имена, которые относятся либо к разделу формы, либо к разделу конкретной темы.
  • Apache Kafka также позволяет нам реплицировать узлы данных, фиксируя внешний журнал для распределенной системы. Этот механизм позволяет нам восстанавливать данные, помимо простого чтения, всякий раз, когда нам это нужно.

Фото Маркуса Списке: Pexels

Что следует помнить при работе с журналами Apache Kafka?

  • Помните, что следует избегать регистрации избыточных данных или данных, которые должны использоваться только в рабочих целях.
  • Обычно вы можете создавать записи на основе журнала в самом начале и в конце запуска или завершения работы модуля, но вы также можете создавать специальные журналы. Специальные журналы можно использовать в начале и в конце определенного этапа для выполнения обновления.

Как включить журналы в Apache Kafka?

Вот конфигурация журнала, которую вам нужно ввести в свой скрипт, чтобы запустить журналы в apache kafka:

# Включить ведение журнала как в файле, так и на основе kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

После того, как вы это сделаете, вы сможете найти журналы в формате записи объектов javascript или json.

Различные операции и команды, связанные с журналами Apache Kafka

В Apache Kafka вы можете выполнять столько команд, сколько хотите для реализации различных операций. Регистрация будет происходить в фоновом режиме.

  1. Регистрация сегментов — используйте этот код

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Создать новый экземпляр журнала

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Чтение записей

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Добавление записей

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Очистить сегменты и построить карту смещения

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Удаление сегментов

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Создать новый файл журнала

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Откройте новый сегмент журнала-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

суффикс: Строка = ""): Файл

  1. Закрыть журнал

close(): Unit

  1. Восстановление и перестроение сегментов

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Добавить или преобразовать сегменты:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Загрузить раздел

parseTopicPartitionName(dir: File): TopicPartition

  1. Изменить файл конфигурации

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Усечение операции:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit