상황
카프카의 전달 보증에 대해 공부했고 at most once, at least once, exactly once 세 종류가 있다는 것을 알게되었다. 각 전달 보증 수준에 대해 공부하면서, 브로커에서 응답하는 ack가 오지않는(유실되는) 경우 프로듀에서 메시지를 재전송 하는 부분에 대해 다음과 같은 테스트를 해보고 싶었다.
- at most once
- ack가 오지 않아도 메시지를 재전송하지 않는다.
- at least once
- ack가 오지 않으면 메시지를 재전송한다.
- 이미 브로커에서 저장된 메시지에 대해 재전송 하게되면, 메시지가 중복으로 저장될 수 있다.
- exactly once
- ack가 오지 않으면 메시지를 재전송한다.
- 이미 브로커에서 저장된 메시지에 대해 재전송 하더라도, 브로커에서는 중복 메시지 처리 알고리즘을 활용해 메시지를 중복 저장하지 않는다.
하지만, 생각처럼 상황을 시뮬레이션 할 수가 없었고 어떤 부분을 더 공부해야 원하는 테스트를 제대로 할 수 있는지 모르겠다… 일단 시도해본 것과 실패에 대한 원인을 적어보고, 앞으로 더 공부하면서 어떤걸 몰랐고 잘못 생각하고 있던건지 깨닫게 될 때, 다시 제대로 테스트해서 이 글을 보완할 예정이다.
시도해본 것들
Case 1. at least once 수준에서의 재송신
- 환경 및 옵션 세팅
- 브로커 1대 / acks = 1(default) / request.timeout.ms = 1000 (1초) / retries = 10
- 토픽 생성
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 토픽명
- 테스트 시나리오
- 브로커를 끈다.
- 프로듀서에서 메시지를 송신한다.
- request.timeout.ms에 정의된 시간 내에 ack 응답이 오지 않으면 10번 재송신된다.
- send 메서드 두 번째 파라미터인 Callback 인터페이스를 구현하여, 재송신된 10건의 메시지에 대한 exception이 10번 뜨는지 확인한다.
- 테스트 결과
exception : org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
- 프로듀서 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class SampleProducer {
private static final String TOPIC_NAME = "jaeyoon";
private static final String FIN_MESSAGE = "exit";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
while(true) {
Scanner sc = new Scanner(System.in);
System.out.print("Input > ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
try {
producer.send(record, (metadata, exception) -> {
System.out.println("metadata : "+metadata);
if (exception != null) {
// some exception
System.out.println("exception : "+exception);
}
});
} catch (Exception e) {
// exception
} finally {
producer.flush();
}
if(message.equals(FIN_MESSAGE)) {
producer.close();
break;
}
}
}
}
결과 분석
- 브로커가 꺼져있었기 때문에 metadata를 제한 시간내에 가져오지 못해서 발생한 에러이다.
- send 메서드는 실행 후
max.block.ms(default 60000)
에 정의된 만큼 metadata fetch 및 버퍼 할당을 기다리는 총 시간을 제한한다. - 해당 시간이 초과하면
TimeoutException
이 발생한다.
- send 메서드는 실행 후
메타데이터는 언제 가져오나 ?
- 프로듀서가 처음으로 메타데이터 요청을 하는 때는 클라이언트 구성에서 설정한 부트스트랩 서버(
bootstrap.server
)에 연결할 때다.- 물론, 한 대 이상의 브로커일 수 있지만 반드시 클러스터에 있는 모든 브로커일 필요는 없다(따라서 메타데이터 요청은 각 브로커에 대한 것이 아님).
- 메타데이터를 통해, 프로듀서는 자신이 메시지를 보내고자 하는 토픽이 어디에 있는지 정보를 얻는다.
- 또는, 토픽의 리더 레플리카를 가진 브로커에 연결 오류가 발생하면 메타데이터 요청을 수행할 수 있다.
- 이 경우 브로커에 연결하기 위해(다른 토픽에 대해 아직 연결되지 않은 경우) 어떤 브로커가 새 리더인지 알아야 한다.
Case 2. 1번 케이스에서 메타데이터 얻어온 후 브로커 종료
- 환경 및 옵션 세팅 (case1과 동일)
- 브로커 1대 / acks = 1(default) / request.timeout.ms = 1000 (1초) / retries = 10
- 테스트 시나리오
- 주키퍼와 브로커를 켜고 메시지를 한 번 송신한다.(메타데이터를 받아온다.)
- 브로커를 끈다.
- 메시지를 송신한다.
- request.timeout.ms에 정의된 시간 내에 ack 응답이 오지 않으면 10번 재송신된다.
- send 메서드 두 번째 파라미터인 Callback 인터페이스를 구현하여, 재송신된 10건의 메시지에 대한 exception이 10번 뜨는지 확인한다.
프로듀서 코드 (case1과 동일)
- 테스트 결과
1
2
3
Input > bye
exception : org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for jaeyoon-0: 1018 ms has passed since batch creation plus linger time
결과 분석
- 해당 에러는 버퍼에 쌓인 레코드가
request.timeout.ms(default 30000)
에 세팅된 시간 +linger.ms (default 0)
에 세팅된 시간을 초과할 때 까지 브로커에 송신되지 않으면 발생한다.- 일반적으로
레코드가 버퍼에 쌓이는 속도 >>> 레코드를 송신하는 속도
인 경우 레코드가 버퍼에서 대기하는 시간이 길어져 이러한 에러가 발생할 수 있다. - 위 테스트의 경우에는 프로듀서가 송신할 브로커를 찾지 못해 버퍼에 레코드가 쌓여있다가
request.timeout.ms
에 세팅된 1초를 넘겨 에러가 발생했다.
- 일반적으로
linger.ms
- 프로듀서는 해당 옵션에 세팅된 시간만큼(default 0) 지연 후 브로커로 메시지를 송신한다.
- 즉, 버퍼에 쌓인 레코드를 즉시 내보내는 것이 아니라, 다른 레코드가 함께 전송될 수 있도록(batch 처리) 기다린다.
batch 처리
- 레코드는 브로커에게 전송하기 위한 묶음으로(batch) 그룹화되어 메시지당 전송 오버헤드를 줄이고 처리량을 증가시킨다.
- send 메서드를 호출하면 브로커에게 보낼 수 있도록 ProducerRecord가 내부 버퍼에 저장된다.
- send 메서드는 전송 여부와 관계없이 ProducerRecord가 버퍼링되면 즉시 return 한다.
- 배치가 제한 시간보다 오래 대기한 경우 예외가 발생하며, 해당 배치의 레코드는 전송 대기열에서 제거된다. (위 테스트의 경우)
Case3. min.insync.replicas 활용하여 에러 발생시키기
- 환경 및 옵션 세팅
- 브로커 3대 / replication.factor = 3 / min.insync.replicas = 2 / acks = all / retries = 5
- 토픽 생성
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --config min.insync.replicas=2 --replication-factor 3 --partitions 1 --topic 토픽명
- 테스트 시나리오
- 주피커와 브로커를 3대를 켠다.
- 메시지를 한 번 송신하고 정상적으로 처리되는지 확인한다.
- 브로커 두 대를 다운시킨다.
- 브로커가 한 대만 남아있으므로
min.insync.replicas=2
를 만족할 수 없기 때문에 에러가 발생할 것이고 메시지는 재전송 될 것이다. - send 메서드 두 번째 파라미터인 Callback 인터페이스를 구현하여, 재송신된 10건의 메시지에 대한 exception이 10번 뜨는지 확인한다.
- 테스트 결과
- 프로듀서 측
1 2 3
Input > down test! exception : org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
- 브로커 측 (1대 남은 브로커)
- 첫 번째 에러 발생 후, 프로듀서 측에 세팅한 대로 5번 재시도 하는 것을 볼 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12
[2021-08-02 04:05:07,937] ERROR [ReplicaManager broker=0] Error processing append operation on partition dawn-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition dawn-0 [2021-08-02 04:05:08,043] ERROR [ReplicaManager broker=0] Error processing append operation on partition dawn-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition dawn-0 [2021-08-02 04:05:08,150] ERROR [ReplicaManager broker=0] Error processing append operation on partition dawn-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition dawn-0 [2021-08-02 04:05:08,258] ERROR [ReplicaManager broker=0] Error processing append operation on partition dawn-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition dawn-0 [2021-08-02 04:05:08,363] ERROR [ReplicaManager broker=0] Error processing append operation on partition dawn-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition dawn-0 [2021-08-02 04:05:08,465] ERROR [ReplicaManager broker=0] Error processing append operation on partition dawn-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition dawn-0
- 프로듀서 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class SampleProducer {
private static final String TOPIC_NAME = "dawn";
private static final String FIN_MESSAGE = "exit";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
properties.put(ProducerConfig.RETRIES_CONFIG, 5);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
while (true) {
Scanner sc = new Scanner(System.in);
System.out.print("Input > ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// some exception
System.out.println("exception : " + exception);
}
});
} catch (Exception e) {
// exception
} finally {
producer.flush();
}
if (message.equals(FIN_MESSAGE)) {
producer.close();
break;
}
}
}
}
결과 분석
- 브로커 측에서 프로듀서에 세팅한 재송신 횟수 만큼의 예외 메시지를 출력한다. 그렇다는건 초기에 세팅한 횟수만큼 메시지를 재전송 했다는 것이다.
- 재전송을 하긴 했는데, 프로듀서 측에서의 콜백은 한 번만 발생했다.
- 재전송이라는게 send 메서드가 여러번 호출되는 개념이 아니라, send를 한 번 호출하면 그 안에서 필요한 경우 자동으로 재전송이 되고 그에 대한 최종적인 결과를 콜백 메서드에 넘겨주는건가 ?
- 정확히 send 메서드 내부의 코드를 이해하지는 못했지만, 이에 대한 의문은 RetriableException에 대해 공부하면서 조금은 해소되었다.
RetriableException
- 프로듀서의 에러 처리에는 두 가지가 있다.
- 프로듀서가 자동으로 처리하는 에러(=재시도 가능한 에러)
- 개발자가 프로듀서 라이브러리를 사용해서 처리해야하는 에러
- 예를 들어,
LEADER_NOT_AVAILABE
같은 에러는 리더가 다시 선출되면 해결되는 문제이므로 재시도 가능한 에러(retriable error) 라고 한다. - 반면,
INVALD_CONFIG
에러 (구성이 잘못되어 발생한 에러)의 경우는 아무리 메시지를 재전송해도 구성이 변경되지 않으므로 재시도 불가능한 에러이다. - 재시도 가능한 에러는 카프카 프로듀서 객체가 알아서 해주므로 우리 코드에서 직접 처리할 필요가 없다.
- 위에서 발생한
NotEnoughReplicasException
또한 재시도 가능한 에러이다.
ReplicaManager
- 테스트 결과의 브로커 측 예외 메시지를 보면
kafka.server.ReplicaManager
에서 에러를 발생시키는 것 같아 찾아보니 다음과 같은 구성으로 되어있는 것 같다. - “ReplicaManager manages log replicas using the LogManager.” 라고 설명 되어있는데, 카프카 서버측 로그를 담당하는 역할을 하는 것 같다.
Case4. at most once vs at least once 메시지 유실 정도 비교
- 환경 및 옵션 세팅
- 브로커 3대 / replication.factor = 3 / min.insync.replicas = 2 / retries = Integer.MAX
- acks = 0 (at most once)
- acks = 1 (at least once)
- 토픽 생성
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --config min.insync.replicas=2 --replication-factor 3 --partitions 1 --topic 토픽명
- 테스트 시나리오
- 주키퍼와 브로커 3대를 켠다.
./bin/kafka-topics.sh --describe --topic 토픽명 --zookeeper localhost:2181
명령어를 통해 리더 레플리카를 파악한다../bin/kafka-consumer-groups.sh --bootstrap-server localhost:포트번호 --group dawn --describe
명령어를 통해 브로커 3대의 LOG-END-OFFSET을 파악한다.- 프로듀서에서 10만건의 메시지를 송신하게 하고 도중에 리더 레플리카를 가진 브로커를 다운시킨다.
- 메시지 송신이 완료되고 난 뒤, 새로운 리더 레플리카를 파악하고 3번 명령어 통해 현재 LOG-END-OFFSET을 확인한다.
- 새로운 리더가 선출될 때
LeaderNotAvailableException
가 발생할 것이고, 그로 인해 처리되지 못한 메시지가 얼마나 차이가 나는지 확인해본다.
- at most once인 경우에는 리더 선출로 인해 메시지 처리가 되지 않더라도 재송신 하지 않기 때문에 유실되는 메시지가 있을 것이다.
- at least once인 경우에 리더 선출로 인해 에러가 나는 경우 메시지를 재송할 것이므로 at most once 경우보다는 더 많은 메시지가 처리될 것이다.
- 테스트 결과 (at most once)
- before
1 2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG dawn dawn 0 5 5 0
- after
1 2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG dawn dawn 0 100005 100005 0
- 테스트 결과 (at least once)
- before
1 2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG dawn dawn 0 100005 100005 0
- after
1 2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG dawn dawn 0 200005 200005 0
- 프로듀서 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package basic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class SampleProducer {
private static final String TOPIC_NAME = "dawn";
private static final String FIN_MESSAGE = "exit";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
properties.put(ProducerConfig.ACKS_CONFIG, "1"); // at most once : 0 , at least once : 1
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
while (true) {
Scanner sc = new Scanner(System.in);
System.out.print("Input > ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
try {
for(int i=0; i<100000; i++) {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// some exception
System.out.println("exception : " + exception);
}
});
if(i%10000==0) {
Thread.sleep(1000);
}
}
} catch (Exception e) {
// exception
} finally {
producer.flush();
}
if (message.equals(FIN_MESSAGE)) {
producer.close();
break;
}
}
}
}
결과 분석
- 두 가지 전달 보증 수준 모두에서 메시지 유실은 없었다.
- 이렇게 테스트하는게 아닌가 … ??
- 리더가 선출되는 과정과 그 때의 메시지 처리에 대해 좀 더 자세하게 공부해보자
Case 별로 배운 부분
추측이나 확실히 모르겠는건 추후 보완 예정 !
Case1~2
- 브로커에서 ack 응답이 오지 않는 상황을 브로커를 다운시킴으로써 시뮬레이션 하려고 했으나, 브로커가 꺼져있으면 프로듀서에서 브로커로 송신 자체가 되지 않는다.
- ack를 유실하는 경우를 어떻게 시뮬레이션 할 수 있을까 ??
Case3
- 재시도 가능한 에러는 프로듀서 내부에서 자동으로 재전송해준다. (해당 코드는 어디에 있을까 ?)
- 추측 : 재전송이 여러번 되더라도 send 메서드에 대한 콜백은 한 번만 발생하는 것 같다.
참고자료
- https://kafka.apache.org/documentation/#producerconfigs
- https://stackoverflow.com/questions/56794122/metadata-requests-in-kafka-producer
- https://stackoverflow.com/questions/46750420/kafka-producer-error-expiring-10-records-for-topicxxxxxx-6686-ms-has-passed
- https://bistros.tistory.com/152
- 네하 나크헤데 외 2인 『카프카 핵심 가이드』, 제이펍(2018), chapter6