From: Kai Moritz Date: Fri, 17 Feb 2023 18:27:09 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7d8b0b3c36669b5e18c1f43134e2e1e55a4e6f65;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 70a96bc3..2e3b42f6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -35,7 +35,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL for (int i=0; i< numShards; i++) { this.offsets[i] = 0l; - this.handlers[i] = new MessageHandler(new TopicPartition(topic, i)); + this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i)); } this.chatrooms = new Map[numShards]; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java index 271f52d8..7209c224 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java @@ -1,6 +1,20 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + + +@RequiredArgsConstructor class MessageHandler { + private final Consumer consumer; + private final TopicPartition tp; + + + void handleMessage(Message message) + { + } }