[카프카] 도커(Docker)를 이용한 카프카(Kafka)

도커(Docker) 설치

  • 카프카 클러스터(Kafka Cluster)를 생성하여 그 안에서 카프카 브로커(Broker)를 생성한다.
  • 브로커는 각각의 토픽 파티션을 갖게 되며, 분산된 환경에서 카프카를 운용한다.
  • 로컬에서 관리하기 어렵기 때문에 도커 내에서 카프카 인스턴스를 생성한다.
    • 도커는 컴퓨터 안의 조그마한 버츄얼(Virtual) 컴퓨터라고 생각하면 된다.
  • 가상 컴퓨터를 여러가지 만들고 그 안에서 카프카 클러스터를 생성할 것이다.
  • 도커를 다운로드한다.

카프카 클러스터(Kafka Cluster) 구축

구축 준비

  • 카프카 클러스터를 만들기 위해서는 주키퍼(Zookeeper)가 필요하다.
  • 주키퍼를 위한 인스턴스를 하나 만들고, 클러스터링을 위해 주키퍼를 세가지 만들 것이다.
  • 세가지의 카프카 브로커들은 하나의 토픽(Topic)을 서빙(Serving)할 것이며, 하나의 토픽은 각각의 인스턴스 안에서 두개의 파티션(Partition)으로 나뉘어 총 여섯가지의 파티션을 이용하여 카프카 클러스터를 만들 것이다.
  • 파티션이 여러 개이기 때문에, 리플리케이션 팩터(Replication Factor)도 두 개 정도로 만들어 메세지가 도착했을 때, 다른 파티션으로 넘어갈 수 있게끔 구성할 것이다.
  • 앞서 실습했던 서버(Server)를 모두 꺼준다.
1
2
$ ./bin/kafka-server-stop.sh
$ ./bin/zookeeper-server-stop.sh

주키퍼(Zookeeper) 생성

  • 카프카 클러스터를 만들기 위해서 필요한 요소 중 하나인 주키퍼 인스턴스를 만들 것이다.
  • 도커 컴포즈(Compose) 설정을 생성하여 그 위에 주키퍼를 띄울 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# docker-compose.yml

version: '3' # 컴포즈 버전, 현재 최신 버전
services: # 인스턴스를 띄울 서비스를 리스트 형식으로 나열
  zookeeper: # 주키퍼 인스턴스
    image: zookeeper:3.7 # 이미지, 도커 허브의 주키퍼 오피셜 이미지를 사용
    hostname: zookeeper
    ports: # 버츄얼 도커 인스턴스와 로컬 컴퓨터를 이어주는 포트 설정 필요
      - "2181:2181" # 직접 코딩 가능
    environment: # 여러가지 설정
      ZOO_MY_ID: 1 # 주키퍼의 ID
      ZOO_PORT: 2181 # 주키퍼의 포트
      ZOO_SERVERS: server.1=zookeeper:2888:3888 # 주키퍼의 서버
    volumes: # 파일을 공유할 폴더를 입력
      - ./data/zookeeper/data:/data # 버츄얼 환경 안에서 생기는 폴더
      - ./data/zookeeper/datalog:/datalog # : 뒷단은 로컬에서도 접근할 수 있는 폴더, 로그
  • 위처럼 작성한 파일을 실행한다.
1
2
3
4
5
6
7
8
9
10
11
$ docker-compose up
'''
...
zookeeper_1  | java.lang.NullPointerException
zookeeper_1  |  at org.apache.zookeeper.server.ContainerManager.getCandidates(ContainerManager.java:162)
zookeeper_1  |  at org.apache.zookeeper.server.ContainerManager.checkContainers(ContainerManager.java:129)
zookeeper_1  |  at org.apache.zookeeper.server.ContainerManager$1.run(ContainerManager.java:97)
zookeeper_1  |  at java.base/java.util.TimerThread.mainLoop(Unknown Source)
zookeeper_1  |  at java.base/java.util.TimerThread.run(Unknown Source)
...
'''
  • 주키퍼 인스턴스가 전시되며 실행됨을 확인할 수 있다.

브로커(Broker) 생성

  • 위 yml파일에 이어서, 세 가지 카프카 인스턴스를 작성하여 도커로 띄운다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
...
  kafka1:
    image: confluentinc/cp-kafka:7.0.0 # 카프카를 만든 팀이 나와서 새로 설립한 회사(엄청 큼)
    hostname: kafka1
    ports:
      - "9091:9091" # 카프카 서버가 세개이기 때문에 각각의 포트가 필요
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091 # 도커를 이용할 때 설정
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" # 주키퍼와 연결
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/tmp/kafka-logs # 주키퍼와 동일하게 로그가 작성되는 경로 지정
    depends_on:
      - zookeeper # 주키퍼가 먼저 실행되기 위한 의존성을 생성
  kafka2:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka2/data:/tmp/kafka-logs
    depends_on:
      - zookeeper
  kafka3:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/tmp/kafka-logs
    depends_on:
      - zookeeper
1
1
$ docker-compose rm -svf
  • 도커 컴포즈 작성 후 띄우기 전, 위 명령어를 통해 동작하고 있던 도커 컴포즈를 삭제한다.
1
$ docker-compose up 
  • 위 명령어로 실행하면, 주키퍼가 실행되고 카프카 1-3이 실행된다.

image

  • 도커 GUI를 통해 실행되고 있는 것을 확인할 수 있다.

카프드롭(Kafdrop)

  • 카프카 브로커의 관리를 좀 더 쉽게 해주는 툴을 알아볼 것이다.
  • 카프드롭이라는 오픈 소스 라이브러리이며, 웹 UI를 통해 눈으로 확인할 수 있게 해주는 프로그램이다.
1
2
3
4
5
6
7
8
9
10
11
12
...
  kafdrop:
    image: obsidiandynamics/kafdrop # 도커 허브의 이미지를 사용
    restart: "no"
    ports: 
      - "9000:9000"
    environment:
      KAFKA_BROKER_CONNECT: "kafka1:19091" # 카프카 하나만 넣으면 됨
    depends_on: # 카프카 1-3이 모두 실행된 뒤 실행되도록 의존성 추가
      - kafka1
      - kafka2
      - kafka3
  • 이제 카프카 클러스터의 모든 설정이 끝났다.
  • 실행시켜보면 모두 정상적으로 실행됨을 확인할 수 있다.

image

  • localhost:9000으로 접속한다.

image

  • 카프드롭의 웹 뷰를 확인할 수 있다.

토픽(Topic) 생성

  • 지금까지 주키퍼 인스턴스를 만들고, 카프카 인스턴스 세 개를 띄운 다음, 마지막으로 카프드롭도 인스턴스로 뽑아 만들었다.
  • 이렇게 굉장히 작은 버추얼 카프카 클러스터를 구성해보았다.
  • 클러스터 안에서 쓸 토픽을 만들어 볼 것이다.
1
1
$ docker-compose rm -svf
  • 위 명령어로 모든 환경을 깔끔히 지우고 새롭게 시작한다.
1
$ docker-compose up -d
  • 카프카가 백그라운드에서 작동되면 관리가 더욱 쉬워진다.
  • 위 명령어를 통해 실행하면 만든 컨테이너가 백그라운드에서 돌아가며, 터미널을 닫더라도 지속적으로 실행이 된다.
1
$ docker exec -it sixat-kafka1-1 kafka-topics --bootstrap-server=localhost:19091 --create --topic first-cluster-topic --partitions 3 --replication-factor 1 
  • 토픽을 생성하기 위해서는 카프카 인스턴스 안에 들어가서 사용해야한다.
  • 위 명령어를 통해 토픽을 생성한다.

image

  • 카프드롭을 통해서도 토픽이 생성됨을 확인할 수 있다.
  • 명령어를 통해서도 생성할 수 있지만, 카프드롭을 통해 + New 버튼을 통해서도 생성하고 삭제도 가능하다.

카프카 클러스터 이용

프로듀서(Producer)

  • 만들어진 클러스터를 사용할 수 있는 카프카 프로듀서를 만들어 볼 것이다.
  • 새로운 파일을 만들어 코드를 작성한다.
  • 여기서 생기는 프로듀서는 브로커 세 가지를 쓸 수 있는 프로듀서이다.
1
2
3
4
5
6
7
8
9
10
# cluster-producer.py
from kafka import KafkaProducer # 패키지

brokers = ["localhost:9091", "localhost:9092", "localhost:9093"] # 스트링 값으로 브로커 리스트 저장
topicName = "first-cluster-topic" # 토픽 이름 지정

producer = KafkaProducer(bootstrap_servers = brokers) # 프로듀서를 인스턴스화

producer.send(topicName, b"Hello cluster world") # 토픽을 받아 스트링값을 전송
producer.flush()

컨슈머(Consumer)

  • 클러스터를 쓸 수 있는 컨슈머를 생성한다.
1
2
3
4
5
6
7
from kafka import KafkaConsumer # 패키지

brokers = ["localhost:9091", "localhost:9092", "localhost:9093"] # 통신 할 브로커 리스트 생성
consumer = KafkaConsumer("first-cluster-topic", bootstrap_servers=brokers) # 프로듀서 인스턴스화

for message in consumer: # 컨슈머가 메세지를 받을 때 마다 프린트
  print(message)
  • 두 가지 창을 띄워 프로듀서와 컨슈머를 실행시킨다.

image

  • 위 이미지처럼 컨슈머 동작 중 프로듀서를 실행하면 컨슈머에 전시됨을 확인할 수 있다.

카프드롭에서 메세지를 확인하는 방법

image

  • 토픽을 선택한다.

image

  • Total size에 토픽의 수가 전시되며, View Messages를 클릭한다.

image

  • View Messages를 클릭하면 입력한 메세지를 확인할 수 있다.
  • 카프드롭에서는 이렇게 편하게 볼 수 있기에, 관리적인 측면에서 굉장히 좋다.
0%