X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=69947a9d1da9113e4cc93a1c4b277f8bc34f51a7;hb=2a47c081a50f8ebdda6db9955c3ddc69e5c601c2;hp=bff38ae216066876704e52e73f087c3fc20f5f3b;hpb=df2360875c75f25798f2f55d1e77fc8d46cc31b0;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index bff38ae2..69947a9d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -29,8 +28,8 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Consumer consumer; private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final boolean[] isShardOwned; @@ -46,8 +45,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Consumer consumer, Producer producer, + Consumer consumer, ZoneId zoneId, int numShards) { @@ -270,8 +269,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatrooms[shard].get(id)); } - Flux getChatRooms(int shard) + Flux getChatRooms() { - return Flux.fromStream(chatrooms[shard].values().stream()); + return Flux.fromStream(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> Integer.valueOf(shard)) + .flatMap(shard -> chatrooms[shard].values().stream())); } }