Registros de Apache Kafka: ¿una guía completa para la estructura y administración de datos de registro?
Publicado: 2021-12-24Los datos no son los mismos que solían ser hace una década. Procesar datos en información utilizable es mucho más difícil de lo imaginable. Apache Kafka, creado por Linkedin pero ahora cambiado a desarrollo de código abierto, es una excelente herramienta gratuita para administrar datos y hacer un buen uso de ellos. En el mundo digital actual, los datos son muy importantes, son los datos que impulsan nuestra percepción de la realidad.
Apache Kafka fue creado precisamente para resolver este complicado problema. Es un software de transmisión de datos en tiempo real (menos de 10 ms) que permite a los usuarios almacenar, analizar y leer datos recibidos de una sola fuente (productor) o múltiples. Esencialmente, ayuda a distribuir los datos a canales significativos en el marco de tiempo más rápido posible. Por ejemplo, en un partido de cricket hay una fuente (productor) que verifica la puntuación en tiempo real y pasa esta información a los canales. Los canales son como intermediarios que luego suministrarán la información a los consumidores finales de la manera más eficiente. Apache Kafka es ese medio donde suceden todas estas cosas.
- Apache Kafka se puede escalar muy fácilmente sin ningún tiempo de inactividad significativo a nivel del sistema.
- Apache Kafka es un sistema tolerante a fallas, ya que utiliza múltiples intermediarios para transmitir datos, lo que significa que si un intermediario se desconecta, siempre hay un intermediario replicado que tiene los mismos datos almacenados.
- Las herramientas de seguridad como Kerberos se pueden usar al crear aplicaciones basadas en Kafka.
Para su comodidad, puede obtener directamente aprender Apache kafka aquí
¿Qué son los registros de Apache Kafka?
- Los registros de Apache Kafka son una colección de varios segmentos de datos presentes en su disco. Todos los diversos segmentos de datos tienen nombres que son partición de tema de formulario o de tema específico.
- Apache Kafka también nos permite replicar nodos de datos al enviar un registro externo para un sistema distribuido. Este mecanismo nos permite restaurar datos además de leerlos, siempre que lo necesitemos.

Foto de Markus Spiske de Pexels
¿Cosas para recordar mientras trabaja con Apache Kafka Logs?
- Recuerde evitar iniciar sesión en datos redundantes o datos que solo se utilizarán con fines operativos.
- Normalmente, puede optar por crear entradas basadas en registros al principio y al final de los inicios o apagados de un módulo, pero también puede crear registros especiales. Los Registros Especiales se pueden utilizar al inicio y al final de una fase específica para realizar una actualización.
¿Cómo habilitar los registros en Apache Kafka?
Aquí hay una configuración de registro que debe ingresar en su secuencia de comandos para iniciar registros en apache kafka-
# Habilite el registro basado en archivos y kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Una vez que haya hecho esto, debería poder encontrar los registros en formato de notación de objetos javascript o json.
Varias operaciones y comandos asociados con los registros de Apache Kafka
En Apache Kafka, puede ejecutar tantos comandos como desee para implementar varias operaciones. El registro se realizará en segundo plano.
- Registro de segmentos: use este código
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Crear una nueva instancia de registro
apply(
dir:

File,config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Registros de lectura
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Agregar registros
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Limpiar segmentos y construir un mapa de compensación
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Eliminación de segmentos
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Crear un nuevo archivo de registro
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Abrir un nuevo segmento de registro-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
sufijo: Cadena = “”): Archivo
- cerrar un registro
close(): Unit
- Recuperar y reconstruir segmentos
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Agregar o convertir segmentos:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Cargar una partición
parseTopicPartitionName(dir: File): TopicPartition
- Modificar el archivo de configuración
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Truncar una operación:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit