Apache Kafka Günlükleri: Günlük Veri Yapısı ve Yönetimi için Kapsamlı Bir Kılavuz?

Yayınlanan: 2021-12-24

Veriler, on yıl önce olduğu gibi aynı değil. Verileri kullanılabilir bilgilere dönüştürmek, hayal edilebilecekten çok daha zordur. Linkedin tarafından oluşturulan ancak şimdi açık kaynak geliştirme olarak değiştirilen Apache Kafka, verileri yönetmek ve onlardan iyi yararlanmak için mükemmel bir ücretsiz araçtır. Günümüz dijital dünyasında veriler çok önemlidir, gerçeklik algımızı yönlendiren verilerdir.

Apache Kafka tam olarak bu karmaşık sorunu çözmek için yapıldı. Kullanıcıların tek bir kaynaktan (üretici) veya birden çok kaynaktan alınan verileri depolamasına, analiz etmesine ve okumasına olanak tanıyan gerçek zamanlı (10 ms'den az) bir veri akışı yazılımıdır. Temel olarak, verilerin mümkün olan en hızlı zaman diliminde anlamlı kanallara dağıtılmasına yardımcı olur. Örneğin- Bir kriket maçında gerçek zamanlı skoru kontrol eden ve bu bilgiyi kanallara ileten bir kaynak(yapımcı) vardır. Kanallar, daha sonra bilgiyi son tüketicilere en verimli şekilde sağlayacak olan aracılar gibidir. Apache Kafka, tüm bunların gerçekleştiği ortamdır.

  • Apache Kafka, sistem düzeyinde önemli bir kesinti olmaksızın kolayca ölçeklenebilir.
  • Apache Kafka, veri iletmek için birden çok aracı kullandığından hataya dayanıklı bir sistemdir, yani bir aracı çevrimdışı olursa, her zaman aynı verilere sahip olan çoğaltılmış bir aracı vardır.
  • Kafka'ya dayalı uygulamalar oluştururken Kerberos gibi güvenlik araçları kullanılabilir.

Size kolaylık sağlamak için, doğrudan Apache kafka'yı buradan alabilirsiniz.

Apache Kafka Günlükleri Nedir?

  • Apache Kafka Günlükleri, diskinizde bulunan çeşitli veri bölümlerinden oluşan bir koleksiyondur. Çeşitli veri bölümlerinin tümü, biçim konusu veya belirli konu bölümü olan adlara sahiptir.
  • Apache Kafka ayrıca dağıtılmış bir sistem için harici bir günlük işleyerek veri düğümlerini çoğaltmamıza da olanak tanır. Bu mekanizma, ihtiyacımız olduğunda verileri sadece okumaktan ayrı olarak geri yüklememizi sağlar.

Markus Spiske'nin Pexels'daki fotoğrafı

Apache Kafka Günlükleri ile çalışırken Hatırlanması Gerekenler?

  • Yalnızca operasyonel amaçlar için kullanılacak fazla veri veya verilerde oturum açmaktan kaçınmayı unutmayın.
  • Normalde modül başlatma(lar)ının veya kapatma(lar)ının en başında ve sonunda günlük tabanlı girişler oluşturmayı seçebilirsiniz, ancak özel günlükler de oluşturabilirsiniz. Özel Günlükler, bir güncelleme gerçekleştirmek için belirli bir aşamanın başında ve sonunda kullanılabilir.

Apache Kafka'da Günlükler nasıl etkinleştirilir?

İşte apache kafka'da günlükleri başlatmak için betiğinize girmeniz gereken bir günlük yapılandırması.

# Hem dosya hem de kafka tabanlı günlük kaydını etkinleştir

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Bunu yaptıktan sonra, günlükleri javascript nesne gösterimi biçiminde veya json'da bulabilmeniz gerekir.

Apache Kafka Günlükleriyle İlişkili Çeşitli İşlemler ve Komutlar

Apache Kafka'da çeşitli işlemleri uygulamak için istediğiniz kadar komut çalıştırabilirsiniz. Günlük kaydı arka planda gerçekleşecektir.

  1. Günlük Segmentleri – Bu kodu kullanın

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. Yeni bir günlük örneği oluşturun

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. Kayıtları Okuma

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. Kayıtları Eklemek

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. Segmentleri temizleyin ve bir ofset haritası oluşturun

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. Segmentleri Silme

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. Yeni bir kayıt dosyası oluştur

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. Yeni bir günlük segmenti açın-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

sonek: Dize = “”): Dosya

  1. Bir günlüğü kapat

close(): Unit

  1. Segmentleri kurtarma ve yeniden oluşturma

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. Segmentleri ekleyin veya dönüştürün:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. Bir bölüm yükle

parseTopicPartitionName(dir: File): TopicPartition

  1. Yapılandırma dosyasını değiştirin

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. Bir işlemi kes:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit