From e39d138cc89ee82676fc0a935e177c15ca71a473 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Sep 2023 23:33:59 +0200 Subject: [PATCH] WIP:ALIGN --- .../chat/backend/ChatBackendProperties.java | 3 +- .../chat/backend/domain/ChatRoomInfo.java | 6 + .../implementation/kafka/DataChannel.java | 123 +----- .../implementation/kafka/InfoChannel.java | 362 ++++-------------- .../kafka/KafkaChatHomeService.java | 28 +- .../kafka/KafkaChatMessageService.java | 4 +- .../kafka/KafkaServicesApplicationRunner.java | 64 +++- .../kafka/KafkaServicesConfiguration.java | 101 +++-- .../kafka/messages/AbstractMessageTo.java | 1 + .../data/EventChatMessageReceivedTo.java | 3 +- .../messages/info/EventChatRoomCreated.java | 45 ++- .../backend/storage/mongodb/ChatRoomTo.java | 4 +- .../mongodb/MongoDbStorageStrategy.java | 13 +- .../chat/backend/KafkaConfigurationIT.java | 27 +- .../kafka/KafkaChatHomeServiceTest.java | 49 ++- .../data/EventChatMessageReceivedToTest.java | 2 +- 16 files changed, 347 insertions(+), 488 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 9c80f5d6..381c6c68 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -36,7 +36,8 @@ public class ChatBackendProperties { private String clientIdPrefix; private String bootstrapServers = ":9092"; - private String chatRoomChannelTopic = "message_channel"; + private String infoChannelTopic = "info_channel"; + private String dataChannelTopic = "data_channel"; private int numPartitions = 2; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java index 33c522d1..e91b28c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -19,4 +19,10 @@ public class ChatRoomInfo private final String name; @Getter private final Integer shard; + + + public ChatRoomInfo(UUID id, String name) + { + this(id, name, null); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d94bc659..f20f7c23 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -4,8 +4,7 @@ import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -16,7 +15,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; @@ -25,7 +23,7 @@ import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -37,7 +35,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; private final Map[] chatRoomData; private boolean running; @@ -45,21 +42,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private volatile boolean loadInProgress; - public ChatRoomChannel( + public DataChannel( String topic, Producer producer, - Consumer consumer, + Consumer dataChannelConsumer, ZoneId zoneId, int numShards, int bufferSize, Clock clock) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", + "Creating DataChannel for topic {} with {} partitions", topic, numShards); this.topic = topic; - this.consumer = consumer; + this.consumer = dataChannelConsumer; this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; @@ -68,55 +65,17 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) .forEach(shard -> { - this.chatRoomInfo[shard] = new HashMap<>(); this.chatRoomData[shard] = new HashMap<>(); }); } - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - chatRoomId.toString(), - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); - sink.success(chatRoomInfo); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, @@ -211,12 +170,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadChatRoom(records); + loadChatRoomData(records); if (isLoadingCompleted()) { @@ -244,7 +203,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoomData(ConsumerRecords records) { for (ConsumerRecord record : records) { @@ -252,13 +211,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - case EVENT_CHATMESSAGE_RECEIVED: Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); @@ -282,31 +234,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) + void createChatRoom(ChatRoomInfo chatRoomInfo) { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - + if (!isShardOwned[chatRoomInfo.getShard()]) + { + log.debug("Ignoring not owned chat-room {}", chatRoomInfo); + return; + } - private void createChatRoom(ChatRoomInfo chatRoomInfo) - { UUID id = chatRoomInfo.getId(); log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); KafkaChatMessageService service = new KafkaChatMessageService(this, id); @@ -359,7 +294,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Integer partition, ChatRoomData chatRoomData) { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomData[partition].containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -373,9 +308,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener partition, chatRoomData); - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); this.chatRoomData[partition].put(chatRoomId, chatRoomData); } } @@ -402,27 +334,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatRoomData[shard].get(id)); } - - Flux getChatRoomInfo() - { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); - } - - Mono getChatRoomInfo(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index d94bc659..c7cc6ad5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,106 +1,94 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; -import lombok.Getter; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class InfoChannel implements Runnable { private final String topic; private final Producer producer; private final Consumer consumer; - private final ZoneId zoneId; private final int numShards; - private final int bufferSize; - private final Clock clock; - private final boolean[] isShardOwned; - private final long[] currentOffset; + private final long[] startOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; - private final Map[] chatRoomData; + private final Map chatRoomInfo; + private final DataChannel dataChannel; private boolean running; - @Getter - private volatile boolean loadInProgress; - public ChatRoomChannel( + public InfoChannel( String topic, Producer producer, - Consumer consumer, - ZoneId zoneId, - int numShards, - int bufferSize, - Clock clock) + Consumer infoChannelConsumer, + DataChannel dataChannel) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", - topic, - numShards); + "Creating InfoChannel for topic {}", + topic); this.topic = topic; - this.consumer = consumer; + this.consumer = infoChannelConsumer; this.producer = producer; - this.zoneId = zoneId; - this.numShards = numShards; - this.bufferSize = bufferSize; - this.clock = clock; - this.isShardOwned = new boolean[numShards]; - this.currentOffset = new long[numShards]; + this.chatRoomInfo = new HashMap<>(); + + this.numShards = consumer + .partitionsFor(topic) + .size(); + this.startOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; - this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(partition -> this.nextOffset[partition] = -1l); + + this.dataChannel = dataChannel; } + boolean loadInProgress() + { + return IntStream + .range(0, numShards) + .anyMatch(partition -> nextOffset[partition] < startOffset[partition]); + } - Mono sendCreateChatRoomRequest( + Mono sendChatRoomCreatedEvent( UUID chatRoomId, - String name) + String name, + int shard) { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( topic, - chatRoomId.toString(), - createChatRoomRequestTo); + Integer.toString(shard), + to); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + log.info("Successfully sent chreate-request for chat room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); sink.success(chatRoomInfo); } else @@ -117,122 +105,28 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener }); } - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); - - consumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } @Override public void run() { running = true; + consumer + .endOffsets(consumer.assignment()) + .entrySet() + .stream() + .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue()); + IntStream + .range(0, numShards) + .forEach(partition -> this.nextOffset[partition] = 0l); + while (running) { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + log.debug("Fetched {} messages", records.count()); + handleMessages(records); } catch (WakeupException e) { @@ -244,36 +138,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void handleMessages(ConsumerRecords records) { for (ConsumerRecord record : records) { - UUID chatRoomId = UUID.fromString(record.key()); - switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - - case EVENT_CHATMESSAGE_RECEIVED: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (EventChatMessageReceivedTo) record.value(), - record.partition()); + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; default: log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, + "Ignoring message for key={} with offset={}: {}", + record.key(), record.offset(), record.value()); } @@ -282,84 +162,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - private void createChatRoom(ChatRoomInfo chatRoomInfo) { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - - private void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - EventChatMessageReceivedTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); - KafkaChatMessageService kafkaChatRoomService = - (KafkaChatMessageService) chatRoomData.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - private boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } - - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } + UUID chatRoomId = chatRoomInfo.getId(); + Integer partition = chatRoomInfo.getShard(); - - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo.containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -369,60 +177,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener else { log.info( - "Adding new chat-room to partition {}: {}", + "Adding new chat-room for partition {}: {}", partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - - int[] getOwnedShards() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .toArray(); - } - - Mono getChatRoomData(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } + chatRoomId); - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); + this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.dataChannel.createChatRoom(chatRoomInfo); } - - return Mono.justOrEmpty(chatRoomData[shard].get(id)); } Flux getChatRoomInfo() { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + return Flux.fromIterable(chatRoomInfo.values()); } - Mono getChatRoomInfo(int shard, UUID id) + Mono getChatRoomInfo(UUID id) { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 5019ed20..9832519d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -18,45 +18,47 @@ import java.util.*; public class KafkaChatHomeService implements ChatHomeService { private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; + private final InfoChannel infoChannel; + private final DataChannel dataChannel; @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); + int shard = selectShard(id); + log.info( + "Sending create-command for chat rooom: id={}, name={}, shard={}", + id, + name, + shard); + return infoChannel.sendChatRoomCreatedEvent(id, name, shard); } @Override public Mono getChatRoomInfo(UUID id) { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomInfo(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); + return infoChannel + .getChatRoomInfo(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override public Flux getChatRoomInfo() { - return chatRoomChannel.getChatRoomInfo(); + return infoChannel.getChatRoomInfo(); } @Override public Mono getChatRoomData(UUID id) { int shard = selectShard(id); - return chatRoomChannel + return dataChannel .getChatRoomData(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, - chatRoomChannel.getOwnedShards()))); + dataChannel.getOwnedShards()))); } int selectShard(UUID chatRoomId) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java index df9ee733..8ab50f1f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -15,7 +15,7 @@ import java.util.UUID; @Slf4j public class KafkaChatMessageService implements ChatMessageService { - private final ChatRoomChannel chatRoomChannel; + private final DataChannel dataChannel; private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,7 +27,7 @@ public class KafkaChatMessageService implements ChatMessageService LocalDateTime timestamp, String text) { - return chatRoomChannel + return dataChannel .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index da7ee75d..6810d06a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -17,12 +19,72 @@ import org.springframework.stereotype.Component; @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final ConsumerTaskExecutor chatRoomChannelTaskExecutor; + private final String infoTopic; + private final ThreadPoolTaskExecutor taskExecutor; + private final InfoChannel infoChannel; + private final DataChannel dataChannel; + private final Consumer infoChannelConsumer; + private final Consumer dataChannelConsumer; + private final WorkAssignor workAssignor; + + CompletableFuture infoChannelConsumerJob; + CompletableFuture dataChannelConsumerJob; +>>>>>>> 7fb62d3 (WIP:ALIGN) @Override public void run(ApplicationArguments args) throws Exception { +<<<<<<< HEAD chatRoomChannelTaskExecutor.executeConsumerTask(); +======= + List partitions = infoChannelConsumer + .partitionsFor(infoTopic) + .stream() + .map(partitionInfo -> new TopicPartition( + infoTopic, + partitionInfo.partition())) + .toList(); + infoChannelConsumer.assign(partitions); + log.info("Starting the consumer for the InfoChannel"); + infoChannelConsumerJob = taskExecutor + .submitCompletable(infoChannel) + .exceptionally(e -> + { + log.error("The consumer for the InfoChannel exited abnormally!", e); + return null; + }); + + while (infoChannel.loadInProgress()) + { + log.info("InfoChannel is still loading..."); + Thread.sleep(1000); + } + + workAssignor.assignWork(dataChannelConsumer); + log.info("Starting the consumer for the DataChannel"); + dataChannelConsumerJob = taskExecutor + .submitCompletable(dataChannel) + .exceptionally(e -> + { + log.error("The consumer for the DataChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + infoChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + dataChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + + interface WorkAssignor + { + void assignWork(Consumer consumer); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index cda0b94f..58a470d7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,8 +3,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -36,54 +36,84 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskExecutor chatRoomChannelTaskExecutor( + ConsumerTaskExecutor infoChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, - ChatRoomChannel chatRoomChannel, - Consumer chatRoomChannelConsumer, + InfoChannel infoChannel, + Consumer dataChannelConsumer, ConsumerTaskExecutor.WorkAssignor workAssignor) { return new ConsumerTaskExecutor( taskExecutor, - chatRoomChannel, - chatRoomChannelConsumer, + infoChannel, + dataChannelConsumer, + workAssignor); + } + + @Bean + ConsumerTaskExecutor dataChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + DataChannel dataChannel, + Consumer dataChannelConsumer, + ConsumerTaskExecutor.WorkAssignor workAssignor) + { + return new ConsumerTaskExecutor( + taskExecutor, + dataChannel, + dataChannelConsumer, workAssignor); } @Bean ConsumerTaskExecutor.WorkAssignor workAssignor( ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) + DataChannel dataChannel) { return consumer -> { List topics = - List.of(properties.getKafka().getChatRoomChannelTopic()); - consumer.subscribe(topics, chatRoomChannel); + List.of(properties.getKafka().getDataChannelTopic()); + consumer.subscribe(topics, dataChannel); }; } @Bean ChatHomeService kafkaChatHome( ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) + InfoChannel infoChannel, + DataChannel dataChannel) { return new KafkaChatHomeService( properties.getKafka().getNumPartitions(), - chatRoomChannel); + infoChannel, + dataChannel); + } + + @Bean + InfoChannel infoChannel( + ChatBackendProperties properties, + Producer producer, + Consumer infoChannelConsumer, + DataChannel dataChannel) + { + return new InfoChannel( + properties.getKafka().getInfoChannelTopic(), + producer, + infoChannelConsumer, + dataChannel); } @Bean - ChatRoomChannel chatRoomChannel( + DataChannel dataChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer producer, + Consumer dataChannelConsumer, ZoneId zoneId, Clock clock) { - return new ChatRoomChannel( - properties.getKafka().getChatRoomChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, + return new DataChannel( + properties.getKafka().getDataChannelTopic(), + producer, + dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), @@ -91,7 +121,7 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer producer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, @@ -101,7 +131,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -126,7 +156,28 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer infoChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "info_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + Consumer dataChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, @@ -136,10 +187,10 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, - "chatroom_channel"); + "data_channel"); return new KafkaConsumer<>( properties, stringDeserializer, @@ -168,7 +219,7 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java index 6f61592c..1aef3fd1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java @@ -11,6 +11,7 @@ public class AbstractMessageTo public enum ToType { COMMAND_CREATE_CHATROOM, EVENT_CHATMESSAGE_RECEIVED, + EVENT_CHATROOM_CREATED, } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java index 17d3a397..d4f6508d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java @@ -1,5 +1,6 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.*; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index 0cf42320..ae5a5019 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -1,2 +1,45 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated { +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.UUID; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventChatRoomCreated extends AbstractMessageTo +{ + private String id; + private String name; + private Integer shard; + + + public EventChatRoomCreated() + { + super(ToType.EVENT_CHATROOM_CREATED); + } + + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static EventChatRoomCreated of(UUID id, String name, Integer shard) + { + EventChatRoomCreated event = new EventChatRoomCreated(); + + event.setId(id.toString()); + event.setName(name); + event.setShard(shard); + + return event; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java index 8ea9cc23..853ee1cf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java @@ -11,20 +11,18 @@ import org.springframework.data.mongodb.core.mapping.Document; @Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE) @EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "shard", "name" }) +@ToString(of = { "id", "name" }) @Document public class ChatRoomTo { @Id private String id; - private Integer shard; private String name; public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) { return new ChatRoomTo( chatRoomInfo.getId().toString(), - chatRoomInfo.getShard(), chatRoomInfo.getName()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index c87036c9..3eb90960 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName()); }); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index f257d5e9..630dc630 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,6 +1,6 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel; +import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -19,7 +19,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @SpringBootTest( @@ -29,14 +30,18 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.info-topic=" + INFO_TOPIC, + "chat.backend.kafka.data-topic=" + DATA_TOPIC, "chat.backend.kafka.num-partitions=10", }) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@EmbeddedKafka( + topics = { INFO_TOPIC, DATA_TOPIC }, + partitions = 10) @Slf4j class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT { - final static String TOPIC = "KAFKA_CONFIGURATION_IT"; + final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL"; + final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL"; static CompletableFuture CONSUMER_JOB; @@ -48,19 +53,19 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @Autowired KafkaTemplate messageTemplate, @Autowired Consumer chatRoomChannelConsumer, @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) + @Autowired DataChannel dataChannel) { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "command_create_chatroom"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); + List assignedPartitions = List.of(new TopicPartition(INFO_TOPIC, 2)); chatRoomChannelConsumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); CONSUMER_JOB = taskExecutor - .submitCompletable(chatRoomChannel) + .submitCompletable(dataChannel) .exceptionally(e -> { log.error("The consumer for the ChatRoomChannel exited abnormally!", e); @@ -70,7 +75,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(INFO_TOPIC, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 4aa362d7..7e54ee32 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -18,12 +18,16 @@ import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.testcontainers.shaded.org.awaitility.Awaitility; import java.time.Clock; +import java.time.Duration; import java.util.List; +import java.util.UUID; import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; -import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.TOPIC; +import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC; +import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC; @SpringBootTest( @@ -38,14 +42,18 @@ import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServ "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC, + "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC, "chat.backend.kafka.num-partitions=" + NUM_SHARDS, }) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@EmbeddedKafka( + topics = { INFO_TOPIC, DATA_TOPIC }, + partitions = 10) @Slf4j public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest { - final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; + final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO"; + final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA"; @TestConfiguration @@ -55,14 +63,22 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest { @Bean ConsumerTaskExecutor.WorkAssignor workAssignor( - ChatRoomChannel chatRoomChannel) + InfoChannel infoChannel, + DataChannel dataChannel) { return consumer -> { + // TODO: Darauf warten, dass der Chat-Room erzeugt wurde! + Awaitility + .await() + .atMost(Duration.ofSeconds(10)) + .until(() -> infoChannel + .getChatRoomInfo(UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7")) + .block() != null); List assignedPartitions = - List.of(new TopicPartition(TOPIC, 2)); + List.of(new TopicPartition(DATA_TOPIC, 2)); consumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); }; } @@ -79,18 +95,23 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @Autowired ConsumerTaskExecutor consumerTaskExecutor, @Autowired KafkaTemplate messageTemplate) { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); consumerTaskExecutor.executeConsumerTask(); } - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + static void send( + KafkaTemplate kafkaTemplate, + String topic, + String key, + String value, + String typeId) { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(topic, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java index d9d5a08e..72ade064 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -- 2.20.1