본문 바로가기
BigData

How to handle aborted transactions by Apache Kafka Broker and Consumer / 카프카 브로커와 컨슈머의 취소된 트랜잭션 처리 방법에 대한 이해

by BestUgi 2020. 7. 10.

[편집 중]

[편집 중]

[편집 중]

 

Apache Kafka 0.11부터 Idempotent producerTransaction 기능이 소개되었다. Idempotent producer는 메시지의 중복과 누락 없이 정확히 한번만 카프카 서버에 저장하는 방식(Exactly-Once Semantic)이며 Transaction은 여러 개의 메시지를 여러 파티션에 걸쳐서 원자적(Atomicity)으로 저장할 수 있는 방법을 제공한다. 이 두 가지 기능에 대해서 알아보다 보니 Kafka에서 ‘Aborted Transaction’을 처리하는 방법을 다루는 글이 많지 않아서 이를 정리하고자 포스팅을 하게 되었다.

 

이 글에서는 아래와 같은 질문에 답을 구하기 위한 내용을 다루고자 한다.

  • 프로듀서가 KafkaProducer.abortTransaction()을 호출할 경우 Kafka Broker는 트랜잭션 데이터를 어떻게 처리하는가?
  • Aborted TransactionTopicLog에서 제거 되는가?
  • Aborted TransactionTopicLog에서 제거되지 않는다면, 컨슈머가 Topic의 데이터를 요청(Fetch)할 때 어떻게 동작 하는가?

 

이 포스팅은 EoS Abort Index Proposal을 기반으로 작성되었음을 알린다. 이 글을 이해하기 위해서는 카프카의 트랜잭션 동작 방식에 대한 이해가 필요하며 이를 위해서는 을 참고하길 바란다.

 

카프카 브로커의 ‘Transaction Abort’ 처리 방법

결론부터 얘기하자면, 프로듀서가 KafkaProducer.abortTransaction()을 호출할 경우 카프카 브로커는 현재 트랜잭션에 저장된 데이터를 지우지 않고 그대로 둔다. 다만 트랜잭션에 포함된 파티션에 해당 트랜잭션의 ‘Abort” 되었다는 제어 메시지(Control Message)를 추가로 저장(Append)한다. 카프카는 Append 방식으로 토픽의 로그를 운영(Read/Write)하여 순차적(Sequential) IO의 성능을 최대한 활용하는 시스템이기 때문이다. 그래서, 취소된(Aborted) 데이터를 삭제하는 방식은 지원하지 않는다.

 

프로듀서가 abortTransaction을 호출할 경우 카프카 브로커가 이를 어떻게 처리하는지는 아래의 그림에 설명되어 있다.

 

 

Transaction CoordinatorabortTransaction API를 호출하면 포로듀서는 EndTxnRequest(status=ABORT)Transaction Coordinator에게 전송한다(1).

 

Transaction CoordinatorAbort를 위한 EndTxnRequest를 수신하면, 트랜잭션 취소(ABORT)를 두 가지 단계(Two-Phase)로 수행한다. (2~4)

  1. Phase-1, Transaction LogPREPARE_ABORT 메시지 쓰기 (2)
    • 트랜잭션을 취소할 경우 제일먼저 Transaction Log PREPARE_ABORT 메시지 쓰기를 수행한다. 해당 메시지를 정상적으로 쓰고 난 이후에는 이후 단계가 실패하더라도 복구가 가능하다.
  2. Phase-2, 트랜잭션에 포함된 파티션에 ABORT marker 쓰기 (3~4)
    • 실제 취소 대상 데이터가 저장된 파티션의 리더들에게 요청(WriteTxnMarkerRequest)하여 Abort Marker를 쓰게된다.

Abort Marker를 모든 파티션에 쓴 이후에는 Transaction LogABORTED 메시지를 쓰게 된다. (5)

 

LSO(Last Stable Offset)

취소 트랜잭션의 시작 오프셋(FirstOffset)과 마지막 오프셋(LastOffset), 트랜잭션 취소 당시의 LSO(LastStableOffset)을 기록한다. 각 브로커는 파티션 별로 활성 트랜잭션(Active Transaction)들의 정보를 메모리에 관리하고 있으며, 이 활성 트랜잭션들의 시작 오프셋 중에서 최소 값 – 1”LSO로 정의한다. LSO는 다음과 같은 용도로 사용된다.

  • 컨슈머가 READ_COMMITTED isolation level로 데이터를 읽을 경우 파티션의 현재 LSO보다 작은 Offset의 데이터들만 읽을 수 있도록 허용한다. 이는 커밋된 트랜잭션의 데이터(Committed Transactional Messages)와 트랜잭션을 사용하지 않은 일반 데이터(Non-Transactional Messages)만 읽을 수 있도록 허용하고 현재 진행 중인 트랜잭션의 데이터는 읽을 수 없도록 제한한다.

  • Aborted Transaction Index에 취소된 트랜잭션 정보(TransactionEntry)에 저장된다. 저장하는 방법은 아래의 ‘Aborted Transaction Index’에서 설명하고, 컨슈머가 데이터 요청 시 해당 인덱스를 사용하는 방법은 아래의 ‘Fetch Messages in READ_COMMITTED MODE’를 참고하자.

 

Aborted Transaction Index

위의 Phase-2에서 Abort Marker를 쓰는 일 이외에도 추가적으로 수행하는 일이 있는데, 이는 Aborted Transaction Index를 생성하는 것이다. Aborted Transaction Index는 토픽 파티션의 각 세그먼트에 대응되는 별도 파일인데, 아래와 같은 Aborted Transaction의 정보(TransactionEntry)들을 계속해서 덧붙여(Append Only) 저장한다.

 

TransactionEntry => 

  Version => int16

  PID => int64

  FirstOffset => int64

  LastOffset => int64

  LastStableOffset => int64

 

 

Transaction Coordinator가 트랜잭션 취소(Abort)를 위해 EndTxnRequest를 브로커에 전달하면, 브로커는 아래와 같은 순서로 Aborted Transaction Index를 갱신한다.

  1. 현재 LSO를 갱신한다.
  2. 현재 LSO를 사용하여 취소된 트랜잭션에 대한 정보(TransactionEntry)를 생성하고 Aborted Transaction Index에 추가(Append)한다.
  3. 활성 트랜잭션(Active Transaction) 목록에서 취소된 트랜잭션을 제거한 이후에, 새로운 LSO를 갱신한다.

 

Fetch messages in COMMITTED_READ mode

취소 된 트랜잭션이 있을 경우 Consumer가 데이터를 요청(Fetch)할 때 어떻게 동작하는지 알아보도록 하자. 컨슈머가 FetchRequest를 브로커에 전송하는 경우에, Isolation 정보(READ_COMMITTED, READ_UNCOMMITTED)FetchOffset(패치 시작 오프셋)MaxBytes(최대 요청 크기)를 지정한다. 해당 요청을 브로커가 수신하게 되면, 요청의 FetchOffset, MaxBytes와 파티션의 세그먼트 인덱스(오프셋과 메시지의 매핑 정보)를 사용하여 최종 오프셋(FinalOffset)을 결정하게 된다. FinalOffset은 파티션의 현재 LSO보다 작은 값이어야 한다.

그렇게 FinalOffset을 찾게 되면, FetchOffset을 포함하는 Log SegmenetAborted Transaction Index를 조회하여 취소 트랜잭션 정보(TransactionEntry)를 찾는다.  

 

  1. FirstOffsetFinalOffset 사이의 파티션 세그먼트들이 소유하는 Aborted Transaction Index를 조회하여 TransactionEntry를 찾는다.
  2. FinalOffset 보다 크거나 같은 LSO를 가지는 TransactionEntry를 찾거나 더 이상의 TransactionEntry가 없을 때까지 인덱스를 조회한다.

이렇게 찾은 취소 트랜잭션 정보(TransactionEntry)들의 PID(Producer ID)FirstOffset만을 FetchResponse에 포함하여 전송한다.

 

컨슈머는 실제 수신한 데이터에서 취소 트랜잭션 데이터를 필터링 해야 하는데, FetchResponse에 수신한 취소 트랜잭션의 PIDFirstOffsetFirstOffset을 기반으로 하여 MinHeap을 생성하고 수신한 데이터에서 메시지를 하나씩 읽으면서 MinHeap의 데이터와 대조하여 동일한 PID, FirstOffset의 데이터인지 판단한다. 만약 PIDFirstOffset이 동일할 경우, 해당 데이터는 취소 트랜잭션의 최초 데이터이고, 이때부터 동일한 PID를 가지는 메시지(Abort Marker를 발견하기 전까지)는 취소 트랜잭션의 메시지로 판단하여 사용자에게 반환하지 않는다.

댓글