From: Kai Moritz Date: Mon, 23 Jan 2023 16:32:56 +0000 (+0100) Subject: WIP X-Git-Tag: wip~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4d6adecb44b8d072cdcdac99eb06828189684e32;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 0e18f685..cdeccec5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -10,18 +13,43 @@ import java.util.*; @Slf4j -public class KafkaChatHomeService implements ChatHomeService +public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener { + private Consumer + private final long[] offsets; private final Map[] chatrooms; - public KafkaChatHomeService( - int numShards, - int[] ownedShards, - Flux chatroomFlux) + public KafkaChatHomeService(int numShards) { - log.debug("Creating ChatHomeService"); + log.debug("Creating KafkaChatHomeService"); this.chatrooms = new Map[numShards]; + this.offsets = new long[numShards]; + for (int i=0; i< numShards; i++) + this.offsets[i] = 0l; + } + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Assigned partitions: {}", partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + log.info("Revoked partitions: {}", partitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.info("Revoked partitions: {}", partitions); + } + + private void foo() + { Set owned = Arrays .stream(ownedShards) .collect(