X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=94f6fa6b2ac9d94e99f534b3a9f2a0f44846f8bd;hb=f5c4bff6a6103d0415c8b2ac9e4ab7517c04c215;hp=8a53d3c9fd0ef62781bc745cf706bad9f76b79b3;hpb=28a98bfdbed0cf56697bece5efbe6eb52f331611;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 8a53d3c9..94f6fa6b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -30,11 +29,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final int bufferSize; + private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatrooms; - private final KafkaLikeShardingStrategy shardingStrategy; private boolean running; @Getter @@ -46,7 +46,9 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener Producer producer, Consumer consumer, ZoneId zoneId, - int numShards) + int numShards, + int bufferSize, + Clock clock) { log.debug( "Creating ChatMessageChannel for topic {} with {} partitions", @@ -57,6 +59,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; @@ -64,7 +68,6 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener IntStream .range(0, numShards) .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } @@ -111,15 +114,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener LocalDateTime timestamp, String text) { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(topic, shard); ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - tp.topic(), - tp.partition(), + topic, + null, zdt.toEpochSecond(), chatRoomId.toString(), ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); @@ -261,6 +262,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener (ChatMessageTo) record.value(), record.partition()); break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); } nextOffset[record.partition()] = record.offset() + 1; @@ -348,13 +356,4 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { return Mono.justOrEmpty(chatrooms[shard].get(id)); } - - Flux getChatRooms() - { - return Flux.fromStream(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> Integer.valueOf(shard)) - .flatMap(shard -> chatrooms[shard].values().stream())); - } }