Apache Kafka を触ってみた

Apache Kafka

オブジェクト指向カンファレンスでメッセージソーシングに触れトライしてみたいと思い、まずは、その実装要素である メッセージブローカー について検証してみました。

今回は代表的なメッセージブローカである Apache Kafka についてです。

Kafkaは、大量のデータをリアルタイムで処理できる分散ストリーミングプラットフォームであり、高速でスケーラブルなメッセージングシステムを提供します。
Amazon MSK のような、Kafka のフルマネージド型サービスも提供されています。

今回は、docker-compose で kafka を local pc に起動し、 Java の SDK を利用して Produce と Consume の様子を見ていきます。

環境準備

以下の docker-compose.yml を用意して起動しました。kafka 自体は UI を持たないため、初心者の私は kafka-ui も入れています。

version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092

Apache Kafka の構成要素

Kafka には「トピック」「ブローカー」などのワードが登場します。それらを端的に絵にすると以下のようになります。

データはトピックと呼ばれるカテゴリーに整理され、トピックはさらにパーティションに分割されてデータのスケーラビリティと並行性を向上させます。ブローカーはKafkaサーバーのインスタンスであり、クラスタ内のブローカー群がメッセージの保存と配信を担います。

Producer と Consumer と Partition

プロデューサーはデータをメッセージとしてKafkaトピックに送信します。このとき、プロデューサーはどのトピックにメッセージを送るかを指定し、必要に応じて特定のパーティションを指定することもできます。パーティションはトピック内でデータを分割し、保存する単位であり、これによりデータの並行処理とスケーラビリティが向上します。

コンシューマーはトピックをサブスクライブし、プロデューサーが送信したメッセージをリアルタイムに処理します。コンシューマーは、コンシューマーグループを形成し、同じグループ内のコンシューマーでトピックのパーティションを共有します。

実際にやってみた

なんとも言葉だけだと理解が難しかったので、実際に簡単なサンプルコードで動きを確認してみました。

トピックの作成

kafka 上にトピックを作成します。kafka のコンテナに入り下記コマンドを実行します。

kafka-topics --create --topic demo_kafka --partitions 3 --replication-factor 1 --bootstrap-server localhost:29092

demo_kafka という名前のトピックを、パーティション 3 つで指定しました。

Producer

Producer を作成し、 Kafka のトピックにメッセージが送信されることを確認します。

public class Producer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo_kafka", "key_1", "hello world");
            producer.send(record);
        }
    }
}

上記を実行し、kafka-ui を確認すると、メッセージが送信されていました!

みづらくて恐縮ですが、、パーティションは指定していないですが、 partitin:1 に保存されたみたいです。

続いて、この Producer 下記のように修正し、10 のメッセージを送信するようにしました。
この時、キーは key_i(カウンタ) としています。

public class Producer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("demo_kafka", "key_" + i, "hello world");
                producer.send(record);
            }
        }
    }
}

kafka-ui をみてみるとメッセージが保存されていました。

ここで注目ポイントは、 key_1 というメッセージが 2 つありますが、どちらも partition:1 に保存されています。

つまり、同じキーを持つメッセージは、同じパーティションに保存されるようになっているのです!

Consumer

続いて、 Consumer を以下のように作成します。簡易コードです。

public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "demo-group-1");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("auto.offset.reset", "earliest");

        while (true) {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                consumer.subscribe(List.of("demo_kafka"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("partition: " + record.partition() + " - " +
                            "key: " + record.key() + " - " +
                            "value: " + record.value());
                }
            }
        }
    }
}

そして Consumer 実行中に、もう一度 Producer を実行すると、以下のようなログが出力されました。

[main] INFO org.example.Consumer - partition: 2 - key: key_0 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_6 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_7 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_8 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_9 - value: hello world
[main] INFO org.example.Consumer - partition: 0 - key: key_2 - value: hello world
[main] INFO org.example.Consumer - partition: 0 - key: key_4 - value: hello world
[main] INFO org.example.Consumer - partition: 0 - key: key_5 - value: hello world
[main] INFO org.example.Consumer - partition: 1 - key: key_1 - value: hello world
[main] INFO org.example.Consumer - partition: 1 - key: key_3 - value: hello world

3 つすべてのパーティションからメッセージを取得していることがわかります。

複数の Consumer

Consumer を 2 プロセス同時に起動し、Producer を実行しました。
すると以下のようなログになりました。

  • Consumer1
    [main] INFO org.example.Consumer - partition: 0 - key: key_2 - value: hello world
    [main] INFO org.example.Consumer - partition: 0 - key: key_4 - value: hello world
    [main] INFO org.example.Consumer - partition: 0 - key: key_5 - value: hello world
    [main] INFO org.example.Consumer - partition: 1 - key: key_1 - value: hello world
    [main] INFO org.example.Consumer - partition: 1 - key: key_3 - value: hello world
  • Consumer2
    [main] INFO org.example.Consumer - partition: 2 - key: key_0 - value: hello world
    [main] INFO org.example.Consumer - partition: 2 - key: key_6 - value: hello world
    [main] INFO org.example.Consumer - partition: 2 - key: key_7 - value: hello world
    [main] INFO org.example.Consumer - partition: 2 - key: key_8 - value: hello world
    [main] INFO org.example.Consumer - partition: 2 - key: key_9 - value: hello world

Consumer ごとにパーティションが割り振られていることがわかります。
つまり、同一のパーティションは同一の Consumer がサブスクライブするようになっているのです。(すごい!)

まとめ

Kafka の検証は簡単なものであれば、 PC 上で簡単にできることがわかりました。
そして分かりづらかったトピックやパーティションの振る舞いですが、以下の要素を覚えることとしました。

  • 同じキーを持つメッセージは同じパーティションに割り当てられる
  • 1つのパーティションは1つの Consumer に割り当てられる

つまり、例えば key をアグリゲートIDにすることで、イベントの順序を担保した Consumer の処理ができることになると思います。

Kafka には Kafka Connect などの様々な機能があるようなので、次回はそちらにチャレンジしたいと思います。