Apache Kafka Logs: 로그 데이터 구조 및 관리에 대한 종합 가이드?
게시 됨: 2021-12-24데이터는 10년 전과 같지 않습니다. 데이터를 사용 가능한 정보로 처리하는 것은 상상할 수 있는 것보다 훨씬 어렵습니다. Linkedin에서 만들었지만 이제 오픈 소스 개발로 변경된 Apache Kafka는 데이터를 관리하고 잘 활용하기 위한 훌륭한 무료 도구입니다. 오늘날의 디지털 세계에서 데이터는 매우 중요하며 현실에 대한 우리의 인식을 이끄는 것은 데이터입니다.
Apache Kafka는 이 복잡한 문제를 해결하기 위해 정확하게 만들어졌습니다. 단일 소스(프로듀서) 또는 다중 소스로부터 수신된 데이터를 사용자가 저장, 분석 및 읽을 수 있는 실시간(10ms 미만) 데이터 스트리밍 소프트웨어입니다. 이는 본질적으로 가장 빠른 시간 프레임에 의미 있는 채널에 데이터를 배포하는 데 도움이 됩니다. 예를 들어 - 크리켓 경기에는 실시간 점수를 확인하고 이 정보를 채널에 전달하는 소스(프로듀서)가 있습니다. 채널은 가장 효율적인 방법으로 최종 소비자에게 정보를 제공하는 브로커와 같습니다. Apache Kafka는 이러한 모든 일이 발생하는 매체입니다.
- Apache Kafka는 시스템 수준 다운타임 없이 매우 쉽게 확장할 수 있습니다.
- Apache Kafka는 여러 브로커를 사용하여 데이터를 전송하기 때문에 내결함성 시스템입니다. 즉, 한 브로커가 오프라인이 되면 동일한 데이터가 저장된 복제된 브로커가 항상 존재합니다.
- Kafka를 기반으로 애플리케이션을 구축하는 동안 Kerberos와 같은 보안 도구를 사용할 수 있습니다.
편의를 위해 여기에서 Apache kafka 학습을 직접 얻을 수 있습니다.
Apache Kafka 로그란 무엇입니까?
- Apache Kafka 로그는 디스크에 있는 다양한 데이터 세그먼트의 모음입니다. 모든 다양한 데이터 세그먼트에는 형식 주제 또는 특정 주제 파티션 이름이 있습니다.
- Apache Kafka를 사용하면 분산 시스템에 대한 외부 로그를 커밋하여 데이터 노드를 복제할 수도 있습니다. 이 메커니즘을 사용하면 필요할 때마다 데이터를 읽는 것 외에도 데이터를 복원할 수 있습니다.

Pexels의 Markus Spiske 사진
Apache Kafka Logs로 작업하는 동안 기억해야 할 사항은 무엇입니까?
- 중복 데이터 또는 운영 목적으로만 사용되는 데이터에 로그인하지 않도록 하십시오.
- 일반적으로 모듈 시작 또는 종료의 맨 처음과 끝에서 로그 기반 항목을 생성하도록 선택할 수 있지만 특수 로그를 생성할 수도 있습니다. 업데이트 수행을 위한 특정 단계의 시작과 끝에서 특수 로그를 사용할 수 있습니다.
Apache Kafka에서 로그를 활성화하는 방법은 무엇입니까?
다음은 Apache kafka-에서 로그를 시작하기 위해 스크립트에 입력해야 하는 로그 구성입니다.
# 파일 및 kafka 기반 로깅 모두 활성화
log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}
이 작업을 완료하면 javascript 객체 표기 형식 또는 json에서 로그를 찾을 수 있습니다.
Apache Kafka Logs와 관련된 다양한 작업 및 명령
Apache Kafka에서는 다양한 작업을 구현하려는 만큼 명령을 실행할 수 있습니다. 로깅은 백그라운드에서 수행됩니다.
- 로깅 세그먼트 – 이 코드 사용
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
- 새 로그 인스턴스 만들기
apply(
dir:
File,config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
- 기록 읽기
addAbortedTransactions
(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
read(

startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
- 기록 추가
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
- 세그먼트 정리 및 오프셋 맵 작성
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
- 세그먼트 삭제
roll(
expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit
- 새 로그 파일 생성
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
- 새 로그 세그먼트 열기-
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile(
dir: File,
offset: Long,
접미사: 문자열 = ""): 파일
- 로그 닫기
close(): Unit
- 세그먼트 복구 및 재구축
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
- 세그먼트 추가 또는 변환:
addSegment(
segment: LogSegment): LogSegment
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
- 파티션 로드
parseTopicPartitionName(dir: File): TopicPartition
- 구성 파일 수정
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
- 작업 자르기:
truncateTo(targetOffset: Long): Boolean
truncateFullyAndStartAt(newOffset: Long): Unit