From 91782ddca4ed528acc89c6d636367041ed49872c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 23:15:13 +0200 Subject: [PATCH] WIP --- .../implementation/kafka/DataChannel.java | 4 ++-- .../implementation/kafka/InfoChannel.java | 24 +++++++++++-------- .../kafka/KafkaChatHomeServiceTest.java | 19 +++++++++------ 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index a7899e43..f20f7c23 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -175,7 +175,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (loadInProgress) { - loadChatRoom(records); + loadChatRoomData(records); if (isLoadingCompleted()) { @@ -203,7 +203,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoomData(ConsumerRecords records) { for (ConsumerRecord record : records) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 5a1d186d..a98e46ea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -28,7 +28,7 @@ public class InfoChannel implements Runnable private final Consumer consumer; private final int numShards; private final long[] startOffset; - private final long[] currentOffset; + private final long[] nextOffset; private final Map chatRoomInfo; private final DataChannel dataChannel; @@ -53,15 +53,10 @@ public class InfoChannel implements Runnable .partitionsFor(topic) .size(); this.startOffset = new long[numShards]; - this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; IntStream .range(0, numShards) - .forEach(partition -> this.currentOffset[partition] = -1l); - consumer - .endOffsets(consumer.assignment()) - .entrySet() - .stream() - .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue()); + .forEach(partition -> this.nextOffset[partition] = -1l); this.dataChannel = dataChannel; } @@ -71,7 +66,7 @@ public class InfoChannel implements Runnable { return IntStream .range(0, numShards) - .anyMatch(partition -> currentOffset[partition] < startOffset[partition]); + .anyMatch(partition -> nextOffset[partition] < startOffset[partition]); } Mono sendChatRoomCreatedEvent( @@ -117,6 +112,15 @@ public class InfoChannel implements Runnable { running = true; + consumer + .endOffsets(consumer.assignment()) + .entrySet() + .stream() + .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue()); + IntStream + .range(0, numShards) + .forEach(partition -> this.nextOffset[partition] = 0l); + while (running) { try @@ -155,7 +159,7 @@ public class InfoChannel implements Runnable record.value()); } - startOffset[record.partition()] = record.offset() + 1; + nextOffset[record.partition()] = record.offset() + 1; } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 6bf3a742..cf097f84 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -80,16 +80,21 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll public static void sendAndLoadStoredData(@Autowired KafkaTemplate messageTemplate) { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); } - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + static void send( + KafkaTemplate kafkaTemplate, + String topic, + String key, + String value, + String typeId) { - ProducerRecord record = new ProducerRecord<>(DATA_TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(topic, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( -- 2.20.1