NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
index bff38ae..69947a9 100644 (file)
@@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -29,8 +28,8 @@ import java.util.stream.IntStream;
 public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
-  private final Consumer<String, MessageTo> consumer;
   private final Producer<String, MessageTo> producer;
+  private final Consumer<String, MessageTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
   private final boolean[] isShardOwned;
@@ -46,8 +45,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   public ChatMessageChannel(
     String topic,
-    Consumer<String, MessageTo> consumer,
     Producer<String, MessageTo> producer,
+    Consumer<String, MessageTo> consumer,
     ZoneId zoneId,
     int numShards)
   {
@@ -270,8 +269,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     return Mono.justOrEmpty(chatrooms[shard].get(id));
   }
 
-  Flux<ChatRoom> getChatRooms(int shard)
+  Flux<ChatRoom> getChatRooms()
   {
-    return Flux.fromStream(chatrooms[shard].values().stream());
+    return Flux.fromStream(IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .mapToObj(shard -> Integer.valueOf(shard))
+        .flatMap(shard -> chatrooms[shard].values().stream()));
   }
 }