Apache Kafka Günlükleri: Günlük Veri Yapısı ve Yönetimi için Kapsamlı Bir Kılavuz?
Yayınlanan: 2021-12-24Veriler, on yıl önce olduğu gibi aynı değil. Verileri kullanılabilir bilgilere dönüştürmek, hayal edilebilecekten çok daha zordur. Linkedin tarafından oluşturulan ancak şimdi açık kaynak geliştirme olarak değiştirilen Apache Kafka, verileri yönetmek ve onlardan iyi yararlanmak için mükemmel bir ücretsiz araçtır. Günümüz dijital dünyasında veriler çok önemlidir, gerçeklik algımızı yönlendiren verilerdir.
Apache Kafka tam olarak bu karmaşık sorunu çözmek için yapıldı. Kullanıcıların tek bir kaynaktan (üretici) veya birden çok kaynaktan alınan verileri depolamasına, analiz etmesine ve okumasına olanak tanıyan gerçek zamanlı (10 ms'den az) bir veri akışı yazılımıdır. Temel olarak, verilerin mümkün olan en hızlı zaman diliminde anlamlı kanallara dağıtılmasına yardımcı olur. Örneğin- Bir kriket maçında gerçek zamanlı skoru kontrol eden ve bu bilgiyi kanallara ileten bir kaynak(yapımcı) vardır. Kanallar, daha sonra bilgiyi son tüketicilere en verimli şekilde sağlayacak olan aracılar gibidir. Apache Kafka, tüm bunların gerçekleştiği ortamdır.
- Apache Kafka, sistem düzeyinde önemli bir kesinti olmaksızın kolayca ölçeklenebilir.
- Apache Kafka, veri iletmek için birden çok aracı kullandığından hataya dayanıklı bir sistemdir, yani bir aracı çevrimdışı olursa, her zaman aynı verilere sahip olan çoğaltılmış bir aracı vardır.
- Kafka'ya dayalı uygulamalar oluştururken Kerberos gibi güvenlik araçları kullanılabilir.
Size kolaylık sağlamak için, doğrudan Apache kafka'yı buradan alabilirsiniz.
Apache Kafka Günlükleri Nedir?
- Apache Kafka Günlükleri, diskinizde bulunan çeşitli veri bölümlerinden oluşan bir koleksiyondur. Çeşitli veri bölümlerinin tümü, biçim konusu veya belirli konu bölümü olan adlara sahiptir.
- Apache Kafka ayrıca dağıtılmış bir sistem için harici bir günlük işleyerek veri düğümlerini çoğaltmamıza da olanak tanır. Bu mekanizma, ihtiyacımız olduğunda verileri sadece okumaktan ayrı olarak geri yüklememizi sağlar.

Markus Spiske'nin Pexels'daki fotoğrafı
Apache Kafka Günlükleri ile çalışırken Hatırlanması Gerekenler?
- Yalnızca operasyonel amaçlar için kullanılacak fazla veri veya verilerde oturum açmaktan kaçınmayı unutmayın.
- Normalde modül başlatma(lar)ının veya kapatma(lar)ının en başında ve sonunda günlük tabanlı girişler oluşturmayı seçebilirsiniz, ancak özel günlükler de oluşturabilirsiniz. Özel Günlükler, bir güncelleme gerçekleştirmek için belirli bir aşamanın başında ve sonunda kullanılabilir.
Apache Kafka'da Günlükler nasıl etkinleştirilir?
İşte apache kafka'da günlükleri başlatmak için betiğinize girmeniz gereken bir günlük yapılandırması.
# Hem dosya hem de kafka tabanlı günlük kaydını etkinleştir
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
Bunu yaptıktan sonra, günlükleri javascript nesne gösterimi biçiminde veya json'da bulabilmeniz gerekir.
Apache Kafka Günlükleriyle İlişkili Çeşitli İşlemler ve Komutlar
Apache Kafka'da çeşitli işlemleri uygulamak için istediğiniz kadar komut çalıştırabilirsiniz. Günlük kaydı arka planda gerçekleşecektir.
- Günlük Segmentleri – Bu kodu kullanın
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- Yeni bir günlük örneği oluşturun
apply(
dir:
File,config: LogConfig,
logStartOffset: Long,

recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- Kayıtları Okuma
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- Kayıtları Eklemek
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- Segmentleri temizleyin ve bir ofset haritası oluşturun
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- Segmentleri Silme
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- Yeni bir kayıt dosyası oluştur
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- Yeni bir günlük segmenti açın-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
sonek: Dize = “”): Dosya
- Bir günlüğü kapat
close(): Unit
- Segmentleri kurtarma ve yeniden oluşturma
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- Segmentleri ekleyin veya dönüştürün:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- Bir bölüm yükle
parseTopicPartitionName(dir: File): TopicPartition
- Yapılandırma dosyasını değiştirin
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- Bir işlemi kes:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit