Журналы Apache Kafka: подробное руководство по структуре данных журналов и управлению ими?
Опубликовано: 2021-12-24Данные уже не те, что были десять лет назад. Преобразование данных в полезную информацию намного сложнее, чем можно себе представить. Apache Kafka, созданный Linkedin, но теперь переведенный на разработку с открытым исходным кодом, является отличным бесплатным инструментом для управления данными и их эффективного использования. В современном цифровом мире очень важны данные, именно они определяют наше восприятие реальности.
Apache Kafka был создан именно для решения этой сложной проблемы. Это программное обеспечение для потоковой передачи данных в режиме реального времени (менее 10 мс), которое позволяет пользователям хранить, анализировать и считывать данные, полученные из одного источника (производителя) или нескольких. По сути, это помогает распространять данные по значимым каналам в кратчайшие сроки. Например, в матче по крикету есть источник (продюсер), который проверяет счет в реальном времени и передает эту информацию по каналам. Каналы подобны брокерам, которые затем наиболее эффективным образом предоставляют информацию конечным потребителям. Apache Kafka — это среда, в которой все это происходит.
- Apache Kafka можно очень легко масштабировать без значительного простоя на уровне системы.
- Apache Kafka — отказоустойчивая система, поскольку она использует несколько брокеров для передачи данных, а это означает, что если один брокер отключается, всегда есть реплицированный брокер, в котором хранятся те же данные.
- Инструменты безопасности, такие как Kerberos, можно использовать при создании приложений на основе Kafka.
Для вашего удобства вы можете напрямую изучить Apache kafka здесь
Что такое журналы Apache Kafka?
- Журналы Apache Kafka — это набор различных сегментов данных, присутствующих на вашем диске. Все различные сегменты данных имеют имена, которые относятся либо к разделу формы, либо к разделу конкретной темы.
- Apache Kafka также позволяет нам реплицировать узлы данных, фиксируя внешний журнал для распределенной системы. Этот механизм позволяет нам восстанавливать данные, помимо простого чтения, всякий раз, когда нам это нужно.

Фото Маркуса Списке: Pexels
Что следует помнить при работе с журналами Apache Kafka?
- Помните, что следует избегать регистрации избыточных данных или данных, которые должны использоваться только в рабочих целях.
- Обычно вы можете создавать записи на основе журнала в самом начале и в конце запуска или завершения работы модуля, но вы также можете создавать специальные журналы. Специальные журналы можно использовать в начале и в конце определенного этапа для выполнения обновления.
Как включить журналы в Apache Kafka?
Вот конфигурация журнала, которую вам нужно ввести в свой скрипт, чтобы запустить журналы в apache kafka:
# Включить ведение журнала как в файле, так и на основе kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
После того, как вы это сделаете, вы сможете найти журналы в формате записи объектов javascript или json.
Различные операции и команды, связанные с журналами Apache Kafka
В Apache Kafka вы можете выполнять столько команд, сколько хотите для реализации различных операций. Регистрация будет происходить в фоновом режиме.
- Регистрация сегментов — используйте этот код
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Создать новый экземпляр журнала
apply(
dir:
File,config: LogConfig,

logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Чтение записей
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Добавление записей
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Очистить сегменты и построить карту смещения
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Удаление сегментов
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Создать новый файл журнала
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Откройте новый сегмент журнала-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
суффикс: Строка = ""): Файл
- Закрыть журнал
close(): Unit
- Восстановление и перестроение сегментов
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Добавить или преобразовать сегменты:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Загрузить раздел
parseTopicPartitionName(dir: File): TopicPartition
- Изменить файл конфигурации
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Усечение операции:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit