سجلات أباتشي كافكا: دليل شامل لهيكل بيانات السجل وإدارتها؟
نشرت: 2021-12-24البيانات ليست هي نفسها التي كانت عليها قبل عقد من الزمان. تعد معالجة البيانات إلى معلومات قابلة للاستخدام أصعب بكثير مما يمكن تخيله. يعد Apache Kafka ، الذي تم إنشاؤه بواسطة Linkedin ولكن تم تغييره الآن إلى تطوير مفتوح المصدر ، أداة مجانية ممتازة لإدارة البيانات والاستفادة منها بشكل جيد. تعتبر البيانات في عالم اليوم الرقمي مهمة للغاية ، فهي البيانات التي تقود تصورنا للواقع.
صُنع أباتشي كافكا بدقة لحل هذه المشكلة المعقدة. إنه برنامج دفق بيانات في الوقت الفعلي (أقل من 10 مللي ثانية) يسمح للمستخدمين بتخزين وتحليل وقراءة البيانات الواردة من مصدر واحد (منتج) أو متعدد. يساعد بشكل أساسي في توزيع البيانات على قنوات ذات مغزى في أسرع إطار زمني ممكن. على سبيل المثال- في مباراة كريكيت ، يوجد مصدر (منتج) يتحقق من النتيجة في الوقت الفعلي ويمرر هذه المعلومات إلى القنوات. القنوات مثل الوسطاء الذين سيقومون بعد ذلك بتوفير المعلومات للمستهلكين النهائيين بالطريقة الأكثر فعالية. أباتشي كافكا هي تلك الوسيلة التي تحدث فيها كل هذه الأشياء.
- يمكن بسهولة تحجيم Apache Kafka دون أي تعطل كبير على مستوى النظام.
- يعد Apache Kafka نظامًا متسامحًا مع الأخطاء نظرًا لأنه يستخدم وسطاء متعددين لنقل البيانات ، مما يعني أنه إذا توقف أحد الوسطاء عن الاتصال بالإنترنت ، فهناك دائمًا وسيط مكرر لديه نفس البيانات المخزنة.
- يمكن استخدام أدوات الأمان مثل Kerberos أثناء إنشاء تطبيقات تعتمد على كافكا.
لراحتك ، يمكنك الحصول مباشرة على تعلم Apache kafka هنا
ما هي سجلات أباتشي كافكا؟
- Apache Kafka Logs هي مجموعة من مقاطع البيانات المختلفة الموجودة على القرص الخاص بك. تحتوي جميع مقاطع البيانات المختلفة على أسماء إما قسم موضوع النموذج أو قسم موضوع محدد.
- يسمح لنا Apache Kafka أيضًا بتكرار عقد البيانات عن طريق إنشاء سجل خارجي لنظام موزع. تسمح لنا هذه الآلية باستعادة البيانات بعيدًا عن مجرد قراءتها ، متى احتجنا إليها.

تصوير ماركوس سبيسكي من شركة Pexels
أشياء يجب تذكرها أثناء العمل مع Apache Kafka Logs؟
- تذكر أن تتجنب تسجيل الدخول إلى البيانات أو البيانات الزائدة عن الحاجة والتي سيتم استخدامها فقط للأغراض التشغيلية.
- يمكنك عادةً اختيار إنشاء إدخالات تستند إلى السجل في بداية ونهاية بدء تشغيل (عمليات) الوحدة النمطية أو إيقاف التشغيل (عمليات إيقاف التشغيل) ، ولكن يمكنك أيضًا إنشاء سجلات خاصة. يمكن استخدام السجلات الخاصة في بداية ونهاية مرحلة معينة لإجراء تحديث.
كيفية تمكين السجلات في Apache Kafka؟
فيما يلي تكوين السجل الذي تحتاج إلى إدخاله في البرنامج النصي الخاص بك لبدء السجلات في apache kafka-
# تمكين التسجيل المستند إلى ملف و kafka
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
بمجرد القيام بذلك ، يجب أن تكون قادرًا على العثور على السجلات بتنسيق تدوين كائن javascript أو json.
عمليات وأوامر مختلفة مرتبطة بسجلات أباتشي كافكا
في Apache Kafka ، يمكنك تنفيذ العديد من الأوامر كما تريد لتنفيذ عمليات مختلفة. سيتم التسجيل في الخلفية.
- قطع التسجيل - استخدم هذا الرمز
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- إنشاء مثيل سجل جديد
apply(
dir:
File,config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- سجلات القراءة
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(
startOffset: Long,
maxLength: Int,

maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- إلحاق السجلات
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- شرائح نظيفة وبناء خريطة تعويض
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- حذف الشرائح
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- إنشاء ملف سجل جديد
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- افتح مقطع سجل جديد-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
اللاحقة: String = ""): ملف
- أغلق السجل
close(): Unit
- استعادة وإعادة بناء الأجزاء
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- إضافة شرائح أو تحويلها:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- قم بتحميل قسم
parseTopicPartitionName(dir: File): TopicPartition
- قم بتعديل ملف التكوين
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- اقتطاع عملية:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit