X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=7b19bb6b1f2584c6246e05a831aa0b68efedbf81;hb=5c338c58065988f7841c4ab9ee1b193e754da9b9;hp=78665dee8adf8358e92049c976ab3fb68f42fae2;hpb=cbc0ed5c9bf21fc08ca4312627342738e331c634;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 78665dee..7b19bb6b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -12,25 +12,22 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.Callable; import java.util.stream.IntStream; @Slf4j -public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener +public class ChatMessageChannel implements Callable>, ConsumerRebalanceListener { private final String topic; - private final Consumer consumer; private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final boolean[] isShardOwned; @@ -46,8 +43,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Consumer consumer, Producer producer, + Consumer consumer, ZoneId zoneId, int numShards) { @@ -68,6 +65,50 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } + Mono sendMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + TopicPartition tp = new TopicPartition(topic, shard); + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + zdt.toEpochSecond(), + chatRoomId.toString(), + MessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + @Override public void onPartitionsAssigned(Collection partitions) { @@ -112,7 +153,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } @Override - public void run() + public Optional call() { consumer.subscribe(List.of(topic)); @@ -147,11 +188,18 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } catch (WakeupException e) { + log.info("Received WakeupException, exiting!"); + running = false; } - catch (RecordDeserializationException e) + catch (Exception e) { + log.error("Exiting abnormally!"); + return Optional.of(e); } } + + log.info("Exiting normally"); + return Optional.empty(); } void loadMessages(ConsumerRecords records) @@ -202,50 +250,6 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener .toList()); } - Mono sendMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(topic, shard); - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - tp.topic(), - tp.partition(), - zdt.toEpochSecond(), - chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - void putChatRoom(ChatRoom chatRoom) { @@ -271,8 +275,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatrooms[shard].get(id)); } - Flux getChatRooms(int shard) + Flux getChatRooms() { - return Flux.fromStream(chatrooms[shard].values().stream()); + return Flux.fromStream(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> Integer.valueOf(shard)) + .flatMap(shard -> chatrooms[shard].values().stream())); } }