From: Kai Moritz Date: Sat, 9 Mar 2024 10:17:33 +0000 (+0100) Subject: fix: GREEN - `ChatRoomData` obeys to the added expectations. X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9a3898831e031353da657533e2f14cf8925036a3;p=demos%2Fkafka%2Fchat fix: GREEN - `ChatRoomData` obeys to the added expectations. * Switched `ChatRoomData` from a multicast- to a replay-sink. * Before, listening was implemented with a multicast-sink, that enabled back-pressure. * Now, it was refactored to use a replay-sink, that enables a (configurable) limitted replay. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index c3609a39..ba794523 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -17,7 +17,7 @@ public class ChatBackendProperties { private String instanceId = "DEV"; private String allowedOrigins = "http://localhost:4200"; - private int chatroomBufferSize = 1024; + private int chatroomHistoryLimit = 100; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); private KafkaServicesProperties kafka = new KafkaServicesProperties(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index d783eb4c..9dbeda9e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -21,21 +21,21 @@ public class ChatRoomData private final ChatMessageService service; private final Clock clock; - private final int bufferSize; + private final int historyLimit; private Sinks.Many sink; public ChatRoomData( Clock clock, ChatMessageService service, - int bufferSize) + int historyLimit) { - log.info("Created ChatRoom with buffer-size {}", bufferSize); + log.info("Created ChatRoom with history-limit {}", historyLimit); this.clock = clock; this.service = service; - this.bufferSize = bufferSize; + this.historyLimit = historyLimit; // @RequiredArgsConstructor unfortunately not possible, because - // the `bufferSize` is not set, if `createSink()` is called + // the `historyLimit` is not set, if `createSink()` is called // from the variable declaration! this.sink = createSink(); } @@ -110,7 +110,7 @@ public class ChatRoomData { return Sinks .many() - .multicast() - .onBackpressureBuffer(bufferSize); + .replay() + .limit(historyLimit); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 3f3d8884..43973ae0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -33,7 +33,7 @@ public class InMemoryServicesConfiguration { SimpleChatHomeService chatHomeService = new SimpleChatHomeService( clock, - properties.getChatroomBufferSize()); + properties.getChatroomHistoryLimit()); chatHomeService.restore(storageStrategy).block(); return chatHomeService; } @@ -57,7 +57,7 @@ public class InMemoryServicesConfiguration SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService( shard, clock, - properties.getChatroomBufferSize()); + properties.getChatroomHistoryLimit()); service.restore(storageStrategy).block(); }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index d568a9b4..2aac0fac 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -18,24 +18,24 @@ public class SimpleChatHomeService implements ChatHomeService private final Map chatRoomInfo; private final Map chatRoomData; private final Clock clock; - private final int bufferSize; + private final int historyLimit; public SimpleChatHomeService( Clock clock, - int bufferSize) + int historyLimit) { this( null, clock, - bufferSize); + historyLimit); } public SimpleChatHomeService( Integer shard, Clock clock, - int bufferSize) + int historyLimit) { log.debug("Creating SimpleChatHomeService"); @@ -43,7 +43,7 @@ public class SimpleChatHomeService implements ChatHomeService this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); this.clock = clock; - this.bufferSize = bufferSize; + this.historyLimit = historyLimit; } @@ -81,7 +81,7 @@ public class SimpleChatHomeService implements ChatHomeService new ChatRoomData( clock, chatMessageService, - bufferSize)); + historyLimit)); return chatMessageService.restore(storageStrategy); }) @@ -95,11 +95,11 @@ public class SimpleChatHomeService implements ChatHomeService @Override public Mono createChatRoom(UUID id, String name) { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); + log.info("Creating ChatRoom with history-limit {}", historyLimit); ChatMessageService service = new InMemoryChatMessageService(id); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit); this.chatRoomData.put(id, chatRoomData); return Mono.just(chatRoomInfo); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 2468af5a..63f36f53 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -36,7 +36,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener private final ZoneId zoneId; private final int numShards; private final Duration pollingInterval; - private final int bufferSize; + private final int historyLimit; private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; @@ -58,7 +58,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener ZoneId zoneId, int numShards, Duration pollingInterval, - int bufferSize, + int historyLimit, Clock clock, ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) @@ -75,7 +75,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener this.zoneId = zoneId; this.numShards = numShards; this.pollingInterval = pollingInterval; - this.bufferSize = bufferSize; + this.historyLimit = historyLimit; this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; @@ -355,9 +355,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener } else { - log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize); + log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit); KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - chatRoomData = new ChatRoomData(clock, service, bufferSize); + chatRoomData = new ChatRoomData(clock, service, historyLimit); this.chatRoomData[shard].put(chatRoomId, chatRoomData); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index bebed14b..58e11173 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -150,7 +150,7 @@ public class KafkaServicesConfiguration zoneId, properties.getKafka().getNumPartitions(), properties.getKafka().getPollingInterval(), - properties.getChatroomBufferSize(), + properties.getChatroomHistoryLimit(), clock, channelMediator, shardingPublisherStrategy);