Journaux Apache Kafka : un guide complet sur la structure et la gestion des données de journal ?
Publié: 2021-12-24Les données ne sont plus les mêmes qu'il y a dix ans. Transformer des données en informations utilisables est beaucoup plus difficile qu'on ne l'imagine. Apache Kafka, créé par Linkedin mais maintenant passé au développement open source, est un excellent outil gratuit pour gérer les données et en faire bon usage. Dans le monde numérique d'aujourd'hui, les données sont très importantes, ce sont les données qui guident notre perception de la réalité.
Apache Kafka a été justement fait pour résoudre ce problème compliqué. Il s'agit d'un logiciel de diffusion de données en temps réel (moins de 10 ms) qui permet aux utilisateurs de stocker, d'analyser et de lire les données reçues d'une seule source (producteur) ou de plusieurs. Cela aide essentiellement à distribuer les données à des canaux significatifs dans les délais les plus rapides possibles. Par exemple, dans un match de cricket, il y a une source (producteur) qui vérifie le score en temps réel et transmet cette information aux chaînes. Les canaux sont comme des courtiers qui fourniront ensuite les informations aux consommateurs finaux de la manière la plus efficace. Apache Kafka est ce média où toutes ces choses se produisent.
- Apache Kafka peut très facilement être mis à l'échelle sans aucun temps d'arrêt significatif au niveau du système.
- Apache Kafka est un système tolérant aux pannes car il utilise plusieurs courtiers pour transmettre des données, ce qui signifie que si un courtier se déconnecte, il y a toujours un courtier répliqué qui a les mêmes données stockées.
- Des outils de sécurité comme Kerberos peuvent être utilisés lors de la création d'applications basées sur Kafka.
Pour votre commodité, vous pouvez obtenir directement l'apprentissage Apache kafka ici
Que sont les journaux Apache Kafka ?
- Les journaux Apache Kafka sont une collection de divers segments de données présents sur votre disque. Tous les différents segments de données ont des noms qui sont soit une partition de sujet de formulaire, soit une partition de sujet spécifique.
- Apache Kafka nous permet également de répliquer des nœuds de données en validant un journal externe pour un système distribué. Ce mécanisme nous permet de restaurer des données en dehors de la simple lecture, chaque fois que nous en avons besoin.

Photo de Markus Spiske provenant de Pexels
Choses à retenir lorsque vous travaillez avec Apache Kafka Logs ?
- N'oubliez pas d'éviter de vous connecter à des données redondantes ou à des données qui ne doivent être utilisées qu'à des fins opérationnelles.
- Normalement, vous pouvez choisir de créer des entrées basées sur le journal au tout début et à la fin du ou des démarrages ou arrêts d'un module, mais vous pouvez également créer des journaux spéciaux. Des journaux spéciaux peuvent être utilisés au début et à la fin d'une phase spécifique pour effectuer une mise à jour.
Comment activer les journaux dans Apache Kafka ?
Voici une configuration de journal que vous devez entrer dans votre script pour démarrer les journaux dans apache kafka-
# Activer la journalisation basée sur les fichiers et kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Une fois que vous avez fait cela, vous devriez pouvoir trouver les journaux au format de notation d'objet javascript ou json.
Diverses opérations et commandes associées aux journaux Apache Kafka
Dans Apache Kafka, vous pouvez exécuter autant de commandes que vous le souhaitez pour implémenter diverses opérations. La journalisation aura lieu en arrière-plan.
- Segments de journalisation - Utilisez ce code
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

- Créer une nouvelle instance de journal
apply(
dir:
File,config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Lire des enregistrements
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Ajout d'enregistrements
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Nettoyer les segments et créer une carte de décalage
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Suppression de segments
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Créer un nouveau fichier journal
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Ouvrir un nouveau segment de journal-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
suffixe : Chaîne = "") : Fichier
- Fermer un journal
close(): Unit
- Récupérer et reconstruire des segments
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Ajouter ou convertir des segments :
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Charger une partition
parseTopicPartitionName(dir: File): TopicPartition
- Modifier le fichier de configuration
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Tronquer une opération :
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit