Apache Kafka Logs: คู่มือที่ครอบคลุมเกี่ยวกับโครงสร้างและการจัดการข้อมูลบันทึก?
เผยแพร่แล้ว: 2021-12-24ข้อมูลไม่เหมือนกับที่เคยเป็นเมื่อสิบปีก่อน การประมวลผลข้อมูลให้เป็นข้อมูลที่ใช้งานได้นั้นยากกว่าที่จะจินตนาการได้มาก Apache Kafka สร้างขึ้นโดย Linkedin แต่ตอนนี้เปลี่ยนเป็นการพัฒนาโอเพ่นซอร์ส เป็นเครื่องมือฟรีที่ยอดเยี่ยมสำหรับการจัดการข้อมูลและใช้ประโยชน์จากข้อมูลเหล่านี้ ในทุกวันนี้ข้อมูลโลกดิจิทัลมีความสำคัญมาก เป็นข้อมูลที่ขับเคลื่อนการรับรู้ถึงความเป็นจริงของเรา
Apache Kafka ถูกสร้างขึ้นมาอย่างแม่นยำเพื่อแก้ปัญหาที่ซับซ้อนนี้ เป็นซอฟต์แวร์สตรีมข้อมูลแบบเรียลไทม์ (น้อยกว่า 10 มิลลิวินาที) ที่ช่วยให้ผู้ใช้สามารถจัดเก็บ วิเคราะห์ และอ่านข้อมูลที่ได้รับจากแหล่งเดียว (ผู้ผลิต) หรือหลายแหล่ง โดยพื้นฐานแล้วจะช่วยในการกระจายข้อมูลไปยังช่องทางที่มีความหมายในกรอบเวลาที่เร็วที่สุด ตัวอย่างเช่น- ในการแข่งขันคริกเก็ตจะมีแหล่ง (ผู้ผลิต) ที่ตรวจสอบคะแนนแบบเรียลไทม์และส่งข้อมูลนี้ไปยังช่องต่างๆ ช่องทางต่างๆ เปรียบเสมือนนายหน้าที่จะให้ข้อมูลแก่ผู้บริโภคปลายทางอย่างมีประสิทธิภาพสูงสุด Apache Kafka เป็นสื่อกลางที่สิ่งเหล่านี้เกิดขึ้น
- Apache Kafka สามารถปรับขนาดได้ง่ายมากโดยไม่ต้องหยุดทำงานในระดับระบบที่สำคัญ
- Apache Kafka เป็นระบบที่ทนทานต่อข้อผิดพลาด เนื่องจากใช้โบรกเกอร์หลายตัวในการส่งข้อมูล หมายความว่าหากนายหน้ารายหนึ่งออฟไลน์ จะมีนายหน้าจำลองที่มีข้อมูลเดียวกันอยู่เสมอ
- เครื่องมือรักษาความปลอดภัย เช่น Kerberos สามารถใช้ในขณะที่สร้างแอปพลิเคชันตาม Kafka
เพื่อความสะดวก คุณสามารถรับการเรียนรู้ Apache kafka ได้โดยตรงที่นี่
บันทึก Apache Kafka คืออะไร?
- Apache Kafka Logs คือชุดของส่วนข้อมูลต่างๆ ที่มีอยู่ในดิสก์ของคุณ เซ็กเมนต์ข้อมูลต่างๆ ทั้งหมดมีชื่อที่เป็นพาร์ติชันของหัวข้อที่มีรูปแบบหรือหัวข้อเฉพาะ
- Apache Kafka ยังอนุญาตให้เราจำลองโหนดข้อมูลโดยส่งบันทึกภายนอกสำหรับระบบแบบกระจาย กลไกนี้ช่วยให้เรากู้คืนข้อมูลได้ ยกเว้นแค่อ่านข้อมูลเมื่อไรก็ตามที่เราต้องการ

ภาพถ่ายโดย Markus Spiske จาก Pexels
สิ่งที่ต้องจำขณะทำงานกับ Apache Kafka Logs?
- อย่าลืมหลีกเลี่ยงการเข้าสู่ระบบข้อมูลซ้ำซ้อนหรือข้อมูลที่ใช้เพื่อวัตถุประสงค์ในการดำเนินงานเท่านั้น
- โดยปกติ คุณสามารถเลือกสร้างรายการตามบันทึกที่จุดเริ่มต้นและจุดสิ้นสุดของการเริ่มต้นหรือปิดโมดูล แต่คุณสามารถสร้างบันทึกพิเศษได้เช่นกัน คุณสามารถใช้บันทึกพิเศษในตอนเริ่มต้นและตอนท้ายของเฟสเฉพาะเพื่อดำเนินการอัปเดต
วิธีเปิดใช้งานบันทึกใน Apache Kafka
นี่คือการกำหนดค่าบันทึกที่คุณต้องป้อนลงในสคริปต์เพื่อเริ่มบันทึกใน apache kafka-
# เปิดใช้งานทั้งไฟล์และการบันทึกตาม kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
เมื่อคุณทำสิ่งนี้เสร็จแล้ว คุณควรจะสามารถค้นหาบันทึกในรูปแบบสัญลักษณ์อ็อบเจกต์จาวาสคริปต์หรือ json
การทำงานและคำสั่งต่างๆ ที่เกี่ยวข้องกับ Apache Kafka Logs
ใน Apache Kafka คุณสามารถรันคำสั่งต่างๆ ได้มากเท่าที่คุณต้องการเพื่อใช้งานการดำเนินการต่างๆ การบันทึกจะเกิดขึ้นในพื้นหลัง
- ส่วนการบันทึก – ใช้รหัสนี้
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- สร้างอินสแตนซ์บันทึกใหม่
apply(
dir:
File,config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- การอ่านบันทึก
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- ต่อท้ายบันทึก
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- ทำความสะอาดเซกเมนต์และสร้างออฟเซ็ตแมป
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- การลบกลุ่ม
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- สร้างไฟล์บันทึกใหม่
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- เปิดส่วนบันทึกใหม่-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
คำต่อท้าย: String = “”): File
- ปิดบันทึก
close(): Unit
- กู้คืนและสร้างกลุ่มใหม่
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- เพิ่มหรือแปลงกลุ่ม:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- โหลดพาร์ติชั่น
parseTopicPartitionName(dir: File): TopicPartition
- แก้ไขไฟล์การกำหนดค่า
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- ตัดทอนการดำเนินการ:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit