WIP:compiles!
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 7ebf049..eadd762 100644 (file)
@@ -80,18 +80,16 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp ->
+    partitions.forEach(topicPartition ->
     {
-      if (!tp.topic().equals(topic))
+      if (!topicPartition.topic().equals(topic))
       {
-        log.warn("Ignoring partition from unwanted topic: {}", tp);
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
         return;
       }
 
-      int partition = tp.partition();
-      long unseenOffset = offsets[partition];
-
-      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+      int partition = topicPartition.partition();
+      // long unseenOffset = offsets[partition]; TODO: Offset merken...?
     });
     log.info("Revoked partitions: {}", partitions);
   }
@@ -99,45 +97,8 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @Override
   public void onPartitionsLost(Collection<TopicPartition> partitions)
   {
-    log.info("Revoked partitions: {}", partitions);
-  }
-
-  private void foo()
-  {
-    Set<Integer> owned = Arrays
-      .stream(ownedShards)
-      .collect(
-        () -> new HashSet<>(),
-        (set, i) -> set.add(i),
-        (a, b) -> a.addAll(b));
-    for (int shard = 0; shard < numShards; shard++)
-    {
-      chatRoomMaps[shard] = owned.contains(shard)
-        ? new HashMap<>()
-        : null;
-    }
-    chatroomFlux
-      .filter(chatRoom ->
-      {
-        if (owned.contains(chatRoom.getShard()))
-        {
-          return true;
-        }
-        else
-        {
-          log.info("Ignoring not owned chat-room {}", chatRoom);
-          return false;
-        }
-      })
-      .toStream()
-      .forEach(chatroom -> chatRoomMaps[chatroom.getShard()].put(chatroom.getId(), chatroom));
-  }
-
-  @Override
-  public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
-  {
-    chatRoomMaps[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
-    return Mono.just(chatRoom);
+    // TODO: Muss auf den Verlust anders reagiert werden?
+    onPartitionsRevoked(partitions);
   }
 
   @Override