7장에서는 카프카가 신뢰성 있는 데이터 전달을 위해 '최소 한 번'에 중점을 두어 기능들을 설명했다. 그러나 메시지 중복 문제는 여전히 발생하는 문제이다. 카프카에서는 메시지 중복에 대한 처리를 위해 멱등적 프로듀서와 트랜잭션을 활용한다.
멱등적이라는 말은 여러 번 실행해도 한 번 실행한 것과 결과가 같은 서비스를 가리킬 때 자주 사용된다. x = x+1 은 실행마다 다른 결과를 내겠지만, x = 18 은 여러 번 수행해도 한 번 수행한 것과 같을 것이다.
카프카에서는 멱등적 프로듀서 기능을 통해 중복을 탐지하고 처리한다.
멱등적 프로듀서의 작동 원리
멱등적 프로듀서 기능을 켜면 모든 메시지는 고유한 프로듀서 ID, 시퀀스 넘버를 가지게 된다. 브로커는 이 값과 토픽 및 파티션을 조합하여 고유한 식별자로 사용하며, 파티션별로 쓰여진 마지막 5개의 메시지들을 추적하는 데에 사용한다. (5개는 기본값이며 max.in.flights.requests.per.connection 값을 통해 변경할 수 있음)
똑같은 식별자를 가진 메시지를 받으면 적절한 에러를 발생시키고, 시퀀스 넘버가 예상보다 크게 올 경우에도 에러를 발생시킨다.
프로듀서가 재시작할 경우 트랜잭션 기능이 없다면 프로듀서는 완전히 새로운 프로듀서 ID 를 할당받아 메시지를 전송하기 시작한다. 따라서, 재시작 전 프로듀서와 동일한 메시지를 보내더라도 프로듀서 ID가 다르므로 중복이 발생했는지 브로커는 알 수 없다.
파티션 리더가 속한 브로커에서 장애가 발생해도 다음 리더가 될 파티션이 속한 브로커에서도 메시지는 최신 상태를 유지하며, 최근 5개의 메시지를 저장하는 인메모리 상태 저장 또한 복제하기 때문에 문제가 없다. 브로커에 장애가 발생하여 똑같은 브로커를 재시작하는 경우에도 이전 브로커의 스냅샷과 장애가 난 사이에 들어온 최신 메시지들을 통해 최신 상태로 업데이트하여 문제가 발생하지 않는다.
멱등적 프로듀서의 한계
위에서 살펴본 중복 방지 매커니즘은 프로듀서의 내부 로직으로 인한 재시도가 발생한 경우에 한해서 동작한다. 따라서, producer.send 메서드로 동일한 메시지를 두 번 보내는 것은 프로듀서가 동일한 메시지인지 알 방법이 없으므로 중복이 발생하게 된다.
프로듀서 내부 로직으로 인한 재시도에는 프로듀서, 네트워크, 브로커 에러 등으로 인한 재시도를 의미한다.
멱등적 프로듀서 사용법
프로듀서 설정에서 enable.idempotence=true 를 설정하기만 하면 된다.
acks=all 설정일 경우 성능 차이가 없다.
트랜잭션은 카프카 스트림즈를 통해 데이터 스트림을 처리하는 애플리케이션에서 정확히 한 번을 보장하기 위한 기능으로, 읽기-처리-쓰기 패턴에서 사용하도록 개발되었다. 읽기-처리-쓰기 패턴이란, 어떤 토픽에서 데이터를 읽어와 애플리케이션에서 데이터를 처리하고, 결과를 다시 출력 토픽에 쓰는 패턴을 의미한다.
트랜잭션이 해결하는 문제
레코드를 처리하고, 해당하는 오프셋을 커밋하기 직전에 크래시가 발생하여 다른 컨슈머가 해당 레코드를 중복 처리하는 문제
레코드를 읽어오고 바로 크래시가 발생하여 다른 컨슈머가 해당 레코드를 처리했는데 이후 크래시난 컨슈머가 다시 동작하여 받아온 레코드를 중복 처리하는 문제
트랜잭션은 어떻게 '정확히 한 번' 을 보장하는가?
원자적 다수 파티션 쓰기
오프셋 커밋과 데이터를 처리해 출력 토픽으로 보내는 일은 모두 파티션에 데이터를 쓰는 것이다. 따라서 이 일들을 원자적으로 수행하면 데이터 처리와 오프셋 커밋의 사이에 발생하는 일을 생각하지 않아도 된다.
원자적 다수 파티션 쓰기 기능을 사용하려면 트랜잭션적 프로듀서를 사용해야 한다.
트랜잭션적 프로듀서와 일반 프로듀서의 차이점은 별도의 transactional.id 값을 내부에 저장하는 initTransactions() 를 호출했다는 점이다. transactional.id 값은 프로듀서가 멈춰도 유지되어 재시작시 매핑되는 기존 producer.id 를 그대로 사용할 수 있게 해준다.
좀비 펜싱
위 문제 2번에서 살펴본 재시작하여 파티션 할당이 해제되었지만 모르고 있는 컨슈머를 좀비라고 부른다. 좀비가 출력 스트림에 결과를 쓰는 것을 막는 것이 좀비 펜싱이다.
일반적으로 에포크 방식을 통해 initTransactions() 를 호출하면 transactional.id의 에포크 값을 증가 시켜 높은 에포크 값을 가진 프로듀서의 요청만 처리하는 방식이다.
컨슈머 격리 수준
위에서 살펴본 기능들은 모두 프로듀서의 기능이다. 컨슈머도 격리 수준을 설정하여 정확히 한 번 데이터를 처리할 수 있는 기능이 존재한다.
isolation.level 설정값을 기본값인 read_uncommitted 로 사용하면 프로듀서가 메시지를 쓰고 트랜잭션을 커밋하지 않아도 해당 메시지를 전부 읽어오지만, read_committed 로 설정하면 프로듀서가 트랜잭션을 커밋해야만 해당하는 메시지들을 읽어올 수 있다.
컨슈머의 격리 수준을 높여 정확히 한 번 데이터를 처리하도록 할 수도 있으나, 트랜잭션이 커밋되지 않으면 데이터를 볼 수 없는 만큼 그 시간동안 컨슈머는 데이터를 처리할 수 없어 종단 지연이 길어진다. 데이터 스트림을 처리하는 애플리케이션은 트랜잭션 없이 원자적 다수 파티션 쓰기 기능만으로도 정확히 한 번의 데이터 처리를 보장할 수 있다.
참고.
프로듀서의 트랜잭션이란 데이터베이스 트랜잭션과 같이 트랜잭션 내의 여러 개의 메시지가 한번에 쓰여지거나 롤백되면 전부 없어지는 것을 말한다.
책의 설명에서는 transactional.id 를 사용하는 프로듀서의 기능과 메시지를 쓸 때의 트랜잭션 기능을 모두 트랜잭션이라고 혼용해서 사용하고 있다.
트랜잭션으로 해결할 수 없는 문제
스트림 처리에서의 side effect
애플리케이션이 파티션에서 데이터를 읽어와 처리하는 과정에서 사용자에게 이메일을 보낸다고 하자. 이메일은 애플리케이션에서 전송하는 것이므로 메시지의 커밋, 프로듀서 트랜잭션과는 관련이 없으므로 이메일이 정확히 한 번 발송 되는 것은 아니다.
카프카 토픽에서 읽어서 데이터베이스에 저장하는 경우
읽기-처리-쓰기 패턴은 토픽에서 읽어와 다시 출력 토픽에 저장하는 경우에 해당한다.
파티션에 대한 오프셋 커밋과 데이터베이스의 트랜잭션 커밋을 원자적으로 수행할 방법은 없으므로, 두 처리가 달라서 발생하는 문제가 여전히 발생한다.
이 문제를 해결하기 위해 토픽에서 데이터를 읽어오는 서비스와 데이터베이스에 데이터를 쓰는 작업을 분리하여 처리하는 아웃박스 패턴이 존재한다. (https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/)
데이터베이스에서 읽어서, 카프카에 쓰고, 데이터베이스에 저장하는 경우
데이터베이스에서 읽어오기 때문에 오프셋 커밋과 데이터 처리는 걱정할 필요가 없으나, 데이터베이스의 트랜잭션 만큼 카프카의 트랜잭션이 강력하지 않다는 점이 문제이다. 카프카 트랜잭션으로 컨슈머가 커밋된 메시지만 읽어온다고 하더라도, 토픽에서 랙이 발생하여 커밋된 메시지를 모두 읽어서 처리했을거라는 보장이 없다.
한 클러스터에서 다른 클러스터로 데이터 복제
애플리케이션이 한 클러스터에 메시지를 트랜잭션적으로 쓰더라도, 미러메이커를 통한 클러스터간 데이터 복제에서 트랜잭션과 관련된 정보는 저장하지 않기 때문에 다른 클러스터에 메시지는 개별적으로 미러메이커에 의해 잘 저장되겠지만, 트랜잭션 전체가 잘 저장되었는지 복제된 클러스터의 컨슈머는 알 방법이 없다.
발행/구독 패턴
위에서 정확히 한 번의 데이터 처리는 읽기-처리-쓰기 구조에서 일어날 수 있다고 했다. 발행/구독 패턴은 토픽에서 데이터를 읽고 사용하는 것이 끝이므로 트랜잭션을 통해 커밋된 메시지만 읽어오도록 할 수는 있지만, 해당 데이터에 대한 처리가 중복되는 것은 막을 수 없다.
트랜잭션 사용법
가장 일반적이고 권장되는 방법은 카프카 스트림즈를 사용하는 애플리케이션에서 processing.guarantee 설정값으로 exactly_once 를 주면 카프카 스트림즈가 읽기-처리-쓰기 패턴에 대해 자동으로 트랜잭션을 통한 정확히 한 번 보장을 제공한다.
카프카 스트림즈를 통한 자동 트랜잭션이 아니라 직접 트랜잭션 API를 사용하고 싶다면, KafkaProducer 에서 제공하는 beginTransaction, commitTransaction 을 사용하면 된다.
트랜잭션을 적용하기 위해 transactional.id 를 프로듀서에 설정하고, initTransactions 메서드를 호출하고, 트랜잭션 내에서 sendOffesetsToTransaction 메서드로 오프셋을 커밋해야만 원자적으로 메시지들과 오프셋들이 여러 파티션들에 쓰여지게 된다.
트랜잭션 ID와 펜싱
위에서 살펴본 방법에서는 transactional.id 를 프로듀서에 정적으로 설정해주어 해당 값을 통해 트랜잭션적 프로듀서를 사용했다. 그러나, 해당 프로듀서에서 크래시가 나서 다른 프로듀서가 쓰기 작업을 대체한다고 할 때, 같은 transactional.id 를 갖되, 에포크 값을 업데이트한 프로듀서가 동작할 것을 기대하겠지만, transactional.id 는 정적으로 설정되어있기 때문에 쉽지 않다.
카프카 2.5에서 새로 고안된 펜싱 방법은 트랜잭션 ID와 컨슈머 그룹 메타데이터를 함께 사용하는 펜싱 방법이다. 오프셋 커밋 메서드 호출 시, 인자로 컨슈머 그룹 메타데이터를 함께 넘겨 컨슈머 그룹의 에포크 값을 판단하여 펜싱하는 방식이다. 애플리케이션이 작동을 중지하여 사용하던 컨슈머, 프로듀서가 좀비가 되었을 경우 같은 컨슈머 그룹에 속한 다른 애플리케이션이 좀비의 파티션을 넘겨받아 처리하고, 컨슈머 그룹의 에포크 값을 증가시켜 이후에 좀비 프로듀서가 전송한 메시지는 펜싱되도록 동작한다.
트랜잭션의 작동 원리
기본적으로 카프카의 트랜잭션은 찬디-램포트 스냅샷 알고리즘의 영향을 받아 만들어졌으며, 프로듀서가 트랜잭션을 커밋할 때 트랜잭션 코디네이터에게 마커를 전송하고, 트랜잭션 코디네이터는 해당하는 파티션들에게 커밋 마커 메시지를 보내어 트랜잭션을 처리한다.
그러나, 마커를 전송하는 과정이나 파티션들에 커밋 메시지를 보내는 과정에서 문제가 발생할 수도 있기 때문에 카프카는 two-phase commit 과 트랜잭션 로그를 사용한다.
트랜잭션 로그와 two-phase commit
__transaction_state 라는 이름의 내부 토픽에 저장되며 트랜잭션 코디네이터가 트랜잭션 로그를 관리한다. 트랜잭션 코디네이터는 프로듀서의 트랜잭션 ID 별로 해당하는 트랜잭션 로그의 리더 파티션을 맡은 브로커가 수행한다.
트랜잭션 로그와 two-phase commit 알고리즘은 다음과 같이 4단계로 수행된다.
현재 진행중인 트랜잭션이 존재함을 로그에 쓴다. 연관된 파티션들도 모두 기록한다.
커밋 혹은 중단 시도를 기록한다.
모든 파티션에 트랜잭션 마커를 쓴다.
트랜잭션이 종료되었음을 로그에 쓴다.
실제 프로듀서와 트랜잭션 코디네이터의 동작으로 설명하면 다음과 같다.
프로듀서가 initTransaction 메서드를 호출하여 트랜잭션 코디네이터에게 자신이 새로운 트랜잭션 프로듀서임을 알리고, 에포크 값도 증가시켜 같은 트랜잭션 ID 를 갖는 수행중인 트랜잭션이 있다면 중단시킨다.
beginTransaction 메서드를 호출하면 프로듀서는 다음에 쓰는 메시지부터는 트랜잭션에 포함한 메시지임을 메시지에 명시한다. beginTransaction 메서드 호출만으로는 트랜잭션 코디네이터는 트랜잭션이 시작했음을 알 수 없다.
beginTransaction 메서드를 호출 이후 프로듀서가 메시지를 전송하면 브로커에 추가적으로 AddPartitionsToTxn 요청을 전송하여 메시지가 트랜잭션 내에서 전송되고 있으며, 진행중인 트랜잭션이 있음을 알린다. 이 작업은 로그에 기록된다.(1)
sendOffsetsToTransaction 메서드를 호출하면 읽어온 토픽에 오프셋들을 커밋한다. 이는 일반적인 오프셋 커밋과 같은 방식으로 동작한다.
commitTransaction, abortTransaction 을 통해 트랜잭션을 끝내면, 트랜잭션 코디네이터에 EndTxn 요청이 전송되어 커밋, 중단 시도를 로그에 기록한다.(2)
로그 기록이 끝나면, 모든 파티션에 커밋 또는 중단 마커를 쓴다.(3)
마커를 모두 쓰고 난 뒤에 트랜잭션이 종료되었음을 로그에 쓴다.(4)
이 과정을 진행하는 중에 트랜잭션 코디네이터에서 크래시가 발생하더라도 로그를 통해 최종적으로 트랜잭션이 보장된다.
트랜잭션 성능
프로듀서 측
트랜잭션 ID 등록 요청, 트랜잭션 종료 시 트랜잭션 커밋 요청은 동기적으로 일어나 오버헤드를 발생시킨다.
메시지의 수와 트랜잭션 오버헤드는 상관관계가 없으므로, 한 트랜잭션 내에 많은 메시지를 보내야 처리량을 증가시킬 수 있다.
컨슈머 측
커밋 마커를 읽어오는 작업에서 약간의 오버헤드가 발생한다.
read_committed 상태의 컨슈머는 트랜잭션 커밋 간격이 길어질수록 종단 지연도 그만큼 증가한다.
커밋되지 않아서 메시지를 읽어오지 못할 때, 브로커에서 메시지 자체를 보내지 않기 때문에 하드웨어적인 오버헤드는 발생하지 않는다.