Spring Data

Kafka Streams로 실시간 데이터 분석, Kafka Connect로 데이터 통합

백엔드 유성 2023. 8. 8. 22:05

Kafka Streams의 개념 및 특징

Kafka Streams은 Kafka 기능 위에 구축된 라이브러리로, 실시간 데이터 스트림을 처리하고 분석하기 위한 간단한 방법을 제공합니다. 그 중 주요 특징은 다음과 같다고 합니다.

 

  1. 분산 처리: Kafka Streams 어플리케이션은 분산시스템으로 쉽게 확장될 수 있습니다.
  2. 상태 저장: 내부적으로 RocksDB와 같은 내장 상태 저장소를 활용하여 처리 중인 데이터를 유지합니다.
  3. 탄력성과 내결함성: 어플리케이션 장애나 Kafka 클러스터의 변화에 강하게 설계되어 있습니다.

 

사실 위에 내용을 읽어봐도 크게 와닿지가 않아서 Kafka와의 차이점을 적어보겠습니다.

처리 유형

  • Kafka: 기본적인 메시지 전송과 저장에 초점을 맞춥니다. 메시지를 생성하고, 저장하며, 다른 시스템이나 응용 프로그램으로 전송하는 기본적인 작업에 이상적입니다.
  • Kafka Streams: 복잡한 실시간 스트림 처리 작업에 필요합니다. 예를 들어, 데이터 집계, 변환, 필터링, 조인 등의 연산이 필요한 경우입니다.

복잡성

  • Kafka: 메시지를 생성, 저장, 소비하는 단순한 작업에 적합합니다.
  • Kafka Streams: 상태 관리, 윈도잉, 시간 기반 연산 등의 복잡한 스트림 처리를 수행할 수 있습니다.

지연 시간과 처리 속도

  • Kafka: 높은 처리량과 낮은 지연 시간을 위한 메시지 전송을 목표로 합니다.
  • Kafka Streams: 실시간으로 데이터 스트림을 처리할 수 있습니다.

스케일

  • Kafka: 대용량 데이터의 분산 저장과 전송을 지원합니다.
  • Kafka Streams: 데이터의 병렬 처리와 확장성을 제공합니다.

통합

  • Kafka: 다양한 소스에서 데이터를 수집하거나 다양한 대상 시스템으로 데이터를 전송하는데 사용됩니다.
  • Kafka Streams: Kafka의 데이터를 처리하여 다른 Kafka 토픽에 결과를 저장하거나 외부 시스템과 통합하는데 사용됩니다.

개발 및 운영 복잡성

  • Kafka: Kafka 클러스터의 설정, 관리, 모니터링이 필요합니다.
  • Kafka Streams: 응용 프로그램 개발의 복잡성과 Kafka Streams API의 학습 곡선을 고려해야 합니다.

 

 

실시간 데이터 처리에 Kafka Streams가 어떻게 도움을 주는지

Kafka Streams는 low-latency로 데이터를 처리합니다. 이를 통해서

실시간 대시보드를 위한 데이터 집계

실시간 경고 및 모니터링

실시간으로 데이터 변환 또는 보정

등 다양한 실시간 데이터 애플리케이션의 구현이 가능해집니다.

간단한 Kafka Streams 어플리케이션 예제

Java 17

Kafka-streams 3.4.0

아래 종속성을 추가해주세요.

 

이 코드는 Kafka Streams를 사용하여 텍스트를 단어별로 분리하고, 각 단어의 출현 빈도를 카운트하는 간단한 예제입니다.

implementation 'org.apache.kafka:kafka-streams:3.5.1'

 

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class App {

    private static String kafkaHost = "localhost";
    private static String kafkaPort = "9092";
    private static String consumerGroupId = "word-count-consumer-group";

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream("TextLinesTopic");

        // TextLInesTopic의 데이터를 출력하는 코드
        textLines.foreach((key, value) ->
                System.out.println("Key: [" + key + "]  Value: [" + value + "]"));

        // 단어를 카운트하는 코드
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count(Materialized.as("counts-store"));

        // 단어 카운트 결과를 출력하는 코드
        wordCounts.toStream().foreach((word, count) -> {
            System.out.println("Word: [" + word + "]  Count: [" + count + "]");
        });

        // 단어 카운트 결과를 Kafka WordsWithCountsTopic Topic에 전송하는 코드
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(getProperties()));
        streams.start();
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, consumerGroupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + kafkaPort);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");

        return props;
    }
}


먼저 TextLinesTopic 토픽과 WordsWithCountsTopic 토픽을 만들고

TetLInesTopic에 문장을 입력하면 문장에 단어가 각각 몇번 쓰였는지 빈도수가

WordsWithCountsTopic에 <String, Long> 타입으로 저장됩니다.

 

 

Kafka Connect

Kafka Connect의 개념 및 구성요소

Kafka Connect는 외부 시스템과 Apache Kafka 사이의 데이터를 통합하기 위한 플랫폼입니다. 주요 구성요소는 다음과 같습니다.

(그림 추가 예정)

API/DB 등 ----Source Conn----> Kafka Topic ----Sink Conn----> API/DB 등

1. Connector

외부 시스템과 Kafka 사이에 데이터 복사 작업을 관리합니다.

Connector 종류로는 외부 데이터를 가져오는 Source Connector와 Kafka의 데이터를 외부로 보내는 Sink Connector가 있습니다.

2. Tasks

Kafka Connect는 대규모 데이터 워크로드를 효율적으로 처리하기 위해 Connectors의 작업을 여러 개의 Tasks로 분할합니다.
Tasks는 Connector의 단위 작업을 병렬로 처리하여 스케일 아웃 및 고성능 데이터 처리를 지원합니다.
예를 들면, 100개의 데이터가 있는 DB에서 10개씩 병렬로 불러와, 처리하고, 전송하는 것이죠.

Worker

Worker는 Kafka Connect 클러스터에서 Connector와 Task의 실행 환경을 제공하는 구성 요소입니다. Connectors와 Tasks의 실제 실행은 Worker에서 이루어집니다.

Kafka Connect는 단독 모드와 분산 모드 두 가지 방식으로 운영될 수 있습니다.

  • 단독 모드 (Standalone mode): Kafka Connect가 단일 Worker 프로세스 내에서 실행되는 모드입니다. 이 모드에서 모든 Connector와 Task는 해당 Worker에서 동작합니다. 주로 개발 및 테스트 환경에서 사용되는 실행 모드입니다.
  • 분산 모드 (Distributed mode): 여러 Worker 프로세스에서 Kafka Connect가 실행되는 모드입니다. 이 모드에서 Connector와 Task의 작업은 여러 Worker들 사이에 분산되어 처리됩니다. Worker들의 상태와 구성 정보는 특정 Kafka 토픽에 저장되어 관리됩니다. 이 분산 모드는 프로덕션 환경에서 고가용성, 확장성 및 장애 복구를 목적으로 사용됩니다.

각 Worker는 독립적인 JVM 환경에서 실행되기 때문에, 한 Worker에 문제가 발생하더라도 다른 Worker들에는 직접적인 영향을 주지 않습니다.

 

 

실시간 데이터 통합에 Kafka Connect가 어떻게 도움을 주는지

Kafka Connect는 다양한 소스와 싱크에서 데이터를 연속적으로 통합합니다. 이를 통해서 

  • 실시간 데이터베이스와의 데이터 동기화 (DB 마이그레이션 등)
  • 실시간 로그 또는 이벤트 데이터의 수집 및 저장
  • 외부 시스템과의 실시간 데이터 연계 및 통합

등 다양한 실시간 데이터 통합 애플리케이션의 구현이 가능해집니다.

 

 

 

이렇게 Kafka Streams와 Kafka Connect를 활용하면, 실시간 스트림 처리 및 다양한 시스템과의 연계가 간편해집니다.