import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
-public class KafkaChatHomeService implements ChatHomeService
+public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
+ private Consumer<String, >
+ private final long[] offsets;
private final Map<UUID, ChatRoom>[] chatrooms;
- public KafkaChatHomeService(
- int numShards,
- int[] ownedShards,
- Flux<ChatRoom> chatroomFlux)
+ public KafkaChatHomeService(int numShards)
{
- log.debug("Creating ChatHomeService");
+ log.debug("Creating KafkaChatHomeService");
this.chatrooms = new Map[numShards];
+ this.offsets = new long[numShards];
+ for (int i=0; i< numShards; i++)
+ this.offsets[i] = 0l;
+ }
+
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("Assigned partitions: {}", partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ log.info("Revoked partitions: {}", partitions);
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.info("Revoked partitions: {}", partitions);
+ }
+
+ private void foo()
+ {
Set<Integer> owned = Arrays
.stream(ownedShards)
.collect(