From 7d8b0b3c36669b5e18c1f43134e2e1e55a4e6f65 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 17 Feb 2023 19:27:09 +0100 Subject: [PATCH] WIP --- .../persistence/kafka/KafkaChatHomeService.java | 2 +- .../backend/persistence/kafka/MessageHandler.java | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 70a96bc3..2e3b42f6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -35,7 +35,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL for (int i=0; i< numShards; i++) { this.offsets[i] = 0l; - this.handlers[i] = new MessageHandler(new TopicPartition(topic, i)); + this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i)); } this.chatrooms = new Map[numShards]; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java index 271f52d8..7209c224 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java @@ -1,6 +1,20 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + + +@RequiredArgsConstructor class MessageHandler { + private final Consumer consumer; + private final TopicPartition tp; + + + void handleMessage(Message message) + { + } } -- 2.20.1