From 0647f2ab664947b54a7c9dcdcbc9652f6fd92932 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 24 Jan 2023 18:43:01 +0100 Subject: [PATCH] WIP --- .../kafka/KafkaChatHomeService.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 04fbcfb5..4fa567ce 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -58,8 +58,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL long unseenOffset = offsets[partition]; log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset); - ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset); - consumer.seek(tp, unseenOffset); + handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset); }); } @@ -135,23 +134,30 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL @RequiredArgsConstructor class NoOpMessageHandler implements MessageHandler { - private final int partition; + private final TopicPartition tp; @Override public MessageHandler handleMessage(Message message) { - log.warn("Not handling message {} for partition {}", message, partition); + log.warn("Not handling message {} for partition {}", message, tp); return this; } } - @RequiredArgsConstructor - class ChatRoomLoader implements MessageHandler + class ChatRoomLoadingMessageHandler implements MessageHandler { - private final int partition; + private final TopicPartition tp; private final long currentOffset; private final long unseenOffset; + ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset) + { + this.tp = tp; + this.currentOffset = currentOffset; + this.unseenOffset = unseenOffset; + + consumer.seek(tp, unseenOffset); + } @Override public MessageHandler handleMessage(Message message) @@ -160,4 +166,17 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL return this; } } + + @RequiredArgsConstructor + class DefaultMessageHandler implements MessageHandler + { + private final TopicPartition tp; + + @Override + public MessageHandler handleMessage(Message message) + { + chatrooms[tp.partition()].put() + return this; + } + } } -- 2.20.1