Apache Kafka Logs: 로그 데이터 구조 및 관리에 대한 종합 가이드?

게시 됨: 2021-12-24

데이터는 10년 전과 같지 않습니다. 데이터를 사용 가능한 정보로 처리하는 것은 상상할 수 있는 것보다 훨씬 어렵습니다. Linkedin에서 만들었지만 이제 오픈 소스 개발로 변경된 Apache Kafka는 데이터를 관리하고 잘 활용하기 위한 훌륭한 무료 도구입니다. 오늘날의 디지털 세계에서 데이터는 매우 중요하며 현실에 대한 우리의 인식을 이끄는 것은 데이터입니다.

Apache Kafka는 이 복잡한 문제를 해결하기 위해 정확하게 만들어졌습니다. 단일 소스(프로듀서) 또는 다중 소스로부터 수신된 데이터를 사용자가 저장, 분석 및 읽을 수 있는 실시간(10ms 미만) 데이터 스트리밍 소프트웨어입니다. 이는 본질적으로 가장 빠른 시간 프레임에 의미 있는 채널에 데이터를 배포하는 데 도움이 됩니다. 예를 들어 - 크리켓 경기에는 실시간 점수를 확인하고 이 정보를 채널에 전달하는 소스(프로듀서)가 있습니다. 채널은 가장 효율적인 방법으로 최종 소비자에게 정보를 제공하는 브로커와 같습니다. Apache Kafka는 이러한 모든 일이 발생하는 매체입니다.

  • Apache Kafka는 시스템 수준 다운타임 없이 매우 쉽게 확장할 수 있습니다.
  • Apache Kafka는 여러 브로커를 사용하여 데이터를 전송하기 때문에 내결함성 시스템입니다. 즉, 한 브로커가 오프라인이 되면 동일한 데이터가 저장된 복제된 브로커가 항상 존재합니다.
  • Kafka를 기반으로 애플리케이션을 구축하는 동안 Kerberos와 같은 보안 도구를 사용할 수 있습니다.

편의를 위해 여기에서 Apache kafka 학습을 직접 얻을 수 있습니다.

Apache Kafka 로그란 무엇입니까?

  • Apache Kafka 로그는 디스크에 있는 다양한 데이터 세그먼트의 모음입니다. 모든 다양한 데이터 세그먼트에는 형식 주제 또는 특정 주제 파티션 이름이 있습니다.
  • Apache Kafka를 사용하면 분산 시스템에 대한 외부 로그를 커밋하여 데이터 노드를 복제할 수도 있습니다. 이 메커니즘을 사용하면 필요할 때마다 데이터를 읽는 것 외에도 데이터를 복원할 수 있습니다.

Pexels의 Markus Spiske 사진

Apache Kafka Logs로 작업하는 동안 기억해야 할 사항은 무엇입니까?

  • 중복 데이터 또는 운영 목적으로만 사용되는 데이터에 로그인하지 않도록 하십시오.
  • 일반적으로 모듈 시작 또는 종료의 맨 처음과 끝에서 로그 기반 항목을 생성하도록 선택할 수 있지만 특수 로그를 생성할 수도 있습니다. 업데이트 수행을 위한 특정 단계의 시작과 끝에서 특수 로그를 사용할 수 있습니다.

Apache Kafka에서 로그를 활성화하는 방법은 무엇입니까?

다음은 Apache kafka-에서 로그를 시작하기 위해 스크립트에 입력해야 하는 로그 구성입니다.

# 파일 및 kafka 기반 로깅 모두 활성화

log4j.rootLogger=INFO, file, kafka log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender

log4j.appender.kafka.topic=flink.logs

log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

이 작업을 완료하면 javascript 객체 표기 형식 또는 json에서 로그를 찾을 수 있습니다.

Apache Kafka Logs와 관련된 다양한 작업 및 명령

Apache Kafka에서는 다양한 작업을 구현하려는 만큼 명령을 실행할 수 있습니다. 로깅은 백그라운드에서 수행됩니다.

  1. 로깅 세그먼트 – 이 코드 사용

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]

  1. 새 로그 인스턴스 만들기

apply(

dir:

File,config: LogConfig,

logStartOffset: Long,

recoveryPoint: Long,

scheduler: Scheduler,

brokerTopicStats: BrokerTopicStats,

time: Time = Time.SYSTEM,

maxProducerIdExpirationMs: Int,

producerIdExpirationCheckIntervalMs: Int,

logDirFailureChannel: LogDirFailureChannel): Log

  1. 기록 읽기

addAbortedTransactions (

startOffset: Long,

segmentEntry: JEntry[JLong, LogSegment],

fetchInfo: FetchDataInfo): FetchDataInfo

read(

startOffset: Long,

maxLength: Int,

maxOffset: Option[Long],

minOneMessage: Boolean,

includeAbortedTxns: Boolean): FetchDataInfo

  1. 기록 추가

maybeRoll(

messagesSize: Int,

appendInfo: LogAppendInfo): LogSegment

append(

records: MemoryRecords,

isFromClient: Boolean,

interBrokerProtocolVersion: ApiVersion,

assignOffsets: Boolean,

leaderEpoch: Int): LogAppendInfo

  1. 세그먼트 정리 및 오프셋 맵 작성

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(

startOffset: Long,

upperBoundOffset: Long,

startingSegmentEntry: JEntry[JLong, LogSegment],

accumulator: List[AbortedTxn] => Unit): Unit

  1. 세그먼트 삭제

roll(

expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  1. 새 로그 파일 생성

logFile(

dir: File,

offset: Long,

suffix: String = ""): File

  1. 새 로그 세그먼트 열기-

offsetIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

timeIndexFile(

dir: File,

offset: Long,

suffix: String = ""): File

transactionIndexFile(

dir: File,

offset: Long,

접미사: 문자열 = ""): 파일

  1. 로그 닫기

close(): Unit

  1. 세그먼트 복구 및 재구축

recoverSegment(

segment: LogSegment,

leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(

lastOffset: Long,

reloadFromCleanShutdown: Boolean,

producerStateManager: ProducerStateManager): Unit

  1. 세그먼트 추가 또는 변환:

addSegment(

segment: LogSegment): LogSegment

convertToOffsetMetadata(

offset: Long): Option[LogOffsetMetadata]

  1. 파티션 로드

parseTopicPartitionName(dir: File): TopicPartition

  1. 구성 파일 수정

updateConfig(

updatedKeys: Set[String],

newConfig: LogConfig): Unit

  1. 작업 자르기:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit