Journaux Apache Kafka : un guide complet sur la structure et la gestion des données de journal ?

Publié: 2021-12-24

Les données ne sont plus les mêmes qu'il y a dix ans. Transformer des données en informations utilisables est beaucoup plus difficile qu'on ne l'imagine. Apache Kafka, créé par Linkedin mais maintenant passé au développement open source, est un excellent outil gratuit pour gérer les données et en faire bon usage. Dans le monde numérique d'aujourd'hui, les données sont très importantes, ce sont les données qui guident notre perception de la réalité.

Apache Kafka a été justement fait pour résoudre ce problème compliqué. Il s'agit d'un logiciel de diffusion de données en temps réel (moins de 10 ms) qui permet aux utilisateurs de stocker, d'analyser et de lire les données reçues d'une seule source (producteur) ou de plusieurs. Cela aide essentiellement à distribuer les données à des canaux significatifs dans les délais les plus rapides possibles. Par exemple, dans un match de cricket, il y a une source (producteur) qui vérifie le score en temps réel et transmet cette information aux chaînes. Les canaux sont comme des courtiers qui fourniront ensuite les informations aux consommateurs finaux de la manière la plus efficace. Apache Kafka est ce média où toutes ces choses se produisent.

  • Apache Kafka peut très facilement être mis à l'échelle sans aucun temps d'arrêt significatif au niveau du système.
  • Apache Kafka est un système tolérant aux pannes car il utilise plusieurs courtiers pour transmettre des données, ce qui signifie que si un courtier se déconnecte, il y a toujours un courtier répliqué qui a les mêmes données stockées.
  • Des outils de sécurité comme Kerberos peuvent être utilisés lors de la création d'applications basées sur Kafka.

Pour votre commodité, vous pouvez obtenir directement l'apprentissage Apache kafka ici

Que sont les journaux Apache Kafka ?

  • Les journaux Apache Kafka sont une collection de divers segments de données présents sur votre disque. Tous les différents segments de données ont des noms qui sont soit une partition de sujet de formulaire, soit une partition de sujet spécifique.
  • Apache Kafka nous permet également de répliquer des nœuds de données en validant un journal externe pour un système distribué. Ce mécanisme nous permet de restaurer des données en dehors de la simple lecture, chaque fois que nous en avons besoin.

Photo de Markus Spiske provenant de Pexels

Choses à retenir lorsque vous travaillez avec Apache Kafka Logs ?

  • N'oubliez pas d'éviter de vous connecter à des données redondantes ou à des données qui ne doivent être utilisées qu'à des fins opérationnelles.
  • Normalement, vous pouvez choisir de créer des entrées basées sur le journal au tout début et à la fin du ou des démarrages ou arrêts d'un module, mais vous pouvez également créer des journaux spéciaux. Des journaux spéciaux peuvent être utilisés au début et à la fin d'une phase spécifique pour effectuer une mise à jour.

Comment activer les journaux dans Apache Kafka ?

Voici une configuration de journal que vous devez entrer dans votre script pour démarrer les journaux dans apache kafka-

# Activer la journalisation basée sur les fichiers et kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Une fois que vous avez fait cela, vous devriez pouvoir trouver les journaux au format de notation d'objet javascript ou json.

Diverses opérations et commandes associées aux journaux Apache Kafka

Dans Apache Kafka, vous pouvez exécuter autant de commandes que vous le souhaitez pour implémenter diverses opérations. La journalisation aura lieu en arrière-plan.

  1. Segments de journalisation - Utilisez ce code

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Créer une nouvelle instance de journal

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Lire des enregistrements

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Ajout d'enregistrements

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Nettoyer les segments et créer une carte de décalage

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Suppression de segments

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Créer un nouveau fichier journal

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Ouvrir un nouveau segment de journal-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

suffixe : Chaîne = "") : Fichier

  1. Fermer un journal

close(): Unit

  1. Récupérer et reconstruire des segments

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Ajouter ou convertir des segments :

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Charger une partition

parseTopicPartitionName(dir: File): TopicPartition

  1. Modifier le fichier de configuration

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Tronquer une opération :

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit