[카프카] 프로듀서(Producer), 컨슈머(Consumer) CLI

복습

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
(sixat) 11:50:25 kafka_2.13-3.1.0 ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 주키퍼 실행

(sixat) 11:50:29 kafka_2.13-3.1.0 ./bin/kafka-server-start.sh -daemon config/server.properties # 브로커 실행

(sixat) 11:50:47 kafka_2.13-3.1.0 netstat -an | grep 2181
'''
tcp6       0      0  ::1.2181               ::1.59308              ESTABLISHED
tcp6       0      0  ::1.59308              ::1.2181               ESTABLISHED
tcp46      0      0  *.2181                 *.*                    LISTEN     
'''

(sixat) 11:50:55 kafka_2.13-3.1.0 bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 토픽 확인
'''
first-topic
'''

(sixat) 11:51:09 kafka_2.13-3.1.0 bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
'''
Topic: first-topic      TopicId: 9AZAxsacTKSOfYLmgYj7ig PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
       Topic: first-topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
'''

프로듀서(Producer) CLI

  • CLI에서 카프카(Kafka)의 프로듀서(Producer)를 사용하여 몇 가지 메세지(Massage)를 보내볼 것이다.
  • 전 포스팅에서 주키퍼(Zookeeper)와 브로커(Broker), 토픽(Topic)까지 생성했다.
  • 앞으로 진행할 모든 실습은 카프카와 브로커, 주키퍼가 켜진 상태로 진행한다.
  • 카프카의 프로듀서와 관련된 스크립트(Scripts)를 살펴볼 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
(sixat) 11:52:27 kafka_2.13-3.1.0 ls bin
'''
connect-distributed.sh             kafka-consumer-perf-test.sh        kafka-producer-perf-test.sh        kafka-verifiable-consumer.sh
connect-mirror-maker.sh            kafka-delegation-tokens.sh         kafka-reassign-partitions.sh       kafka-verifiable-producer.sh
connect-standalone.sh              kafka-delete-records.sh            kafka-replica-verification.sh      trogdor.sh
kafka-acls.sh                      kafka-dump-log.sh                  kafka-run-class.sh                 windows
kafka-broker-api-versions.sh       kafka-features.sh                  kafka-server-start.sh              zookeeper-security-migration.sh
kafka-cluster.sh                   kafka-get-offsets.sh               kafka-server-stop.sh               zookeeper-server-start.sh
kafka-configs.sh                   kafka-leader-election.sh           kafka-storage.sh                   zookeeper-server-stop.sh
kafka-console-consumer.sh          kafka-log-dirs.sh                  kafka-streams-application-reset.sh zookeeper-shell.sh
kafka-console-producer.sh          kafka-metadata-shell.sh            kafka-topics.sh
kafka-consumer-groups.sh           kafka-mirror-maker.sh              kafka-transactions.sh
'''
  • kafka-console-producer.sh이란 파일이 존재하는데, 이 파일로 카프카의 프로듀서를 만들 수 있다.

프로듀서 실행

1
2
(sixat) 11:59:20 kafka_2.13-3.1.0 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
>
  • 부트스트랩 서버(Bootstrap Server)와 토픽(Topic)을 지정하여 카프카 콘솔 프로듀서를 실행한다.
  • 꺽새가 나오게 되며, 프로듀서를 만든 것으로 보면 된다.
  • 여기서 메세지를 입력하면 그대로 토픽으로 보내지게 된다.
1
2
>6mini is handsome 
>6mini is smart
  • 라인 하나가 메세지가 되어 first-topic에 보내지게 된다.
  • 컨슈머(Consumer)를 이용하여 메세지를 꺼내볼 수 있다.

컨슈머(Consumer) CLI

  • 카프카 프로듀서에서 토픽으로 보낸 몇 가지 메세지를 받을 수 있는 컨슈머를 만들어볼 것이다.
  • 컨슈머를 위한 스크립트도 필요하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
(sixat) 11:52:27 kafka_2.13-3.1.0 ls bin
'''
connect-distributed.sh             kafka-consumer-perf-test.sh        kafka-producer-perf-test.sh        kafka-verifiable-consumer.sh
connect-mirror-maker.sh            kafka-delegation-tokens.sh         kafka-reassign-partitions.sh       kafka-verifiable-producer.sh
connect-standalone.sh              kafka-delete-records.sh            kafka-replica-verification.sh      trogdor.sh
kafka-acls.sh                      kafka-dump-log.sh                  kafka-run-class.sh                 windows
kafka-broker-api-versions.sh       kafka-features.sh                  kafka-server-start.sh              zookeeper-security-migration.sh
kafka-cluster.sh                   kafka-get-offsets.sh               kafka-server-stop.sh               zookeeper-server-start.sh
kafka-configs.sh                   kafka-leader-election.sh           kafka-storage.sh                   zookeeper-server-stop.sh
kafka-console-consumer.sh          kafka-log-dirs.sh                  kafka-streams-application-reset.sh zookeeper-shell.sh
kafka-console-producer.sh          kafka-metadata-shell.sh            kafka-topics.sh
kafka-consumer-groups.sh           kafka-mirror-maker.sh              kafka-transactions.sh
'''

컨슈머 생성

  • kafka-console-consumer.sh라는 파일로 컨슈머를 생성할 수 있다.
1
(sixat) 12:14:22 kafka_2.13-3.1.0 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic
  • 실행시 아무런 행동도 일어나지 않는데, 그 이유는 메세지가 도착하지 않았기 때문이다.
  • 메세지를 실시간으로 보낼 수 있는 프로듀서를 생성한다.
1
(sixat) 11:59:20 kafka_2.13-3.1.0 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic

image

  • 프로듀서에서 메세지 작성 시 컨슈머에서 바로 전시되는 것을 확인할 수 있다.
  • 단순히 컨슈머 하나를 생성했는데, 컨슈머 그룹을 생성하는 방법도 알아볼 것이다.

컨슈머 그룹 생성

  • 카프카의 컨슈머 그룹을 다루는 방법을 알아볼 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
(sixat) 11:52:27 kafka_2.13-3.1.0 ls bin
'''
connect-distributed.sh             kafka-consumer-perf-test.sh        kafka-producer-perf-test.sh        kafka-verifiable-consumer.sh
connect-mirror-maker.sh            kafka-delegation-tokens.sh         kafka-reassign-partitions.sh       kafka-verifiable-producer.sh
connect-standalone.sh              kafka-delete-records.sh            kafka-replica-verification.sh      trogdor.sh
kafka-acls.sh                      kafka-dump-log.sh                  kafka-run-class.sh                 windows
kafka-broker-api-versions.sh       kafka-features.sh                  kafka-server-start.sh              zookeeper-security-migration.sh
kafka-cluster.sh                   kafka-get-offsets.sh               kafka-server-stop.sh               zookeeper-server-start.sh
kafka-configs.sh                   kafka-leader-election.sh           kafka-storage.sh                   zookeeper-server-stop.sh
kafka-console-consumer.sh          kafka-log-dirs.sh                  kafka-streams-application-reset.sh zookeeper-shell.sh
kafka-console-producer.sh          kafka-metadata-shell.sh            kafka-topics.sh
kafka-consumer-groups.sh           kafka-mirror-maker.sh              kafka-transactions.sh
'''
  • kafka-consumer-groups.sh이란 파일을 이용하면 된다.
  • 컨슈머 그룹을 리스팅(listing)하는 작업을 먼저 진행한다.
1
2
3
4
(sixat) 12:23:09 kafka_2.13-3.1.0 ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
'''
console-consumer-29758
'''
  • console-consumer-29758이라는 컨슈머 그룹이 만들어진 것을 확인할 수 있다.
  • 컨슈머를 만들 때 그룹을 따로 명시하지 않으면 유니크한 컨슈머 그룹이 따로 만들어진다.
  • 위 그룹이 바로 유니크한 컨슈머 그룹이 된다.
1
(sixat) 12:23:14 kafka_2.13-3.1.0 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-group 
  • 위와 같은 명령을 통해 컨슈머 그룹을 명시할 수도 있다.
1
2
3
4
5
(sixat) 12:30:23 kafka_2.13-3.1.0 ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
'''
console-consumer-29758
first-group
'''
  • 리스팅 시 컨슈머 그룹 두 가지가 전시되는 것을 확인할 수 있다.
1
2
3
4
5
(sixat) 12:30:38 kafka_2.13-3.1.0 ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-group 
'''
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
first-group     first-topic     0          6               6               0               console-consumer-40b059e4-be3c-4ffe-bc0e-2da5f31f8c90 /10.210.131.29  console-consumer
'''
  • 위와 같은 명령어로 세부 정보를 확인할 수 있다.
  • 지금까지 카프카 브로커를 실행시키고 토픽을 만들고, 프로듀서에서 메세지를 만들어 그것을 컨슈머에서 받아 마지막으로 컨슈머 그룹을 이루는 방법까지 알아보았다.
  • 다음 포스팅부터 본격적으로 카프카를 프로매틱하게 사용하는 방법을 알아볼 것이다.
0%