본 게시글은 '아파치 카프카 애플리케이션 프로그래밍 with 자바'를 읽고 개인적으로 정리한 내용입니다.
본격적으로 카프카에 대한 상세한 개념을 습득하기 전에 카프카를 설치하고, 커맨드 라인 툴을 통해 카프카에 명령을 내리는 방법에 대해 알아보자.
실습용 카프카 브로커 설치
책에서는 AWS EC2를 통해 실습을 진행했지만 우리는 AWS까지 가지 않고, 도커를 통해 우분투 서버를 띄워서 실습을 해볼 것이다. 도커에 대한 설명은 전에 작성한 도커 관련 포스팅을 확인하자. 순서대로 따라오면 어렵지 않으니 긴 설명 없이 순서대로 설명하도록 하겠다. 실습을 따라하기 위해서는 macOS 기준 도커 데스크탑이 설치되어 있어야 한다.
1) 도커를 통한 우분투 서버 실행
docker run -it --name ubuntu-server -p 2222:22 -p 9092:9092 --privileged ubuntu:latest /bin/bash
- ssh 접속을 위한 22번 포트를 내 PC의 2222번 포트와 매핑시킨다. 내 PC에서 이 우분투 서버로 접속할 때는 localhost:2222를 통해 SSH 접속을 하면 된다.
- 카프카 브로커의 기본 포트인 9092 포트를 개방시켜준다. 이는 내 PC에서 우분투 서버 내의 카프카 브로커와 통신하기 위함이다.
2) 패키지 관리 도구 업데이트 및 SSH, VIM 설치
apt update && apt install openssh-server vim
3) SSH 설정
sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
echo "PasswordAuthentication yes" >> /etc/ssh/sshd_config
# root 계정 비밀번호 변경
echo "root:1234” | chpasswd
# SSH 시작
service ssh start
4) 로컬 PC에서 우분투 서버로 SSH 접속(비밀번호 기반 인증)
우분투 서버 내부가 아닌 내 PC의 터미널에서 해당 명령어를 통해 SSH 접속을 한다.
ssh -p 2222 root@localhost
- 우분투 서버 컨테이너를 실행할 때 설정한 2222번 포트를 사용하는 것을 확인할 수 있다.
5) 우분투 서버 JDK 설치
apt install -y openjdk-8-jdk
- 카프카 브로커를 실행하기 위해서는 JDK가 필요하다. 카프카 브로커는 스칼라와 자바로 작서되어 JVM 환경 위에서 실행되기 때문이다
- 여기서는 자바 1.8 버전을 사용한다
6) 카프카 바이너리 패키지 다운 & 압축해제
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvf kafka_2.12-2.5.0.tgz
- 카프카 바이너리 패키지에는 자바 소스코드를 컴파일하여 실행하기 위해 준비해놓은 바이너리 파일들이 들어있다
7) 카프카 브로커 힙 메모리 설정
$ vi ~/.bashrc --- (1)
# 이후 열리는 파일의 최하단에 다음의 내용 추가 (i버튼을 누르면 편집 모드로 변경)
export KAFKA_HEAP_OPTS="-Xmx400m-Xms400m” --- (2)
# 추가 후 ESC를 누르고 :wq!릉 통해 저장하고 나가기
$ source ~/.bashrc --- (3)
$ echo $KAFKA_HEAP_OPTS --- (4)
-Xmx400m-Xms400m
- (1): vi 편집기로 .bashrc 파일을 연다.
- (2): 개발 환경에 맞는 적절한 힙 메모리 사이즈를 KAFKA_HEAP_OPTS 환경변수로 선언한다. 환경변수로 저장되어 있지 않으면 기본 힙 메모리는 'Xmx 1G,Xms 1G'로 설정된다.
- (3): source 명령어는 스크립트 파일을 수정한 후에 수정된 값을 바로 적용하기 위해 사용한다. 수정한 .bashrc를 바로 반영하도록 실행한다.
- (4): 선언한 환경변수를 확인한다. 이는 .bashrc을 통해 설정되었으므로 세션이 끊기거나 재접속하더라도 유지된다.
카프카 브로커 실행 시 메모리를 설정하는 부분은 카프카를 실행하기 위해서 사용하는 kafka-server-start.sh
스크립트 내부에서 확인할 수 있다.
8) 카프카 브로커 실행 옵션 설정
config/server.properties
파일에는 카프카 브로커가 클러스터 운영에 필요한 옵션들을 지정할 수 있다. 여기서는 실습용 카프카 브로커를 실행할 것이므로 advertised.listener
만 설정하면 된다. advertised.listener
는 카프카 클라이언트 또는 커맨드 라인 툴을 브로커와 연결할 때 사용된다. 현재 접속하고 있는 인스턴스의 퍼블릭 IP와 카프카 기본 포트인 9092를 PLAINTEXT://
와 함께 붙여넣고 advertised.listener
를 주석 해제한다.
vi config/server.properties
# 다음의 내용 주석 해제 & 추가
advertised.listeners=PLAINTEXT://localhost:9092
다음은 config/server.properties
파일 내부에 각 설정들에 대한 설명이다.
broker.id=0
: 실행하는 카프카 브로커의 번호. 클러스터를 구축할 때 브로커들을 구분하기 위해 유니크하게 설정해야 한다.listeners=PLAINTEXT://:9092
: 카프카 브로커가 통신을 위해 열어둘 인터페이스 IP, port, 프로토콜. 따로 설정하지 않으면 모든 IP와 port에서 접속할 수 있다.advertised.listeners=PLAINTEXT://localhost:9092
: 카프카 클라이언트 또는 카프카 커맨드 라인 툴에서 접속할 때 사용하는 IP와 port 정보SASL_PLAINTEXT,SASL_SSL:SASL_SSL
: SASL_SSL,SASL_PLAIN 보안 설정 시 프로토콜 매핑을 위한 설정num.network.threads=3
: 네트워크를 통한 처리를 할 때 사용할 네트워크 스레드 개수num.io.threads=8
: 카프카 브로커 내부에서 사용할 스레드 개수log.dirs=/tmp/kafka-logs
: 통신을 통해 가져온 데이터를 파일로 저장할 디렉토리 위치. 브로커 실행 전에 디렉토리가 생성되어 있어야 에러가 발생하지 않는다.num.partitions=1
: 기본 파티션 개수log.retention.hours=168
: 카프카 브로커가 저장한 파일이 삭제되기까지 걸리는 시간.log.retention.hours
대신log.retention.ms
값을 설정하여 운영하는 것을 추천한다.log.retention.ms=-1
로 설정하면 파일은 영원히 삭제되지 않는다.log.segment.bytes=1073741824
: 카프카 브로커가 저장할 파일의 최대 크기를 지정한다. 데이터 양이 많아 이 크기를 채우면 새로운 파일이 생성된다.log.retention.check.interval.ms=300000
: 카프카 브로커가 저장한 파일을 삭제하기 위해 체크하는 간격을 지정한다zookeeper.connect=localhost:2181
: 카프카 브로커와 연동할 주키퍼의 IP와 port를 설정한다. 실습을 위해 우분투 서버 내에 주키퍼와 브로커르 동시에 실행하므로 localhost:2181이다.zookeeper.connection.timeout.ms=18000
: 주키퍼의 세션 타임아웃 시간을 지정한다.
이미 실행되고 있는 카프카 브로커의 설정을 변경하고 싶다면 브로커를 재시작해야 한다.
9) 주키퍼(zookeeper) 실행
카프카 바이너리가 포함된 폴더에는 브로커와 같이 실행할 주키퍼가 준비되어 있다. 주키퍼는 카프카의 클러스터 설정 리더 정보, 컨트롤러 정보를 담고 있어 카프카를 실행하는 데에 필요한 필수 애플리케이션이다. 주키퍼를 안전하게 운영하기 위해서는 3애 이상의 서버로 구성하여 사용해야 하지만, 여기서는 실습이니 1대만 실행하도록 하겠다.
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ jps -m
QuorumPeerMain config/zookeeper.properties
Jps -m
-daemon
옵션: 주키퍼를 백그라운드에서 실행한다.- 주키퍼가 정상적으로 실행 중인지는
jps -m
을 통해 확인할 수 있다.
10) 카프카 브로커 실행 및 로그 확인
$ bin/kafka-server-start.sh -daemon config/server.properties
$ jps -m
QuorumPeerMain config/zookeeper.properties
Kafka config/server.properties
Jps -m
$ tail -f logs/server.log
kafka-server-start.sh
명령어를 통해 카프카 브로커를 실행한 뒤 jps 명령어를 통해 주키퍼와 브로커 프로세스의 동작 여부를 확인한다.- tail 명령어를 통해 로그를 확인하여 카프카 브로커가 정상 동작하는지 확인한다.
- 카프카 브로커의 로그를 확인하는 것은 매우 중요한데, 카프카 클라이언트를 개발할 때뿐만 아니라 카프카 클러스터를 운영할 때 이슈가 발생할 경우 모두 카프카 브로커에 로그가 남기 때문이다.
11) 로컬 컴퓨터에서 카프카와 통신 확인
카프카 바이너리 패키지에서 제공하는 커맨드 라인 툴들 중 카프카 브로커에 대한 정보를 가져올 수 있는 kafka-broker-api-versions.sh
명령어를 제공한다. 이 명령어를 통해 카프카 브로커와 정상적으로 연동되는지 확인할 수 있다. 이를 위해 로컬 컴퓨터에도 카프카 바이너리 패키지를 다운로드 하자! 이후 다운로드한 폴더의 bin 폴더에 가면 커맨드 라인 툴들을 확인할 수 있다.
$ wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
$ tar -xvf kafka_2.12-2.5.0.tgz
$ cd kafka_2.12-2.5.0
$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
localhost:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11],
...
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0]
kafka-broker-api-versions.sh
명령어와 함께 --bootstrap-server에 호스트와 9092 포트를 넣으면 원격으로 카프카의 버전과 borker.id, rack 정보, 각종 카프카 브로커 옵션들을 확인할 수 있다.- 우리는 docker를 통해 우분투 서버를 띄우고 9092 포트를 개방해두었으므로 localhost:9092를 통해 통신이 가능하다.
카프카 커맨드 라인 툴(command-line tool)
카프카에서 커맨드 라인 툴을 통해 토픽이나 파티션 개수 변경 등과 같은 카프카 브로커 운영에 필요한 다양한 명령을 내릴 수 있다. 커맨드 라인 툴을 통해 관련 명령을 실행할 때 필수 옵션과 선택 옵션이 있다. 선택 옵션은 지정하지 않을 시 브로커의 기본 설정값 혹은 커맨드 라인 툴의 기본값으로 대체되기 때문에 커맨드 라인 툴을 사용하기 전, 현재 브로커에 옵션이 어떻게 설정되어 있는지 확인 후에 사용하면 커맨드 라인 툴 사용 시 실수할 확률이 줄어든다.
kafka-topics.sh
이 커맨드 라인 툴을 통해 토픽(topic)과 관련된 명령을 수행할 수 있다. 토픽은 카프카에서 데이터를 구분하는 가장 기본적인 개념으로 RDBMS에서의 테이블과 유사하다. 토픽에는 파티션(partition)이 존재하는데, 파티션의 개수는 최소 1개부터 시작한다.
토픽을 생성하는 방법에는 2가지가 있다. 하나는 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할 때이고, 다른 하나는 커맨드 라인 툴로 명시적으로 토픽을 생성하는 것이다. 토픽의 효과적인 유지보수를 위해서는 토픽을 명시적으로 생성하는 것이 추천된다.
토픽 생성
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka
Created topic hello.kafka.
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka.2
Created topic hello.kafka.2.
--cretae
옵션으로 토픽 생성 명령어임을 명시한다.--bootstrap-server
에는 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 포트를 적는다.--topic
에는 토픽 이름을 작성한다. 유지보수를 위해 토픽 이름은 내부 데이터 유추가 가능할 정도로 자세히 적도록 하자.--partitions
는 파티션의 개수를 지정한다. 파티션 최소 개수는 1개이고, 이 옵션을 사용하지 않으면 카프카 브로커 설정파일(config/server.properties)에 있는num.partitions
옵션값에 따라 생성된다.--replication-factor
에는 토픽의 파티션 복제 개수를 적는다. 파티션의 데이터는 각 브로커마다 저장된다. 이 옵션을 사용하지 않으면 카프카 브로커 설정에 있는default.replication.factor
옵션값을 사용한다.- 1은 복제 하지 않는다는 의미
- 2는 1개의 복제본을 사용한다는 의미
- 실제 업무 환경에서는 3개 이상의 카프카 브로커로 운영하는 것이 일반적이고 2 또는 3으로 복제 개수를 설정하여 사용한다
--config
를 통해kafka-topics.sh
명령에 포함되지 않은 추가적인 설정이 가능하다.retention.ms
는 토픽의 데이터를 유지하는 기간으로, ms 단위이다.
토픽 리스트 조회
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
hello.kafka
hello.kafka.2
--list
옵션을 사용하여 생성된 토픽들의 이름을 확인할 수 있다.- 카프카에 내부 관리를 위한 인터널 토픽(internal topic)이 존재하는데, 실질적으로 운영하는 데에 사용하지 않으므로
--exclude-internal
옵션을 통해 조회 시 목록에서 제외할 수 있다.
토픽 상세 조회
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka.2
Topic: hello.kafka.2 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=172800000
Topic: hello.kafka.2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
--describe
옵션을 통해 토픽의 상세한 상태를 확인할 수 있다.- 파티션 개수가 몇 개인지, 복제 파티션이 위치한 브로커의 번호, 기타 토픽을 구성하는 설정들을 확인할 수 있다.
- 리더 파티션이 일부 브로커에 몰려있는 경우 카프카 클러스터 부하가 특정 브로커들로 몰려 네트워크 대역 이슈가 생길 수 있기 떄문에, 토픽 상세 조회 명령을 통해 토픽의 리더 파티션 쏠림 현상을 확인하는 것도 좋은 방법이다.
토픽 옵션 수정
토픽에 설정된 옵션을 변경하기 위해서는 kafka-topics.sh 또는 kafka-configs.sh 두 개를 사용해야 한다.
$ bin/kafka-topcis.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 4
$ bin/kafka-conigs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000
Completed updating config for entity: topic 'hello.kafka'.
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --describe
Dynamic configs for topic hello.kafka are:
retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}
- 파티션 개수 변경 ->
kafka-topics.sh
사용--alter
옵션과--partitions
옵션을 함께 사용하여 파티션 개수 변경 가능하다. 토픽의 파티션을 늘릴 수는 있지만, 줄일 수는 없으므로 주의하자.
- 토픽 삭제 정책 등과 같은 다이나믹 토픽 옵션(dynamic topic config)이라고 정의되는 일부 옵션들 변경 ->
kafka-configs.sh
사용retention.ms
를 수정하기 위해kafka-configs.sh
와--alter
,--add-config
옵션을 사용.--add-config
옵션을 사용하면 이미 존재하는 설정값은 변경하고, 존재하지 않는 설정값은 신규로 추가한다.
- 다이나믹 토픽 옵션인
retention.ms
가 변경된 것을kafka-configs.sh
와--describe
옵션을 통해 확인할 수 있다.
kafka-console-producer.sh
이번엔 생성된 토픽에 데이터를 넣을 수 있는 kafka-console-producer.sh 명령어를 사용해보자. 토픽에 넣는 데이터는 '레코드(record)'라고 부르며 메시지 키(key)와 메시지 값(value)으로 이루어져 있다. 키 없이 메시지만 보낼 수도 있는데 이 경우 메시지 키는 자바의 null로 기본 설정되어 브로커로 전송된다.
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
> hello
> kafka
> 0
> 1
> 2
> 3
> 4
> 5
- 레코드 값은 UTF-8을 기반으로 ByteeArray로 변환되고 ByteArraySerializer로만 직렬화 된다. 즉, String이 아닌 타입으로 직렬화 하여 전송할 수 없다.
- 다른 타입의 데이터를 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야 한다.
이번엔 메시지 키를 가지는 레코드를 전송해보자.
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
> key1:no1
> key2:no2
> key3:no3
parse.key
를 true로 설정하면 레코드 전송 시 메시지 키를 추가할 수 있다.key.separator
를 통해 키와 메시지의 구분자를 선언한다. 별도로 선언하지 않으면 기본 설정은 Tab delimiter(\t)이다.
메시지 키가 null인 경우에는 프로듀서가 파티션으로 전송할 때 레코드 배치 단위로 '라운드 로빈'으로 전송한다. 반면, 메시지 키가 존재하는 경우에는 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당된다. 즉, 메시지 키가 동일한 경우에는 동일한 파티션으로 전송되는 것이다. 이러한 메시지 키와 파티션 할당은 프로듀서에서 설정된 '파티셔너'에 의해 결정되는데, 커스텀 파티셔너를 구현하여 할당 동작 방식을 임의로 설정할 수 있다. 파티션 개수가 늘어나면 새로 프로듀싱 되는 레코드의 경우 파티션과 메시지 키의 일관성이 보장되지 않는다. 파티션을 추가하더라도 이전에 사용하던 메시지 키의 일관성을 보장해야 하는 경우 커스텀 파티셔너를 만들어서 운영해야 한다.
kafka-cnosole-consumer.sh
이번엔 토픽으로 전송된 데이터를 컨슈머를 통해 확인해보자. 필수 옵션으로 --bootstrap-server
와 --topic
이 필요하다. 추가로 --from-beginning
옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력한다.
$ bin/kafka-console.consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning
no1
4
5
... 생략
만약 데이터의 메시지 키와 메시지 값을 확인하고 싶다면 --property
옵션을 사용하자
$ bin/kafka-console.consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property print.key=true --property key.separator="-" --group hello-group --from-beginning
key1-no1
null-4
null-5
...
key2-no2
key3-no3
- 메시지 키를 확인하기 위해
print.key=true
로 설정한다. - 메시지 키 값을 구분하기 위해
key.separator
를 설정한다. 설정하지 않으면 tab delimieter가 기본값으로 설정되어 출력된다. --group
옵션을 통해 신규 컨슈머 그룹(consumer group)을 생성했다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있다. 이 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋(commit)을 한다.- 커밋: 컨슈머가 특정 레코드까지 처리 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것
- 커밋 정보는
__consumer_offsets
이름의 내부 토픽에 저장된다.
- 전송했던 데이터의 순서가 현재 출력되는 순서와 다른데, 이는 카프카의 핵심인 파티션 개념 때문에 생기는 현상이다.
- kafka-console-consumer.sh로 토픽의 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져간다. 이로 인해 프로듀서가 토픽에 넣은 데이터의 순서와 컨슈머가 토픽에서 가져간 데이터의 순서가 달라지게 된다.
- 만약 토픽에 넣은 데이터의 순서를 보장하고 싶다면, 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것이다.
kafka-consumer-groups.sh
컨슈머 그룹을 따로 생성하는 명령을 날리지 않고 컨슈머를 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성된다. 생성된 컨슈머 그룹에 대한 정보를 조회해보자.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
hello-group
--list
옵션을 통해 컨슈머 그룹의 리스트를 확인할 수 있다.
이번엔 특정 컨슈머 그룹이 어떤 토픽의 데이터를 가져가는지 확인해보자.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group hello-group --describe
Consumer group 'hello-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello.kafka 3 3 3 0 - - -
hello-group hello.kafka 2 1 1 0 - - -
hello-group hello.kafka 1 3 3 0 - - -
hello-group hello.kafka 0 5 5 0 - - -
(위는 필자가 테스트를 거치다보니 값이 조금 다를 것이다. 무시하고 진행하자)
--group
옵션을 통해 어떤 컨슈머 그룹에 대한 상세 내용을 볼 것인지 지정한다.--describe
옵션을 통해 컨슈머 그룹의 상세 내용을 확인한다.- GROUP과 TOPIC, PARTITION은 조회한 컨슈머 그룹이 마지막으로 커밋한 토픽과 파티션을 나타낸다.
- CURRENT-OFFSET: 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋(offset)을 나타낸다. 오프셋이란 파티션의 각 레코드에 할당된 번호로 데이터가 파티션에 들어올 때마다 1씩 증가한다.
- LOG-END-OFFSET: 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있다. CURRENT-OFFSET은 LOG-END-OFFSET과 같거나 작은 값일 수 있다.
- LAG: 랙(lag)은 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데에 얼마나 지연이 발생하는지 나타내는 지표이다.
- 랙은 컨슈머 그룹이 커밋한 오프셋(LOG-END-OFFSET)과 해당 파티션의 가장 최신 오프셋(CURRENT-OFFSET) 간의 차이이다.
- CONSUMER-ID: 컨슈머의 토픽(파티션) 할당을 카프카 내부적으로 구분하기 위해 사용하는 id로, 자동 할당되어 유니크한 값으로 설정된다.
- HOST: 컨슈머가 동작하는 host 명을 출력한다. 이 값을 통해 카프카에 붙은 컨슈머의 호스트 명 또는 IP를 알 수 있다.
- CLIENT-ID: 컨슈머에 할당된 id로, 사용자가 지정할 수 있으며 지정하지 않으면 자동 생성된다.
이러한 명령어는 컨슈머 개발 혹은 카프카를 운영할 때 둘 다 중요하게 활용된다. 컨슈머 그룹이 중복되지는 않는지 확인하거나, 운영 중인 컨슈머가 랙이 얼마인지 확인하여 컨슈머의 상태를 최적화 하는 데에 사용한다. 또한, 컨슈머 그룹 이름을 알아내고 컨슈머 그룹의 상세 정보를 파악하면 카프카에 연결된 컨슈머의 호스트명 또는 IP를 알아낼 수 있다. 이를 통해 카프카가 인가된 사람에게만 사용 중인지 알 수 있다.
kafka-verifiable-producer, consumer.sh
kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다. 카프카 클러스터 설치가 완료된 이후에 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할 때 유용하다.
$ bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --max-messages 10 --topic verify-test
{"timestamp":1733279176130,"name":"startup_complete"}
{"timestamp":1733279176248,"name":"producer_send_success","key":null,"value":"0","offset":10,"partition":0,"topic":"verify-test"}
{"timestamp":1733279176249,"name":"producer_send_success","key":null,"value":"1","offset":11,"partition":0,"topic":"verify-test"}
{"timestamp":1733279176249,"name":"producer_send_success","key":null,"value":"2","offset":12,"partition":0,"topic":"verify-test"}
...
{"timestamp":1733279176250,"name":"producer_send_success","key":null,"value":"8","offset":18,"partition":0,"topic":"verify-test"}
{"timestamp":1733279176250,"name":"producer_send_success","key":null,"value":"9","offset":19,"partition":0,"topic":"verify-test"}
{"timestamp":1733279176254,"name":"shutdown_complete"}
{"timestamp":1733279176255,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":78.74015748031496}
--max-messages
는 kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다. -1 옵션값으로 입력하면 종료할 때까지 계속 데이터를 토픽으로 보낸다.- 메시지별로 보낸 시간과 메시지 키 & 값, 토픽, 저장된 파티션, 저장도니 오프셋 번호가 출력된다.
- 데이터가 모두 전송된 이후 통계값이 출력된다. 평균 처리량을 확인할 수 있다.
전송한 데이터는 kafka-verifiable-consumer.sh로 확인할 수 있다.
$ bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --group-id test-group
{"timestamp":1733279241557,"name":"startup_complete"}
{"timestamp":1733279241738,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1733279241788,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":10,"maxOffset":19}]}
{"timestamp":1733279241801,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":20}],"success":true}
(출력 내용은 다를 수 있다)
--group-id
을 통해 컨슈머 그룹을 지정한다.- 컨슈머가 실행되면 startup_complete 문자열과 시작 시간이 timestamp와 함께 출력된다. 컨슈머는 토픽에서 데이터를 가져오기 위해 파티션에 할당하는 과정을 거친다.
- 컨슈머는 한 번에 다수의 메시지를 가져와서 처리(배치 처리)하므로 한 번에 10개의 메시지를 정상적으로 받았음을 알 수 있다.
kafka-delete-records.sh
이미 적재된 토픽의 데이터를 kafka-delete-records.sh 명령어로 지울 수 있다. 이 명령어를 사용하면 이미 적재된 토픽의 데이터 중 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터 특정 시점의 오프셋까지 삭제할 수 있다.
$ vi delete-topic.json
{"partitions": [{"topic": "hello.kafka", "partition": 0, "offset": 2}], "version": 1 }
$ bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 2
- 삭제하고자 하는 데이터에 대한 정보를 파일로 저장해서 사용해야 한다. 여기서는 delete-topic.json이라는 파일로 생성했다. 해당 파일에는 삭제하고자 하는 토픽, 파티션, 오프셋 정보가 들어가야 한다.
- hello.kafka 토픽의 0번 파티션에 0부터 2 오프셋까지 지우는 내용을 json 형식으로 작성했다.
--offset-json-file
옵션을 통해 delete-topic.json 파일을 전달한다.- 삭제가 완료되면 각 파티션에서 삭제된 오프셋 정보를 출력한다.
주의해야 할 점은 토픽의 특정 레코드 하나만 삭제하는 것이 아니라, 파티션에 존재하는 가장 오래된 오프셋부터 지정한 오프셋까지 삭제된다는 점이다. 카프카는 스트림 데이터를 다루기 때문에 토픽의 파티션에 저장된 특정 데이터만 삭제할 수 없다는 점을 명심해야 한다!
'Data Infra' 카테고리의 다른 글
[Apache Kafka] 카프카 프로듀서 상세 개념 (2) | 2024.12.11 |
---|---|
[Apache Kafka] 토픽과 파티션 상세 개념 (2) | 2024.12.11 |
[Apache Kafka] 카프카 스트림즈, 카프카 커넥트, 카프카 미러메이커2 (2) | 2024.12.10 |
[Apache Kafka] 카프카 기본 개념 (3) | 2024.12.10 |
[Apache Kafka] 카프카의 탄생과 미래 (1) | 2024.12.03 |