WIP:compiles!
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 18:26:50 +0000 (19:26 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 18:26:50 +0000 (19:26 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index cfd0e4e..04564b9 100644 (file)
@@ -1,9 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -15,7 +14,7 @@ import java.util.UUID;
 
 
 @RequiredArgsConstructor
-@Log4j
+@Slf4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
   private final KafkaChatRoomService kafkaChatRoomService;
index 7ebf049..eadd762 100644 (file)
@@ -80,18 +80,16 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp ->
+    partitions.forEach(topicPartition ->
     {
-      if (!tp.topic().equals(topic))
+      if (!topicPartition.topic().equals(topic))
       {
-        log.warn("Ignoring partition from unwanted topic: {}", tp);
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
         return;
       }
 
-      int partition = tp.partition();
-      long unseenOffset = offsets[partition];
-
-      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+      int partition = topicPartition.partition();
+      // long unseenOffset = offsets[partition]; TODO: Offset merken...?
     });
     log.info("Revoked partitions: {}", partitions);
   }
@@ -99,45 +97,8 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @Override
   public void onPartitionsLost(Collection<TopicPartition> partitions)
   {
-    log.info("Revoked partitions: {}", partitions);
-  }
-
-  private void foo()
-  {
-    Set<Integer> owned = Arrays
-      .stream(ownedShards)
-      .collect(
-        () -> new HashSet<>(),
-        (set, i) -> set.add(i),
-        (a, b) -> a.addAll(b));
-    for (int shard = 0; shard < numShards; shard++)
-    {
-      chatRoomMaps[shard] = owned.contains(shard)
-        ? new HashMap<>()
-        : null;
-    }
-    chatroomFlux
-      .filter(chatRoom ->
-      {
-        if (owned.contains(chatRoom.getShard()))
-        {
-          return true;
-        }
-        else
-        {
-          log.info("Ignoring not owned chat-room {}", chatRoom);
-          return false;
-        }
-      })
-      .toStream()
-      .forEach(chatroom -> chatRoomMaps[chatroom.getShard()].put(chatroom.getId(), chatroom));
-  }
-
-  @Override
-  public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
-  {
-    chatRoomMaps[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
-    return Mono.just(chatRoom);
+    // TODO: Muss auf den Verlust anders reagiert werden?
+    onPartitionsRevoked(partitions);
   }
 
   @Override