X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=ac30f1d34e04287bca9b27727244648cfe55a9d0;hb=18d05361eed887f038c9b01b593dad56ea00ff1c;hp=7b19bb6b1f2584c6246e05a831aa0b68efedbf81;hpb=5c338c58065988f7841c4ab9ee1b193e754da9b9;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 7b19bb6b..ac30f1d3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -18,12 +18,11 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.concurrent.Callable; import java.util.stream.IntStream; @Slf4j -public class ChatMessageChannel implements Callable>, ConsumerRebalanceListener +public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -61,6 +60,9 @@ public class ChatMessageChannel implements Callable>, Consum this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } @@ -153,9 +155,9 @@ public class ChatMessageChannel implements Callable>, Consum } @Override - public Optional call() + public void run() { - consumer.subscribe(List.of(topic)); + consumer.subscribe(List.of(topic), this); running = true; @@ -191,15 +193,9 @@ public class ChatMessageChannel implements Callable>, Consum log.info("Received WakeupException, exiting!"); running = false; } - catch (Exception e) - { - log.error("Exiting abnormally!"); - return Optional.of(e); - } } log.info("Exiting normally"); - return Optional.empty(); } void loadMessages(ConsumerRecords records) @@ -234,11 +230,7 @@ public class ChatMessageChannel implements Callable>, Consum return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) - .collect( - () -> Boolean.TRUE, // TODO: Boolean is immutable - (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable - (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } void pauseAllOwnedPartions()