Apache Kafka Logs: คู่มือที่ครอบคลุมเกี่ยวกับโครงสร้างและการจัดการข้อมูลบันทึก?

เผยแพร่แล้ว: 2021-12-24

ข้อมูลไม่เหมือนกับที่เคยเป็นเมื่อสิบปีก่อน การประมวลผลข้อมูลให้เป็นข้อมูลที่ใช้งานได้นั้นยากกว่าที่จะจินตนาการได้มาก Apache Kafka สร้างขึ้นโดย Linkedin แต่ตอนนี้เปลี่ยนเป็นการพัฒนาโอเพ่นซอร์ส เป็นเครื่องมือฟรีที่ยอดเยี่ยมสำหรับการจัดการข้อมูลและใช้ประโยชน์จากข้อมูลเหล่านี้ ในทุกวันนี้ข้อมูลโลกดิจิทัลมีความสำคัญมาก เป็นข้อมูลที่ขับเคลื่อนการรับรู้ถึงความเป็นจริงของเรา

Apache Kafka ถูกสร้างขึ้นมาอย่างแม่นยำเพื่อแก้ปัญหาที่ซับซ้อนนี้ เป็นซอฟต์แวร์สตรีมข้อมูลแบบเรียลไทม์ (น้อยกว่า 10 มิลลิวินาที) ที่ช่วยให้ผู้ใช้สามารถจัดเก็บ วิเคราะห์ และอ่านข้อมูลที่ได้รับจากแหล่งเดียว (ผู้ผลิต) หรือหลายแหล่ง โดยพื้นฐานแล้วจะช่วยในการกระจายข้อมูลไปยังช่องทางที่มีความหมายในกรอบเวลาที่เร็วที่สุด ตัวอย่างเช่น- ในการแข่งขันคริกเก็ตจะมีแหล่ง (ผู้ผลิต) ที่ตรวจสอบคะแนนแบบเรียลไทม์และส่งข้อมูลนี้ไปยังช่องต่างๆ ช่องทางต่างๆ เปรียบเสมือนนายหน้าที่จะให้ข้อมูลแก่ผู้บริโภคปลายทางอย่างมีประสิทธิภาพสูงสุด Apache Kafka เป็นสื่อกลางที่สิ่งเหล่านี้เกิดขึ้น

  • Apache Kafka สามารถปรับขนาดได้ง่ายมากโดยไม่ต้องหยุดทำงานในระดับระบบที่สำคัญ
  • Apache Kafka เป็นระบบที่ทนทานต่อข้อผิดพลาด เนื่องจากใช้โบรกเกอร์หลายตัวในการส่งข้อมูล หมายความว่าหากนายหน้ารายหนึ่งออฟไลน์ จะมีนายหน้าจำลองที่มีข้อมูลเดียวกันอยู่เสมอ
  • เครื่องมือรักษาความปลอดภัย เช่น Kerberos สามารถใช้ในขณะที่สร้างแอปพลิเคชันตาม Kafka

เพื่อความสะดวก คุณสามารถรับการเรียนรู้ Apache kafka ได้โดยตรงที่นี่

บันทึก Apache Kafka คืออะไร?

  • Apache Kafka Logs คือชุดของส่วนข้อมูลต่างๆ ที่มีอยู่ในดิสก์ของคุณ เซ็กเมนต์ข้อมูลต่างๆ ทั้งหมดมีชื่อที่เป็นพาร์ติชันของหัวข้อที่มีรูปแบบหรือหัวข้อเฉพาะ
  • Apache Kafka ยังอนุญาตให้เราจำลองโหนดข้อมูลโดยส่งบันทึกภายนอกสำหรับระบบแบบกระจาย กลไกนี้ช่วยให้เรากู้คืนข้อมูลได้ ยกเว้นแค่อ่านข้อมูลเมื่อไรก็ตามที่เราต้องการ

ภาพถ่ายโดย Markus Spiske จาก Pexels

สิ่งที่ต้องจำขณะทำงานกับ Apache Kafka Logs?

  • อย่าลืมหลีกเลี่ยงการเข้าสู่ระบบข้อมูลซ้ำซ้อนหรือข้อมูลที่ใช้เพื่อวัตถุประสงค์ในการดำเนินงานเท่านั้น
  • โดยปกติ คุณสามารถเลือกสร้างรายการตามบันทึกที่จุดเริ่มต้นและจุดสิ้นสุดของการเริ่มต้นหรือปิดโมดูล แต่คุณสามารถสร้างบันทึกพิเศษได้เช่นกัน คุณสามารถใช้บันทึกพิเศษในตอนเริ่มต้นและตอนท้ายของเฟสเฉพาะเพื่อดำเนินการอัปเดต

วิธีเปิดใช้งานบันทึกใน Apache Kafka

นี่คือการกำหนดค่าบันทึกที่คุณต้องป้อนลงในสคริปต์เพื่อเริ่มบันทึกใน apache kafka-

# เปิดใช้งานทั้งไฟล์และการบันทึกตาม kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

เมื่อคุณทำสิ่งนี้เสร็จแล้ว คุณควรจะสามารถค้นหาบันทึกในรูปแบบสัญลักษณ์อ็อบเจกต์จาวาสคริปต์หรือ json

การทำงานและคำสั่งต่างๆ ที่เกี่ยวข้องกับ Apache Kafka Logs

ใน Apache Kafka คุณสามารถรันคำสั่งต่างๆ ได้มากเท่าที่คุณต้องการเพื่อใช้งานการดำเนินการต่างๆ การบันทึกจะเกิดขึ้นในพื้นหลัง

  1. ส่วนการบันทึก – ใช้รหัสนี้

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. สร้างอินสแตนซ์บันทึกใหม่

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. การอ่านบันทึก

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. ต่อท้ายบันทึก

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. ทำความสะอาดเซกเมนต์และสร้างออฟเซ็ตแมป

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. การลบกลุ่ม

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. สร้างไฟล์บันทึกใหม่

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. เปิดส่วนบันทึกใหม่-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

คำต่อท้าย: String = “”): File

  1. ปิดบันทึก

close(): Unit

  1. กู้คืนและสร้างกลุ่มใหม่

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. เพิ่มหรือแปลงกลุ่ม:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. โหลดพาร์ติชั่น

parseTopicPartitionName(dir: File): TopicPartition

  1. แก้ไขไฟล์การกำหนดค่า

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. ตัดทอนการดำเนินการ:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit