From: Kai Moritz Date: Tue, 24 Jan 2023 17:33:13 +0000 (+0100) Subject: WIP X-Git-Tag: wip~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=53cf5ca04f85f03ab35d5635304c43d7ef9eb862;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 4a87d310..04fbcfb5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -18,6 +20,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL private final Consumer consumer; private final String topic; private final long[] offsets; + private final MessageHandler[] handlers; private final Map[] chatrooms; @@ -30,8 +33,12 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL this.consumer = consumer; this.topic = topic; this.offsets = new long[numShards]; + this.handlers = new MessageHandler[numShards]; for (int i=0; i< numShards; i++) + { this.offsets[i] = 0l; + this.handlers[i] = new NoOpMessageHandler(i); + } this.chatrooms = new Map[numShards]; } @@ -51,6 +58,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL long unseenOffset = offsets[partition]; log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset); + ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset); consumer.seek(tp, unseenOffset); }); } @@ -116,4 +124,40 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL { return Flux.fromStream(chatrooms[shard].values().stream()); } + + + interface MessageHandler + { + MessageHandler handleMessage(Message message); + } + + + @RequiredArgsConstructor + class NoOpMessageHandler implements MessageHandler + { + private final int partition; + + @Override + public MessageHandler handleMessage(Message message) + { + log.warn("Not handling message {} for partition {}", message, partition); + return this; + } + } + + @RequiredArgsConstructor + class ChatRoomLoader implements MessageHandler + { + private final int partition; + private final long currentOffset; + private final long unseenOffset; + + + @Override + public MessageHandler handleMessage(Message message) + { + // todo + return this; + } + } }