سجلات أباتشي كافكا: دليل شامل لهيكل بيانات السجل وإدارتها؟

نشرت: 2021-12-24

البيانات ليست هي نفسها التي كانت عليها قبل عقد من الزمان. تعد معالجة البيانات إلى معلومات قابلة للاستخدام أصعب بكثير مما يمكن تخيله. يعد Apache Kafka ، الذي تم إنشاؤه بواسطة Linkedin ولكن تم تغييره الآن إلى تطوير مفتوح المصدر ، أداة مجانية ممتازة لإدارة البيانات والاستفادة منها بشكل جيد. تعتبر البيانات في عالم اليوم الرقمي مهمة للغاية ، فهي البيانات التي تقود تصورنا للواقع.

صُنع أباتشي كافكا بدقة لحل هذه المشكلة المعقدة. إنه برنامج دفق بيانات في الوقت الفعلي (أقل من 10 مللي ثانية) يسمح للمستخدمين بتخزين وتحليل وقراءة البيانات الواردة من مصدر واحد (منتج) أو متعدد. يساعد بشكل أساسي في توزيع البيانات على قنوات ذات مغزى في أسرع إطار زمني ممكن. على سبيل المثال- في مباراة كريكيت ، يوجد مصدر (منتج) يتحقق من النتيجة في الوقت الفعلي ويمرر هذه المعلومات إلى القنوات. القنوات مثل الوسطاء الذين سيقومون بعد ذلك بتوفير المعلومات للمستهلكين النهائيين بالطريقة الأكثر فعالية. أباتشي كافكا هي تلك الوسيلة التي تحدث فيها كل هذه الأشياء.

  • يمكن بسهولة تحجيم Apache Kafka دون أي تعطل كبير على مستوى النظام.
  • يعد Apache Kafka نظامًا متسامحًا مع الأخطاء نظرًا لأنه يستخدم وسطاء متعددين لنقل البيانات ، مما يعني أنه إذا توقف أحد الوسطاء عن الاتصال بالإنترنت ، فهناك دائمًا وسيط مكرر لديه نفس البيانات المخزنة.
  • يمكن استخدام أدوات الأمان مثل Kerberos أثناء إنشاء تطبيقات تعتمد على كافكا.

لراحتك ، يمكنك الحصول مباشرة على تعلم Apache kafka هنا

ما هي سجلات أباتشي كافكا؟

  • Apache Kafka Logs هي مجموعة من مقاطع البيانات المختلفة الموجودة على القرص الخاص بك. تحتوي جميع مقاطع البيانات المختلفة على أسماء إما قسم موضوع النموذج أو قسم موضوع محدد.
  • يسمح لنا Apache Kafka أيضًا بتكرار عقد البيانات عن طريق إنشاء سجل خارجي لنظام موزع. تسمح لنا هذه الآلية باستعادة البيانات بعيدًا عن مجرد قراءتها ، متى احتجنا إليها.

تصوير ماركوس سبيسكي من شركة Pexels

أشياء يجب تذكرها أثناء العمل مع Apache Kafka Logs؟

  • تذكر أن تتجنب تسجيل الدخول إلى البيانات أو البيانات الزائدة عن الحاجة والتي سيتم استخدامها فقط للأغراض التشغيلية.
  • يمكنك عادةً اختيار إنشاء إدخالات تستند إلى السجل في بداية ونهاية بدء تشغيل (عمليات) الوحدة النمطية أو إيقاف التشغيل (عمليات إيقاف التشغيل) ، ولكن يمكنك أيضًا إنشاء سجلات خاصة. يمكن استخدام السجلات الخاصة في بداية ونهاية مرحلة معينة لإجراء تحديث.

كيفية تمكين السجلات في Apache Kafka؟

فيما يلي تكوين السجل الذي تحتاج إلى إدخاله في البرنامج النصي الخاص بك لبدء السجلات في apache kafka-

# تمكين التسجيل المستند إلى ملف و kafka

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

بمجرد القيام بذلك ، يجب أن تكون قادرًا على العثور على السجلات بتنسيق تدوين كائن javascript أو json.

عمليات وأوامر مختلفة مرتبطة بسجلات أباتشي كافكا

في Apache Kafka ، يمكنك تنفيذ العديد من الأوامر كما تريد لتنفيذ عمليات مختلفة. سيتم التسجيل في الخلفية.

  1. قطع التسجيل - استخدم هذا الرمز

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. إنشاء مثيل سجل جديد

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. سجلات القراءة

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. إلحاق السجلات

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. شرائح نظيفة وبناء خريطة تعويض

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. حذف الشرائح

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. إنشاء ملف سجل جديد

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. افتح مقطع سجل جديد-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

اللاحقة: String = ""): ملف

  1. أغلق السجل

close(): Unit

  1. استعادة وإعادة بناء الأجزاء

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. إضافة شرائح أو تحويلها:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. قم بتحميل قسم

parseTopicPartitionName(dir: File): TopicPartition

  1. قم بتعديل ملف التكوين

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. اقتطاع عملية:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit