#시작하기

오늘은 Apache Kafka라는 분산 처리 메시지 시스템을 설치해 보려고 한다.

글의 순서는 크게 문제가 되지 않지만 그래도 앞서 NginX 설치 방법을 작성 후 생각해 보니 

하나의 시스템을 구성 방법에 대해 설명을 하는 것이 좋겠다는 생각이 들었다.

이유는 똥쟁이 블로그 글을 보다 보면 간접적으로 나마 데이터 수집과 분석 시스템을 

구성한 그림을 그려 볼수 있을거 같아서 이다.

그럼 시작을 해보자.


# Oracle JDK 설치

Apache Kafka를 사용하기 위해서는 우선 jdk가 설치 되어 있어야 함으로 jdk를 먼저 설치하겠다.

AWS의 ubuntu server 기준으로 java가 설치되어 있지 않다.

따라서 shell에서 "java"라고 명령어를 입력하면 다음과 같이 나온다.

$ java

The program 'java' can be found in the following packages:
 * default-jre
 * gcj-4.8-jre-headless
 * openjdk-7-jre-headless
 * gcj-4.6-jre-headless
 * openjdk-6-jre-headless
Try: sudo apt-get install 

이 글에서는 위에 나오는 openjdk를 사용하지 않고 oracle jdk를 설치하는 방법을 다루겠다.

자세한 내용은 아래와 같다.

#PPA를 이용하여 oracle java 8 설치
$ sudo apt-add-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer -y

java 8 버전 설치가 완료 되었는지 확인은 아래와 같이 확인 가능하다.

$ java -version

java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)


# Kafka 설치

자 선행작업으로 jdk 설치가 완료 되었으니 이제 본격적으로 kafka를 설치하고 만져보도록 하자.

먼저 kafka를 설치해야 하는데 설치 방법이 어렵지 않다.

간단하게 파일을 다운로드 후 압축을 해제하면 끝난다.

바로 아래와 같이 말이다.

$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
$ tar -zxf kafka_2.11-0.10.1.1.tgz
$ mv kafka_2.11-0.10.1.1.tgz kafka

간단히 설치가 완료 되었다. 이제 카프카를 구동해 보고 간단한 테스트를 해보자.


# kafka 구동

kafka는 zookeeper에 의존하여 동작을 한다.

따라서 kafka를 설치하면 zookeeper도 함께 들어있다. 물론 따로 패키지를 다운로드 하여 사용하거나

이미 사용하고 있다면 그것을 써도 된다.

하지만 이 글에서는 포함되어진 패키지를 활용하여 진행하려 한다.


먼저 zookeeper config 파일을 간단히 보고 zookeeper를 구동해보자.

kafka의 config파일은 "kafka 압축 해제한 경로/config" 이다.

$ cd kafka
$ nano config/zookeeper.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

위와 같이 config 파일에 설정 내역이 나온다.

zookeeper에서 사용하는 data directory를 변경하고 싶다면 위에서 변경을 하면 되고

2181포트를 다른 곳에서 사용중이라면 위에서 포트 변경을 해주면 된다.

그리고 2대 이상의 cluster를 구성할 경우 위 config파일에 다음과 같이 추가를 하면 된다.

server.1="host name or ip":2888:3888
server.2="host name or ip":2888:3888
server.3="host name or ip":2888:3888
.
.
.
.

위와 같이 cluster를 구성할 hostname(or ip)를 넣어주면 되며 주의 할 점은 server.#ID# 부분을 잘 봐야 한다.

#ID#는 cluster로 구성 될 Instance의 ID로 cluster로 구성될 instance의 zookeeper 상에 지정한 dataDir 내부에 myid 파일에 입력을 하여야 한다.

아래 간단하게 예시를 보여주겠다.

$ cd /opt/data/zookeeper
$ ls -al

drwxr-xr-x 3 xxx xxx 4096 Sep  7 16:57 .
drwxr-xr-x 5 root   root   4096 Sep  7 16:49 ..
-rw-rw-r-- 1 xxx xxx    2 Sep  7 16:56 myid
drwxrwxr-x 2 xxx xxx 4096 Dec  9 14:43 version-2

$ cat myid
1

보이는 것과 같이 해당 cluster로 구성할 instance에 위와 같이 저장하면 되는 것이다.


zookeeper config 파일을 들여다 봤으니 이제는 Kafka config파일을 보자.

위치는 동일하고 파일 이름은 "server.properties"이다.

$ nano config/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

...

# instance의 broker id
broker.id=1

# instance의 host정보
host.name=xxx.xxx.xxx.xxx

# kafka 로그 저장 경로
log.dirs=/opt/data/kafka-logs

# kafka 데이터 저장 기간
log.retention.hours=36

# zookeeper 접속 정보
zookeeper.connect="hostname or ip":2181,"hostname or ip":2181, ....
# topic 삭제 허용 여부 delete.topic.enable=true

더 많은 설정 내용이 있으나 간단하게만 보도록 하겠다.

broker id는 zookeeper와 다르게 자신의 id 값만 넣어 주면 된다.

host name은 host 정보를 입력하는데 아마 대부분이 private network 환경에 구성할 거라 생각한다.

그럴 경우 private ip나 hostname을 적으면 된다. local test의 경우는 수정하지 않아도 된다.

log.dir은 kafka에서 데이터를 보관하는 저장 경로로 보면 된다.

위의 경로는이해를 돕기 쉽게 zookeeper의 경로와 동일하게 맞춰 수정하였다.

log retention hour는 데이터 보관 기간이다. stream 데이터 처리는 대부분이 real 혹은 near-real처리를 할 것으로 보인다. 

따라서 환경에 따라 알맞게 설정하면 될것으로 보이며, 

위의 설정 36은 데이터가 저장 된 이후 36시간만을 보관하도록 설정한 것이다.

zookeeper connect 는 zookeeper 접속 정보이다.

zookeeper의 host와 앞서 설정한 port 정보를 입력하면 된다.

마지막으로 delete topic enable의 경우는 topic을 실제로 삭제할 것인지를 정해주는 옵션이다.

default는 false이며 topci 삭제 명령을 할 경우 표면적으로 지워진 것으로 보이나 실제로는 topic과 데이터는 지워지지 않는다.

따라서 환경에 따라 설정하면 되겠다.


별로 설정할 건 없지만 말만 길었던거 같아 걱정이 되는데(글재주가 없다..ㅠㅠ)

이제 실제로 구동해보자.

테스트의 구동은 config default 상태로 구동한다.


먼저 zookeeper 부터

$  bin/zookeeper-server-start.sh config/zookeeper.properties
[2017-01-13 16:10:07,113] INFO Reading configuration from: config/zookeeper.properties (org.apache....
[2017-01-13 16:10:07,114] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.Dat....
[2017-01-13 16:10:07,114] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.Datad....
[2017-01-13 16:10:07,114] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCl....
[2017-01-13 16:10:07,114] WARN Either no config or no quorum defined in config, running  in standa....
[2017-01-13 16:10:07,124] INFO Reading configuration from: config/zookeeper.properties (org.apache....
[2017-01-13 16:10:07,124] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-01-13 16:10:07,128] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/....
[2017-01-13 16:10:07,128] INFO Server environment:host.name=kafka-dev (org.apache.zookeeper.server....
[2017-01-13 16:10:07,128] INFO Server environment:java.version=1.8.0_91 (org.apache.zookeeper.serv....
[2017-01-13 16:10:07,128] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zooke....
[2017-01-13 16:10:07,128] INFO Server environment:java.home=/usr/lib/jvm/java-8-oracle/jre (org.ap....
...

실행을 하면 위와 같은 형식으로 에러가 없이 진행 된다.

참고로 damon 형태로 구동을 하기 위해서는 "-daemon" 옵션을 이용하면 된다.


$  bin/zookeeper-server-start.sh -daemon config/zookeeper.properties


zookeeper를 구동했으니 이제 kafka를 구동해보자.

마찬가지로 daemon 옵션이 있으며 config 파일의 정보는 아래를 확인하자.


$  $ bin/kafka-server-start.sh -daemon config/server.properties

자 정상적으로 구동이 되었다면 다음으로 데이터를 생성하는 producer와 consumer를 이용하여 테스트를 진행한다.


# kafka producer & consumer

kafka는 아래와 같은 구조를 가지고 있다.

producer는 메시지를 생산해 내는 역할을하고 kafka cluster에서 일정 기간동안 보관을 하고 consumer에서 소비를 하는 구조이다.

자세한 내용은 출처에 링크한 페이지에서 확인 바라며 실제 테스트를 통해서 이해를 돕도록 하겠다.

kafka 패키지에 보면 console producer와 consumer를 제공한다.

shell 상에서 간단하게 테스트가 가능하다. 자세한 사용법은 아래를 참고 하자.

producer 구동 방법은 다음과 같다.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic
hello world

위와 같이 구동 후 hello world 라고 입력을 하면 kafka cluster의 TestTopic에 보관을 하게 된다.

다음으로 kafka console consumer를 구동해 보겠다.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TestTopic
hello world

위와 같이 하면 consumer가 구동 되며 producer에서 입력한 메시지를 볼 수 있다.

gif나 동영상을 업로드해서 보여주고 싶지만 그 만한 능력과 시간이 없음을 이해 바란다.


# Kafka Topic 생성

아.. 글을 쓰다 보니 Topic 생성을 먼저 했어야 했는데....

순서가 바뀐거 같아 난감하다...

블로그를 시작한지 얼마 되지 않아 미숙한 점이 있는데 양해 바란다.

앞서 구동한 producer와 consumer에는 동일하게 topic이란 옵션이 들어간다.

이 topic은 사용자가 환경에 맞게 생성하여 사용하는 것이다.

아래 생성하는 방법을 보겠다.


$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
--partition 1 --topic TopicName

생성 방법은 간단하다.

bin 경로에 kafka-topic.sh 파일을 사용하며 옵션으로 "create" 명시를 한후 zookeeper의 접속 정보와

replication-factor라고 하는 데이터 복제 수를 지정하면 된다.(위에서는 서버 한대로 구동하니 1개로 하겠다.)

그리고 partition의 수를 결정해야 하는데 위의 옵션은 1이다. 

테스트 환경이 아닌 실제 운영이나 개발 시점에서는 여러번의 성능 테스트를 통하여 적절한 수를 찾기 바란다.

그리고 마지막으로 Topic name을 지정하면 끝난다.

Topic이 정상적으로 생성이 되었는지 혹은 어떤 Topic이 있는지 확인하기 위해서는 다음의 명령어를 활용하면 된다.

# topic list 확인
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# topic의 상세 정보
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TopicName

이건 실제로 확인 해보기를 바란다.


# 마치며

설치 방버에 대해서 쓰는데도 양이 만만치가 않다.

블로그를 통해 기술적인 내용을 적으시는 분들은 참 대단하다는 걸 다시 한번 느끼게 되었다.

물론 업무를 보며 짬짬히 써서 더 그런 것도 있겠지만 그렇다 해도 대단한건 대단한거다.

앞으로 개념적인 내용과 사용시 팁, 주의할점 등 쓰고 싶은 내용이 많지만 차차 하기로 하고 오늘은 여기까지 하려고 한다.

한주의 마지막 금요일 인데 눈도 오고 다들 불금 보내길 바란다.









'Big Data System > Apache Kafka' 카테고리의 다른 글

[Apache Kafka] Ubuntu에 Kafka 설치 방법  (2) 2017.01.13

WRITTEN BY
똥쟁이찰스

트랙백  0 , 댓글  2개가 달렸습니다.
  1. 좋은글 감사해요ㅎ
  2. 좋은글 감사합니다.

    2.12-0.11.0.1 version에서는

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ --partition 1 --topic TopicName

    에서 partition -> partitions 해야 돌아가는군요.
secret