From: Kai Moritz Date: Mon, 11 Sep 2023 16:36:13 +0000 (+0200) Subject: WIP:ALIGN X-Git-Tag: rebase--2023-09-14--22-59~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7adfd4a9c5f5e0b05447a084169423ad79f52338;p=demos%2Fkafka%2Fchat WIP:ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 9c80f5d6..381c6c68 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -36,7 +36,8 @@ public class ChatBackendProperties { private String clientIdPrefix; private String bootstrapServers = ":9092"; - private String chatRoomChannelTopic = "message_channel"; + private String infoChannelTopic = "info_channel"; + private String dataChannelTopic = "data_channel"; private int numPartitions = 2; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 3f4faa5b..c1067bc7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -3,9 +3,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -16,7 +16,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; @@ -28,8 +27,8 @@ import java.util.stream.IntStream; public class DataChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final int bufferSize; @@ -46,15 +45,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( String topic, - Producer producer, - Consumer consumer, + Producer producer, + Consumer consumer, ZoneId zoneId, int numShards, int bufferSize, Clock clock) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", + "Creating DataChannel for topic {} with {} partitions", topic, numShards); this.topic = topic; @@ -82,10 +81,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, String name) { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, chatRoomId.toString(), @@ -123,13 +122,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, null, zdt.toEpochSecond(), chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -208,7 +207,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) @@ -241,9 +240,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { UUID chatRoomId = UUID.fromString(record.key()); @@ -252,7 +251,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener case COMMAND_CREATE_CHATROOM: createChatRoom( chatRoomId, - (CommandCreateChatRoomTo) record.value(), + (CommandCreateChatRoomToData) record.value(), record.partition()); break; @@ -263,7 +262,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener chatRoomId, timestamp, record.offset(), - (EventChatMessageReceivedTo) record.value(), + (EventChatDataMessageReceivedTo) record.value(), record.partition()); break; @@ -281,7 +280,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private void createChatRoom( UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, + CommandCreateChatRoomToData createChatRoomRequestTo, Integer partition) { log.info( @@ -319,7 +318,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, LocalDateTime timestamp, long offset, - EventChatMessageReceivedTo chatMessageTo, + EventChatDataMessageReceivedTo chatMessageTo, int partition) { Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); @@ -356,7 +355,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Integer partition, ChatRoomData chatRoomData) { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomData[partition].containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -370,9 +369,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener partition, chatRoomData); - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); this.chatRoomData[partition].put(chatRoomId, chatRoomData); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index a3a5b43a..2823d9df 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -5,13 +5,11 @@ import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; -import lombok.Getter; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; @@ -30,58 +28,43 @@ import java.util.stream.IntStream; @Slf4j -public class InfoChannel implements Runnable, ConsumerRebalanceListener +public class InfoChannel implements Runnable { - private final String topic; - private final Producer producer; - private final Consumer consumer; + private final DataChannel dataChannel; + private final String infoTopic; + private final String dataTopic; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; - private final int numShards; private final int bufferSize; private final Clock clock; - private final boolean[] isShardOwned; - private final long[] currentOffset; - private final long[] nextOffset; - private final Map[] chatRoomInfo; - private final Map[] chatRoomData; + private final Map chatRoomInfo; private boolean running; - @Getter - private volatile boolean loadInProgress; public InfoChannel( - String topic, - Producer producer, - Consumer consumer, + DataChannel dataChannel, + String infoTopic, + String dataTopic, + Producer producer, + Consumer consumer, ZoneId zoneId, - int numShards, int bufferSize, Clock clock) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", - topic, - numShards); - this.topic = topic; + "Creating InfoChannel for topic {}", + infoTopic); + this.dataChannel = dataChannel; + this.infoTopic = infoTopic; + this.dataTopic = dataTopic; this.consumer = consumer; this.producer = producer; this.zoneId = zoneId; - this.numShards = numShards; this.bufferSize = bufferSize; this.clock = clock; - this.isShardOwned = new boolean[numShards]; - this.currentOffset = new long[numShards]; - this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; - this.chatRoomData = new Map[numShards]; - IntStream - .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); + this.chatRoomInfo = new HashMap<>(); } @@ -90,12 +73,12 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, String name) { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - topic, + dataTopic, chatRoomId.toString(), createChatRoomRequestTo); @@ -131,13 +114,13 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - topic, + infoTopic, null, zdt.toEpochSecond(), chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -216,7 +199,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) @@ -249,9 +232,9 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { UUID chatRoomId = UUID.fromString(record.key()); @@ -260,7 +243,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener case COMMAND_CREATE_CHATROOM: createChatRoom( chatRoomId, - (CommandCreateChatRoomTo) record.value(), + (CommandCreateChatRoomToData) record.value(), record.partition()); break; @@ -271,7 +254,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener chatRoomId, timestamp, record.offset(), - (EventChatMessageReceivedTo) record.value(), + (EventChatDataMessageReceivedTo) record.value(), record.partition()); break; @@ -289,7 +272,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener private void createChatRoom( UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, + CommandCreateChatRoomToData createChatRoomRequestTo, Integer partition) { log.info( @@ -327,7 +310,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, LocalDateTime timestamp, long offset, - EventChatMessageReceivedTo chatMessageTo, + EventChatDataMessageReceivedTo chatMessageTo, int partition) { Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); @@ -353,7 +336,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener consumer.pause(IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) + .mapToObj(shard -> new TopicPartition(infoTopic, shard)) .toList()); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 5019ed20..73990e64 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -18,7 +18,8 @@ import java.util.*; public class KafkaChatHomeService implements ChatHomeService { private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; + private final InfoChannel infoChannel; + private final DataChannel dataChannel; @@ -26,37 +27,37 @@ public class KafkaChatHomeService implements ChatHomeService public Mono createChatRoom(UUID id, String name) { log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); + return infoChannel.sendCreateChatRoomRequest(id, name); } @Override public Mono getChatRoomInfo(UUID id) { int shard = selectShard(id); - return chatRoomChannel + return infoChannel .getChatRoomInfo(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, - chatRoomChannel.getOwnedShards()))); + dataChannel.getOwnedShards()))); } @Override public Flux getChatRoomInfo() { - return chatRoomChannel.getChatRoomInfo(); + return infoChannel.getChatRoomInfo(); } @Override public Mono getChatRoomData(UUID id) { int shard = selectShard(id); - return chatRoomChannel + return dataChannel .getChatRoomData(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, - chatRoomChannel.getOwnedShards()))); + dataChannel.getOwnedShards()))); } int selectShard(UUID chatRoomId) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java index df9ee733..8ab50f1f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -15,7 +15,7 @@ import java.util.UUID; @Slf4j public class KafkaChatMessageService implements ChatMessageService { - private final ChatRoomChannel chatRoomChannel; + private final DataChannel dataChannel; private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,7 +27,7 @@ public class KafkaChatMessageService implements ChatMessageService LocalDateTime timestamp, String text) { - return chatRoomChannel + return dataChannel .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index a863353e..a7235b30 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture; public class KafkaServicesApplicationRunner implements ApplicationRunner { private final ThreadPoolTaskExecutor taskExecutor; - private final ChatRoomChannel chatRoomChannel; + private final DataChannel dataChannel; private final Consumer chatRoomChannelConsumer; private final WorkAssignor workAssignor; @@ -39,7 +39,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner workAssignor.assignWork(chatRoomChannelConsumer); log.info("Starting the consumer for the ChatRoomChannel"); chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) + .submitCompletable(dataChannel) .exceptionally(e -> { log.error("The consumer for the ChatRoomChannel exited abnormally!", e); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index bb67951a..96f5b333 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,9 +2,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -65,23 +65,23 @@ public class KafkaServicesConfiguration @Bean ChatHomeService kafkaChatHome( ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) + DataChannel dataChannel) { return new KafkaChatHomeService( properties.getKafka().getNumPartitions(), - chatRoomChannel); + dataChannel); } @Bean - ChatRoomChannel chatRoomChannel( + DataChannel chatRoomChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, ZoneId zoneId, Clock clock) { - return new ChatRoomChannel( - properties.getKafka().getChatRoomChannelTopic(), + return new DataChannel( + properties.getKafka().getDataChannelTopic(), chatRoomChannelProducer, chatRoomChannelConsumer, zoneId, @@ -91,11 +91,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer chatRoomChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, - JsonSerializer messageSerializer) + JsonSerializer messageSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -115,9 +115,9 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatMessageSerializer(String typeMappings) + JsonSerializer chatMessageSerializer(String typeMappings) { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, typeMappings), @@ -126,11 +126,11 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) + JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -153,9 +153,9 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatMessageDeserializer(String typeMappings) + JsonDeserializer chatMessageDeserializer(String typeMappings) { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(), @@ -168,8 +168,8 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + - "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); + "command_create_chatroom:" + CommandCreateChatRoomToData.class.getCanonicalName() + "," + + "event_chatmessage_received:" + EventChatDataMessageReceivedTo.class.getCanonicalName(); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/AbstractDataMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/AbstractDataMessageTo.java index 6f61592c..c96f43bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/AbstractDataMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/AbstractDataMessageTo.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import lombok.Getter; @@ -6,7 +6,7 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor -public class AbstractMessageTo +public class AbstractDataMessageTo { public enum ToType { COMMAND_CREATE_CHATROOM, diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToData.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToData.java index 29ba77c6..ee5fe0f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToData.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import lombok.*; @@ -7,20 +7,20 @@ import lombok.*; @Setter @EqualsAndHashCode @ToString -public class CommandCreateChatRoomTo extends AbstractMessageTo +public class CommandCreateChatRoomToData extends AbstractDataMessageTo { private String name; - public CommandCreateChatRoomTo() + public CommandCreateChatRoomToData() { super(ToType.COMMAND_CREATE_CHATROOM); } - public static CommandCreateChatRoomTo of(String name) + public static CommandCreateChatRoomToData of(String name) { - CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); + CommandCreateChatRoomToData to = new CommandCreateChatRoomToData(); to.name = name; return to; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatDataMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatDataMessageReceivedTo.java index 17d3a397..20a0f5ca 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatDataMessageReceivedTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatDataMessageReceivedTo.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import lombok.*; @@ -7,22 +7,22 @@ import lombok.*; @Setter @EqualsAndHashCode @ToString -public class EventChatMessageReceivedTo extends AbstractMessageTo +public class EventChatDataMessageReceivedTo extends AbstractDataMessageTo { private String user; private Long id; private String text; - public EventChatMessageReceivedTo() + public EventChatDataMessageReceivedTo() { super(ToType.EVENT_CHATMESSAGE_RECEIVED); } - public static EventChatMessageReceivedTo of(String user, Long id, String text) + public static EventChatDataMessageReceivedTo of(String user, Long id, String text) { - EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); + EventChatDataMessageReceivedTo to = new EventChatDataMessageReceivedTo(); to.user = user; to.id = id; to.text = text; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index 0cf42320..c2311de6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -1,2 +1,24 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated { +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventChatRoomCreated extends AbstractInfoMessageTo +{ + private String id; + private String name; + private Integer shard; + + + public EventChatRoomCreated(ToType type) + { + super(type); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java index 8ea9cc23..853ee1cf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java @@ -11,20 +11,18 @@ import org.springframework.data.mongodb.core.mapping.Document; @Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE) @EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "shard", "name" }) +@ToString(of = { "id", "name" }) @Document public class ChatRoomTo { @Id private String id; - private Integer shard; private String name; public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) { return new ChatRoomTo( chatRoomInfo.getId().toString(), - chatRoomInfo.getShard(), chatRoomInfo.getName()); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index f257d5e9..630dc630 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,6 +1,6 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel; +import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -19,7 +19,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @SpringBootTest( @@ -29,14 +30,18 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.info-topic=" + INFO_TOPIC, + "chat.backend.kafka.data-topic=" + DATA_TOPIC, "chat.backend.kafka.num-partitions=10", }) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@EmbeddedKafka( + topics = { INFO_TOPIC, DATA_TOPIC }, + partitions = 10) @Slf4j class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT { - final static String TOPIC = "KAFKA_CONFIGURATION_IT"; + final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL"; + final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL"; static CompletableFuture CONSUMER_JOB; @@ -48,19 +53,19 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @Autowired KafkaTemplate messageTemplate, @Autowired Consumer chatRoomChannelConsumer, @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) + @Autowired DataChannel dataChannel) { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "command_create_chatroom"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); + List assignedPartitions = List.of(new TopicPartition(INFO_TOPIC, 2)); chatRoomChannelConsumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); + dataChannel.onPartitionsAssigned(assignedPartitions); CONSUMER_JOB = taskExecutor - .submitCompletable(chatRoomChannel) + .submitCompletable(dataChannel) .exceptionally(e -> { log.error("The consumer for the ChatRoomChannel exited abnormally!", e); @@ -70,7 +75,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(INFO_TOPIC, key, value); record.headers().add("__TypeId__", typeId.getBytes()); SendResult result = kafkaTemplate.send(record).join(); log.info( diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java index b0311126..64995750 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -29,7 +29,7 @@ public class CommandCreateChatRoomToTest @Test public void testDeserialization() throws Exception { - CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); + CommandCreateChatRoomToData message = mapper.readValue(json, CommandCreateChatRoomToData.class); assertThat(message.getName()).isEqualTo("Foo-Room!"); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java index d9d5a08e..41d7bc12 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; +package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -31,7 +31,7 @@ public class EventChatMessageReceivedToTest @Test public void testDeserialization() throws Exception { - EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); + EventChatDataMessageReceivedTo message = mapper.readValue(json, EventChatDataMessageReceivedTo.class); assertThat(message.getId()).isEqualTo(1l); assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); assertThat(message.getUser()).isEqualTo("Peter");