Registros de Apache Kafka: ¿una guía completa para la estructura y administración de datos de registro?

Publicado: 2021-12-24

Los datos no son los mismos que solían ser hace una década. Procesar datos en información utilizable es mucho más difícil de lo imaginable. Apache Kafka, creado por Linkedin pero ahora cambiado a desarrollo de código abierto, es una excelente herramienta gratuita para administrar datos y hacer un buen uso de ellos. En el mundo digital actual, los datos son muy importantes, son los datos que impulsan nuestra percepción de la realidad.

Apache Kafka fue creado precisamente para resolver este complicado problema. Es un software de transmisión de datos en tiempo real (menos de 10 ms) que permite a los usuarios almacenar, analizar y leer datos recibidos de una sola fuente (productor) o múltiples. Esencialmente, ayuda a distribuir los datos a canales significativos en el marco de tiempo más rápido posible. Por ejemplo, en un partido de cricket hay una fuente (productor) que verifica la puntuación en tiempo real y pasa esta información a los canales. Los canales son como intermediarios que luego suministrarán la información a los consumidores finales de la manera más eficiente. Apache Kafka es ese medio donde suceden todas estas cosas.

  • Apache Kafka se puede escalar muy fácilmente sin ningún tiempo de inactividad significativo a nivel del sistema.
  • Apache Kafka es un sistema tolerante a fallas, ya que utiliza múltiples intermediarios para transmitir datos, lo que significa que si un intermediario se desconecta, siempre hay un intermediario replicado que tiene los mismos datos almacenados.
  • Las herramientas de seguridad como Kerberos se pueden usar al crear aplicaciones basadas en Kafka.

Para su comodidad, puede obtener directamente aprender Apache kafka aquí

¿Qué son los registros de Apache Kafka?

  • Los registros de Apache Kafka son una colección de varios segmentos de datos presentes en su disco. Todos los diversos segmentos de datos tienen nombres que son partición de tema de formulario o de tema específico.
  • Apache Kafka también nos permite replicar nodos de datos al enviar un registro externo para un sistema distribuido. Este mecanismo nos permite restaurar datos además de leerlos, siempre que lo necesitemos.

Foto de Markus Spiske de Pexels

¿Cosas para recordar mientras trabaja con Apache Kafka Logs?

  • Recuerde evitar iniciar sesión en datos redundantes o datos que solo se utilizarán con fines operativos.
  • Normalmente, puede optar por crear entradas basadas en registros al principio y al final de los inicios o apagados de un módulo, pero también puede crear registros especiales. Los Registros Especiales se pueden utilizar al inicio y al final de una fase específica para realizar una actualización.

¿Cómo habilitar los registros en Apache Kafka?

Aquí hay una configuración de registro que debe ingresar en su secuencia de comandos para iniciar registros en apache kafka-

# Habilite el registro basado en archivos y kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Una vez que haya hecho esto, debería poder encontrar los registros en formato de notación de objetos javascript o json.

Varias operaciones y comandos asociados con los registros de Apache Kafka

En Apache Kafka, puede ejecutar tantos comandos como desee para implementar varias operaciones. El registro se realizará en segundo plano.

  1. Registro de segmentos: use este código

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Crear una nueva instancia de registro

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Registros de lectura

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Agregar registros

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Limpiar segmentos y construir un mapa de compensación

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Eliminación de segmentos

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Crear un nuevo archivo de registro

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Abrir un nuevo segmento de registro-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

sufijo: Cadena = “”): Archivo

  1. cerrar un registro

close(): Unit

  1. Recuperar y reconstruir segmentos

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Agregar o convertir segmentos:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Cargar una partición

parseTopicPartitionName(dir: File): TopicPartition

  1. Modificar el archivo de configuración

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Truncar una operación:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit