From cc38c4721e5076970d5dcea57dbd066906f0f3da Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Sep 2023 23:33:59 +0200 Subject: [PATCH] refactor: Separated channels for data and info -- Refactored/aligned code * Split `ChatRoomChannel` into `InfoChannel` and `DataChannel` ** `DataChannel` manages only data for chat-messages ** `InfoChannel` manages all info-data (at the moment only `EventChatRoomCreated`) * Aligned test-setup for kafka-related tests --- .../chat/backend/ChatBackendProperties.java | 3 +- .../chat/backend/domain/ChatRoomInfo.java | 6 + .../kafka/ConsumerTaskExecutor.java | 2 +- .../kafka/ConsumerTaskRunner.java | 25 ++ .../implementation/kafka/DataChannel.java | 161 +------- .../implementation/kafka/InfoChannel.java | 353 +++--------------- .../kafka/KafkaChatHomeService.java | 28 +- .../kafka/KafkaChatMessageService.java | 4 +- .../kafka/KafkaServicesApplicationRunner.java | 11 +- .../kafka/KafkaServicesConfiguration.java | 133 +++++-- .../kafka/messages/AbstractMessageTo.java | 1 + .../data/EventChatMessageReceivedTo.java | 3 +- .../messages/info/EventChatRoomCreated.java | 45 ++- .../backend/storage/mongodb/ChatRoomTo.java | 4 +- .../mongodb/MongoDbStorageStrategy.java | 13 +- .../chat/backend/KafkaConfigurationIT.java | 89 +++-- .../kafka/KafkaChatHomeServiceTest.java | 50 ++- .../data/EventChatMessageReceivedToTest.java | 2 +- 18 files changed, 372 insertions(+), 561 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index d772fc9a..ec0b35f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -36,7 +36,8 @@ public class ChatBackendProperties { private String clientIdPrefix = "DEV"; private String bootstrapServers = ":9092"; - private String chatRoomChannelTopic = "message_channel"; + private String infoChannelTopic = "info_channel"; + private String dataChannelTopic = "data_channel"; private int numPartitions = 2; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java index 33c522d1..e91b28c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -19,4 +19,10 @@ public class ChatRoomInfo private final String name; @Getter private final Integer shard; + + + public ChatRoomInfo(UUID id, String name) + { + this(id, name, null); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java index 9ebc26b2..881dd293 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java @@ -46,7 +46,7 @@ public class ConsumerTaskExecutor } - interface WorkAssignor + public interface WorkAssignor { void assignWork(Consumer consumer); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java new file mode 100644 index 00000000..c8600039 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class ConsumerTaskRunner +{ + private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; + private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; + + public void executeConsumerTasks() + { + infoChannelConsumerTaskExecutor.executeConsumerTask(); + dataChannelConsumerTaskExecutor.executeConsumerTask(); + } + + public void joinConsumerTasks() + { + dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); + infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d94bc659..4d5a1412 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -4,8 +4,7 @@ import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -16,7 +15,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; @@ -25,7 +23,7 @@ import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -37,7 +35,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; private final Map[] chatRoomData; private boolean running; @@ -45,21 +42,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private volatile boolean loadInProgress; - public ChatRoomChannel( + public DataChannel( String topic, Producer producer, - Consumer consumer, + Consumer dataChannelConsumer, ZoneId zoneId, int numShards, int bufferSize, Clock clock) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", + "Creating DataChannel for topic {} with {} partitions", topic, numShards); this.topic = topic; - this.consumer = consumer; + this.consumer = dataChannelConsumer; this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; @@ -68,55 +65,17 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) .forEach(shard -> { - this.chatRoomInfo[shard] = new HashMap<>(); this.chatRoomData[shard] = new HashMap<>(); }); } - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - chatRoomId.toString(), - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); - sink.success(chatRoomInfo); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, @@ -211,12 +170,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadChatRoom(records); + loadChatRoomData(records); if (isLoadingCompleted()) { @@ -244,7 +203,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoomData(ConsumerRecords records) { for (ConsumerRecord record : records) { @@ -252,13 +211,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - case EVENT_CHATMESSAGE_RECEIVED: Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); @@ -282,42 +234,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - - private void createChatRoom(ChatRoomInfo chatRoomInfo) - { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - private void loadChatMessage( UUID chatRoomId, LocalDateTime timestamp, @@ -328,7 +244,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); + ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent( + chatRoomId, + (id) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, id); + return new ChatRoomData(clock, service, bufferSize); + }); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -353,33 +276,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) - { - log.warn( - "Ignoring existing chat-room for {}: {}", - partition, - chatRoomId); - } - else - { - log.info( - "Adding new chat-room to partition {}: {}", - partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - int[] getOwnedShards() { return IntStream @@ -402,27 +298,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatRoomData[shard].get(id)); } - - Flux getChatRoomInfo() - { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); - } - - Mono getChatRoomInfo(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index d94bc659..ad03f0df 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,106 +1,90 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; -import lombok.Getter; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class InfoChannel implements Runnable { private final String topic; private final Producer producer; private final Consumer consumer; - private final ZoneId zoneId; private final int numShards; - private final int bufferSize; - private final Clock clock; - private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; - private final Map[] chatRoomData; + private final Map chatRoomInfo; private boolean running; - @Getter - private volatile boolean loadInProgress; - public ChatRoomChannel( + public InfoChannel( String topic, Producer producer, - Consumer consumer, - ZoneId zoneId, - int numShards, - int bufferSize, - Clock clock) + Consumer infoChannelConsumer) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", - topic, - numShards); + "Creating InfoChannel for topic {}", + topic); this.topic = topic; - this.consumer = consumer; + this.consumer = infoChannelConsumer; this.producer = producer; - this.zoneId = zoneId; - this.numShards = numShards; - this.bufferSize = bufferSize; - this.clock = clock; - this.isShardOwned = new boolean[numShards]; + this.chatRoomInfo = new HashMap<>(); + + this.numShards = consumer + .partitionsFor(topic) + .size(); this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; - this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(partition -> this.nextOffset[partition] = -1l); } + boolean loadInProgress() + { + return IntStream + .range(0, numShards) + .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]); + } - Mono sendCreateChatRoomRequest( + Mono sendChatRoomCreatedEvent( UUID chatRoomId, - String name) + String name, + int shard) { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( topic, - chatRoomId.toString(), - createChatRoomRequestTo); + Integer.toString(shard), + to); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + log.info("Successfully sent chreate-request for chat room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); sink.success(chatRoomInfo); } else @@ -117,122 +101,28 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener }); } - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); - - consumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } @Override public void run() { running = true; + consumer + .endOffsets(consumer.assignment()) + .entrySet() + .stream() + .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue()); + IntStream + .range(0, numShards) + .forEach(partition -> this.nextOffset[partition] = 0l); + while (running) { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + log.debug("Fetched {} messages", records.count()); + handleMessages(records); } catch (WakeupException e) { @@ -244,36 +134,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void handleMessages(ConsumerRecords records) { for (ConsumerRecord record : records) { - UUID chatRoomId = UUID.fromString(record.key()); - switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - - case EVENT_CHATMESSAGE_RECEIVED: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (EventChatMessageReceivedTo) record.value(), - record.partition()); + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; default: log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, + "Ignoring message for key={} with offset={}: {}", + record.key(), record.offset(), record.value()); } @@ -282,84 +158,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - private void createChatRoom(ChatRoomInfo chatRoomInfo) { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - - private void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - EventChatMessageReceivedTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); - KafkaChatMessageService kafkaChatRoomService = - (KafkaChatMessageService) chatRoomData.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - private boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } - - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } + UUID chatRoomId = chatRoomInfo.getId(); + Integer partition = chatRoomInfo.getShard(); - - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo.containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -369,60 +173,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener else { log.info( - "Adding new chat-room to partition {}: {}", + "Adding new chat-room for partition {}: {}", partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - - int[] getOwnedShards() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .toArray(); - } - - Mono getChatRoomData(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } + chatRoomId); - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); + this.chatRoomInfo.put(chatRoomId, chatRoomInfo); } - - return Mono.justOrEmpty(chatRoomData[shard].get(id)); } Flux getChatRoomInfo() { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + return Flux.fromIterable(chatRoomInfo.values()); } - Mono getChatRoomInfo(int shard, UUID id) + Mono getChatRoomInfo(UUID id) { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 5019ed20..9832519d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -18,45 +18,47 @@ import java.util.*; public class KafkaChatHomeService implements ChatHomeService { private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; + private final InfoChannel infoChannel; + private final DataChannel dataChannel; @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); + int shard = selectShard(id); + log.info( + "Sending create-command for chat rooom: id={}, name={}, shard={}", + id, + name, + shard); + return infoChannel.sendChatRoomCreatedEvent(id, name, shard); } @Override public Mono getChatRoomInfo(UUID id) { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomInfo(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); + return infoChannel + .getChatRoomInfo(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override public Flux getChatRoomInfo() { - return chatRoomChannel.getChatRoomInfo(); + return infoChannel.getChatRoomInfo(); } @Override public Mono getChatRoomData(UUID id) { int shard = selectShard(id); - return chatRoomChannel + return dataChannel .getChatRoomData(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, - chatRoomChannel.getOwnedShards()))); + dataChannel.getOwnedShards()))); } int selectShard(UUID chatRoomId) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java index df9ee733..8ab50f1f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -15,7 +15,7 @@ import java.util.UUID; @Slf4j public class KafkaChatMessageService implements ChatMessageService { - private final ChatRoomChannel chatRoomChannel; + private final DataChannel dataChannel; private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,7 +27,7 @@ public class KafkaChatMessageService implements ChatMessageService LocalDateTime timestamp, String text) { - return chatRoomChannel + return dataChannel .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index da7ee75d..722508bd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; @@ -17,12 +18,18 @@ import org.springframework.stereotype.Component; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final ConsumerTaskExecutor chatRoomChannelTaskExecutor; + private final ConsumerTaskRunner consumerTaskRunner; @Override public void run(ApplicationArguments args) throws Exception { - chatRoomChannelTaskExecutor.executeConsumerTask(); + consumerTaskRunner.executeConsumerTasks(); + } + + @PreDestroy + public void joinConsumerTasks() + { + consumerTaskRunner.joinConsumerTasks(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index cda0b94f..5bde07cd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,14 +3,15 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -36,54 +37,109 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskExecutor chatRoomChannelTaskExecutor( + ConsumerTaskRunner consumerTaskRunner( + ConsumerTaskExecutor infoChannelConsumerTaskExecutor, + ConsumerTaskExecutor dataChannelConsumerTaskExecutor) + { + return new ConsumerTaskRunner( + infoChannelConsumerTaskExecutor, + dataChannelConsumerTaskExecutor); + } + + @Bean + ConsumerTaskExecutor infoChannelConsumerTaskExecutor( ThreadPoolTaskExecutor taskExecutor, - ChatRoomChannel chatRoomChannel, - Consumer chatRoomChannelConsumer, - ConsumerTaskExecutor.WorkAssignor workAssignor) + InfoChannel infoChannel, + Consumer infoChannelConsumer, + ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor) { return new ConsumerTaskExecutor( taskExecutor, - chatRoomChannel, - chatRoomChannelConsumer, - workAssignor); + infoChannel, + infoChannelConsumer, + infoChannelWorkAssignor); + } + + @Bean + ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor( + ChatBackendProperties properties) + { + return consumer -> + { + String topic = properties.getKafka().getInfoChannelTopic(); + List partitions = consumer + .partitionsFor(topic) + .stream() + .map(partitionInfo -> + new TopicPartition(topic, partitionInfo.partition())) + .toList(); + consumer.assign(partitions); + }; } @Bean - ConsumerTaskExecutor.WorkAssignor workAssignor( + ConsumerTaskExecutor dataChannelConsumerTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + DataChannel dataChannel, + Consumer dataChannelConsumer, + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor) + { + return new ConsumerTaskExecutor( + taskExecutor, + dataChannel, + dataChannelConsumer, + dataChannelWorkAssignor); + } + + @Bean + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) + DataChannel dataChannel) { return consumer -> { List topics = - List.of(properties.getKafka().getChatRoomChannelTopic()); - consumer.subscribe(topics, chatRoomChannel); + List.of(properties.getKafka().getDataChannelTopic()); + consumer.subscribe(topics, dataChannel); }; } @Bean ChatHomeService kafkaChatHome( ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) + InfoChannel infoChannel, + DataChannel dataChannel) { return new KafkaChatHomeService( properties.getKafka().getNumPartitions(), - chatRoomChannel); + infoChannel, + dataChannel); } @Bean - ChatRoomChannel chatRoomChannel( + InfoChannel infoChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer producer, + Consumer infoChannelConsumer) + { + return new InfoChannel( + properties.getKafka().getInfoChannelTopic(), + producer, + infoChannelConsumer); + } + + @Bean + DataChannel dataChannel( + ChatBackendProperties properties, + Producer producer, + Consumer dataChannelConsumer, ZoneId zoneId, Clock clock) { - return new ChatRoomChannel( - properties.getKafka().getChatRoomChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, + return new DataChannel( + properties.getKafka().getDataChannelTopic(), + producer, + dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), @@ -91,7 +147,7 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer producer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, @@ -101,7 +157,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -126,7 +182,28 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer infoChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "info_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + Consumer dataChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, @@ -136,10 +213,10 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, - "chatroom_channel"); + "data_channel"); return new KafkaConsumer<>( properties, stringDeserializer, @@ -168,7 +245,7 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java index 6f61592c..1aef3fd1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java @@ -11,6 +11,7 @@ public class AbstractMessageTo public enum ToType { COMMAND_CREATE_CHATROOM, EVENT_CHATMESSAGE_RECEIVED, + EVENT_CHATROOM_CREATED, } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java index 17d3a397..d4f6508d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java @@ -1,5 +1,6 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.*; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index 0cf42320..ae5a5019 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -1,2 +1,45 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated { +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.UUID; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventChatRoomCreated extends AbstractMessageTo +{ + private String id; + private String name; + private Integer shard; + + + public EventChatRoomCreated() + { + super(ToType.EVENT_CHATROOM_CREATED); + } + + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static EventChatRoomCreated of(UUID id, String name, Integer shard) + { + EventChatRoomCreated event = new EventChatRoomCreated(); + + event.setId(id.toString()); + event.setName(name); + event.setShard(shard); + + return event; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java index 8ea9cc23..853ee1cf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java @@ -11,20 +11,18 @@ import org.springframework.data.mongodb.core.mapping.Document; @Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE) @EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "shard", "name" }) +@ToString(of = { "id", "name" }) @Document public class ChatRoomTo { @Id private String id; - private Integer shard; private String name; public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) { return new ChatRoomTo( chatRoomInfo.getId().toString(), - chatRoomInfo.getShard(), chatRoomInfo.getName()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index c87036c9..3eb90960 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName()); }); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index f257d5e9..8fbc557c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,44 +1,48 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel; -import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner; +import de.juplo.kafka.chat.backend.implementation.kafka.*; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; -import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "spring.main.allow-bean-definition-overriding=true", "chat.backend.services=kafka", "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC, + "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC, "chat.backend.kafka.num-partitions=10", }) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@EmbeddedKafka( + topics = { INFO_TOPIC, DATA_TOPIC }, + partitions = 10) @Slf4j class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT { - final static String TOPIC = "KAFKA_CONFIGURATION_IT"; - - static CompletableFuture CONSUMER_JOB; + final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL"; + final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL"; @MockBean KafkaServicesApplicationRunner kafkaServicesApplicationRunner; @@ -46,31 +50,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @BeforeAll public static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired Consumer chatRoomChannelConsumer, - @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) + @Autowired ConsumerTaskRunner consumerTaskRunner) { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); - chatRoomChannelConsumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); - CONSUMER_JOB = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); + consumerTaskRunner.executeConsumerTasks(); } - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + static void send( + KafkaTemplate kafkaTemplate, + String topic, + String key, + String value, + String typeId) { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(topic, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( @@ -79,14 +77,29 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT value, new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - @AfterAll - static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) + static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) + { + consumerTaskRunner.joinConsumerTasks(); + } + + + @TestConfiguration + @EnableConfigurationProperties(ChatBackendProperties.class) + @Import(KafkaServicesConfiguration.class) + static class KafkaConfigurationITConfiguration { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - CONSUMER_JOB.join(); - log.info("Joined the consumer of the ChatRoomChannel"); + @Bean + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor( + DataChannel dataChannel) + { + return consumer -> + { + List assignedPartitions = + List.of(new TopicPartition(DATA_TOPIC, 2)); + consumer.assign(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); + }; + } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 4aa362d7..23c1751a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -23,7 +23,8 @@ import java.time.Clock; import java.util.List; import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; -import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.TOPIC; +import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC; +import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC; @SpringBootTest( @@ -38,14 +39,18 @@ import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServ "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC, + "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC, "chat.backend.kafka.num-partitions=" + NUM_SHARDS, }) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@EmbeddedKafka( + topics = { INFO_TOPIC, DATA_TOPIC }, + partitions = 10) @Slf4j public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest { - final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; + final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO"; + final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA"; @TestConfiguration @@ -54,15 +59,15 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest static class KafkaChatHomeTestConfiguration { @Bean - ConsumerTaskExecutor.WorkAssignor workAssignor( - ChatRoomChannel chatRoomChannel) + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor( + DataChannel dataChannel) { return consumer -> { List assignedPartitions = - List.of(new TopicPartition(TOPIC, 2)); + List.of(new TopicPartition(DATA_TOPIC, 2)); consumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); }; } @@ -76,21 +81,26 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll public static void sendAndLoadStoredData( - @Autowired ConsumerTaskExecutor consumerTaskExecutor, - @Autowired KafkaTemplate messageTemplate) + @Autowired ConsumerTaskRunner consumerTaskRunner, + @Autowired KafkaTemplate messageTemplate) throws InterruptedException { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - consumerTaskExecutor.executeConsumerTask(); + consumerTaskRunner.executeConsumerTasks(); } - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + static void send( + KafkaTemplate kafkaTemplate, + String topic, + String key, + String value, + String typeId) { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(topic, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( @@ -101,8 +111,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor) + static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner) { - consumerTaskExecutor.joinConsumerTaskJob(); + consumerTaskRunner.joinConsumerTasks(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java index d9d5a08e..72ade064 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -- 2.20.1