Kafka는 실시간 데이터 스트리밍을 처리하는 강력한 메시징 시스템으로, 대규모 분산 시스템에서 데이터 전송 및 처리에 널리 사용됩니다. 이번 포스팅에서는 Kafka의 기본 개념과 파이썬을 사용한 Kafka 실습을 다룬 후, Iris 데이터셋을 Kafka를 통해 전송하고, Consumer가 이를 데이터베이스에 저장하는 고급 예제를 포함하여 설명합니다.
Kafka란?
Kafka는 오픈소스로 개발된 분산 스트리밍 플랫폼으로, 대용량 데이터 처리와 실시간 스트리밍에 사용됩니다. Kafka는 주로 실시간 데이터 처리와 메시징 서비스로 활용되며, IoT, 로그 수집, 실시간 데이터 분석 등 다양한 분야에서 사용됩니다.
Kafka를 사용하는 이유?
1. 데이터 흐름의 실시간성
- 일반적인 데이터 저장:
- 데이터를 직접 데이터베이스에 저장할 때는 보통 애플리케이션이 데이터베이스와 바로 통신하여 데이터를 삽입합니다.
- 이 방식은 일괄 처리(batch processing)에 적합하며, 실시간 데이터 스트리밍에는 부적합할 수 있습니다.
- 예를 들어, 특정 시간대에 데이터를 수집하여 일괄로 데이터베이스에 저장하거나, 정기적으로 데이터를 업데이트할 때 사용합니다.
- Kafka를 사용한 데이터 저장:
- Kafka를 사용하면 데이터를 실시간으로 스트리밍하여 데이터베이스에 저장할 수 있습니다.
- 데이터가 발생하는 즉시 Kafka Producer가 Kafka Topic에 데이터를 전송하고, Consumer가 이를 실시간으로 수신하여 데이터베이스에 기록합니다.
- 예를 들어, 웹 로그, IoT 센서 데이터, 금융 거래 기록 등 연속적으로 발생하는 데이터를 즉각적으로 저장하고 분석하는 데 유리합니다.
2. 확장성 및 분산 처리
- 일반적인 데이터 저장:
- 일반적으로 애플리케이션이 데이터베이스에 직접 연결되는 경우, 특정 데이터베이스에 대한 부하가 집중될 수 있습니다.
- 데이터베이스가 대규모 데이터를 실시간으로 저장하기 위해 처리 용량을 확장해야 할 때 어려움이 있을 수 있습니다.
- Kafka를 사용한 데이터 저장:
- Kafka는 분산형 메시징 시스템으로 확장성이 뛰어나며, 대량의 데이터를 병렬로 처리하고 전송할 수 있습니다.
- Kafka Topic에 저장된 데이터는 여러 Consumer가 동시에 접근할 수 있으므로, 데이터를 다양한 목적지에 동시에 분산시키고 저장할 수 있습니다.
- 이를 통해 데이터 저장이 더 효율적이고 유연해지며, 여러 시스템에 데이터를 동시에 제공해야 하는 경우에도 부하를 분산시킬 수 있습니다.
3. 데이터 처리 효율성 및 내결함성
- 일반적인 데이터 저장:
- 데이터베이스에 직접 데이터를 저장하는 경우, 데이터베이스에 의존하여 데이터의 일관성을 유지합니다. 만약 데이터베이스 서버에 문제가 생기면 데이터가 손실될 위험이 있습니다.
- 트랜잭션 제어나 백업이 필요한 경우가 많고, 데이터 처리 효율이 데이터베이스 성능에 직접적으로 영향을 받습니다.
- Kafka를 사용한 데이터 저장:
- Kafka는 내결함성을 제공하며, 데이터가 손실 없이 안정적으로 전송될 수 있도록 보장합니다.
- 데이터를 일시적으로 저장하는 역할을 하여 데이터베이스에 부하를 줄이고, 데이터베이스의 가용성을 높이는 데 기여합니다.
- Kafka가 데이터를 수신하면 데이터를 안전하게 유지할 수 있고, Consumer는 필요한 시점에 데이터를 수신하므로 데이터 처리의 유연성이 높아집니다.
4. 비동기 처리 지원
- 일반적인 데이터 저장:
- 데이터베이스에 직접 연결해 데이터를 저장하는 방식은 동기식으로 처리되는 경우가 많습니다. 데이터 저장 시점에 애플리케이션은 데이터베이스 응답을 기다려야 하며, 데이터 전송이 완료되어야 다음 작업을 수행할 수 있습니다.
- Kafka를 사용한 데이터 저장:
- Kafka를 사용하면 비동기식 데이터 전송이 가능합니다. Producer가 데이터를 Topic에 전송하면 Consumer가 해당 데이터를 비동기적으로 수신하고 처리할 수 있습니다.
- 이로 인해 애플리케이션이 데이터를 전송하는 동안 다른 작업을 수행할 수 있어 성능을 향상시킬 수 있습니다.
구분 | 일반 데이터 저장 | Kafka를 이용한 데이터 저장 |
실시간성 | 일괄 처리에 적합 | 실시간 스트리밍 처리 가능 |
확장성 및 분산 처리 | 데이터베이스 부하 집중 | 분산 처리 및 확장성 용이 |
데이터 처리 효율성 | 데이터베이스 의존성 높음 | 내결함성 및 데이터 보존 보장 |
비동기 처리 | 동기식 처리 주로 사용 | 비동기식 처리로 성능 향상 가능 |
Kafka의 기본 구성 요소
- Producer: Kafka에 데이터를 전송하는 애플리케이션입니다.
- Consumer: Kafka에서 데이터를 읽어오는 애플리케이션입니다.
- Broker: 메시지를 저장하고 관리하는 서버입니다.
- Topic: 메시지가 전송되는 주제입니다.
Kafka 기본 실습: Producer와 Consumer 구현
1. Kafka 설치 및 실행
Kafka는 분산 시스템으로 설계되어 클러스터 환경에서 운영됩니다. Kafka 클러스터 내의 브로커와 노드 간의 상태 동기화와 메타데이터 관리는 Zookeeper를 통해 이루어집니다. Zookeeper의 주요 역할은 다음과 같습니다:
- 브로커의 메타데이터 관리: Kafka 클러스터의 모든 브로커와 파티션 정보를 관리하고 공유합니다.
- 리더 선출 및 클러스터 안정성 유지: Kafka 클러스터에서 특정 파티션의 리더를 선출하며, 장애가 발생하면 리더를 재선출하여 데이터 처리를 지속합니다.
- 클러스터 구성 정보 관리: 새로운 브로커 추가, 설정 변경 등이 발생하면 클러스터 전체에 알립니다.
Zookeeper가 Kafka의 브로커 역할을 하므로, Kafka를 실행하기 전에 반드시 Zookeeper를 먼저 설치하고 실행해야 합니다.
2. Python에서 Kafka 사용을 위한 라이브러리 설치
Kafka와 연동하기 위해 kafka-python 라이브러리를 사용합니다.
pip install kafka-python
3. Kafka Producer 구현
Kafka Producer를 사용해 간단한 메시지를 전송합니다.
from kafka import KafkaProducer
import json
# Kafka Producer 설정
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8') )
# 데이터 전송
def send_message(topic, message):
producer.send(topic, message)
producer.flush() # 메시지가 전송될 때까지 대기
# 예시 데이터 전송
message = {'user': 'test_user', 'action': 'login'}
send_message('user_activity', message)
print("Message sent to Kafka topic")
4. Kafka Consumer 구현
Kafka Consumer를 통해 데이터를 수신합니다.
from kafka import KafkaConsumer
import json
# Kafka Consumer 설정
consumer = KafkaConsumer(
'user_activity',
bootstrap_servers='localhost:9092',
group_id='my-consumer-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# 메시지 소비
def consume_message():
for message in consumer:
print(f"Received message: {message.value}")
consume_message()
Kafka와 데이터베이스 연동: Iris 데이터 송수신 예제
이제 Kafka를 이용해 Iris 데이터셋을 전송하고, Consumer가 이를 데이터베이스에 저장하는 예제를 살펴보겠습니다.
Iris 데이터셋 설명
Iris 데이터셋은 꽃받침과 꽃잎의 길이와 너비, 그리고 품종 정보로 구성된 데이터셋입니다.
1. Iris 데이터를 Kafka로 전송하는 Producer 구현
from kafka import KafkaProducer
from sklearn.datasets import load_iris
import json
import time
# Kafka Producer 설정
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Iris 데이터 로드
iris = load_iris()
data = iris['data']
target = iris['target']
# Kafka로 Iris 데이터 전송
def send_iris_data():
for i in range(len(data)):
message = {
'features': data[i].tolist(),
'label': int(target[i])
}
producer.send('iris_topic', message)
producer.flush()
time.sleep(0.1)
send_iris_data()
print("Iris data sent to Kafka topic 'iris_topic'")
2. 데이터베이스에 저장할 Kafka Consumer 구현
Consumer는 iris_topic에서 데이터를 수신하고, PostgreSQL 데이터베이스에 저장합니다.
PostgreSQL 데이터베이스 설정
아래 SQL 명령어로 iris_data 테이블을 생성합니다.
CREATE TABLE iris_data (
id SERIAL PRIMARY KEY,
sepal_length FLOAT,
sepal_width FLOAT,
petal_length FLOAT,
petal_width FLOAT,
label INT
);
Consumer 구현 및 데이터베이스 저장 코드
from kafka import KafkaConsumer
import psycopg2
import json
# Kafka Consumer 설정
consumer = KafkaConsumer(
'iris_topic',
bootstrap_servers='localhost:9092',
group_id='iris-consumer-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# PostgreSQL 데이터베이스 연결
connection = psycopg2.connect(
dbname="your_db_name",
user="your_db_user",
password="your_db_password",
host="localhost"
)
cursor = connection.cursor()
# Kafka 메시지를 데이터베이스에 저장
def consume_and_store_data():
for message in consumer:
data = message.value
features = data['features']
label = data['label']
cursor.execute(
"""
INSERT INTO iris_data (sepal_length, sepal_width, petal_length, petal_width, label)
VALUES (%s, %s, %s, %s, %s)
""",
(features[0], features[1], features[2], features[3], label)
)
connection.commit()
print(f"Stored record with label {label}")
consume_and_store_data()
Kafka와 데이터베이스 연동 활용 사례
- 실시간 데이터 분석: 실시간으로 수집되는 데이터를 Kafka로 전송하고 데이터베이스에 저장하여 실시간 분석 시스템과 연동할 수 있습니다.
- IoT 센서 데이터 관리: Kafka와 데이터베이스 연동을 통해 센서 데이터를 실시간으로 수집하고 분석할 수 있습니다.
- 모델 학습 데이터 관리: 수집된 데이터를 머신러닝 모델의 학습 및 테스트 데이터로 활용할 수 있습니다.
Reference
https://zookeeper.apache.org/releases.html
https://kafka.apache.org/downloads