X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=138d9a7b706bfab4c01f2933e9502109a1b483ef;hb=5d366ac288990782c88927a4ff967b3f28fdec19;hp=69947a9d1da9113e4cc93a1c4b277f8bc34f51a7;hpb=2a47c081a50f8ebdda6db9955c3ddc69e5c601c2;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 69947a9d..138d9a7b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -17,10 +17,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.IntStream; @@ -63,6 +60,9 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } @@ -157,7 +157,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener @Override public void run() { - consumer.subscribe(List.of(topic)); + consumer.subscribe(List.of(topic), this); running = true; @@ -194,6 +194,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener running = false; } } + + log.info("Exiting normally"); } void loadMessages(ConsumerRecords records) @@ -214,7 +216,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); if (chatRoom == null) { - // Alles pausieren und erst von putChatRoom wieder resumen lassen! + // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen! } KafkaChatRoomService kafkaChatRoomService = (KafkaChatRoomService) chatRoom.getChatRoomService(); @@ -228,11 +230,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) - .collect( - () -> Boolean.TRUE, // TODO: Boolean is immutable - (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable - (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } void pauseAllOwnedPartions()