From 74ba4de7ad08d69a88118f893297c9889a7879c0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Sep 2023 19:07:00 +0200 Subject: [PATCH] WIP:ALIGN --- .../implementation/kafka/DataChannel.java | 22 +++++++++---------- .../implementation/kafka/InfoChannel.java | 18 +++++++-------- .../kafka/KafkaServicesApplicationRunner.java | 2 +- .../kafka/KafkaServicesConfiguration.java | 22 +++++++++---------- .../kafka/messages/AbstractMessageTo.java | 5 +++-- .../data/CommandCreateChatRoomTo.java | 3 ++- .../data/EventChatMessageReceivedTo.java | 3 ++- .../messages/info/EventChatRoomCreated.java | 3 ++- 8 files changed, 40 insertions(+), 38 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 241ea9b9..738901aa 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; @@ -27,8 +27,8 @@ import java.util.stream.IntStream; public class DataChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final int bufferSize; @@ -45,8 +45,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( String topic, - Producer producer, - Consumer consumer, + Producer producer, + Consumer dataChannelConsumer, ZoneId zoneId, int numShards, int bufferSize, @@ -57,7 +57,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener topic, numShards); this.topic = topic; - this.consumer = consumer; + this.consumer = dataChannelConsumer; this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; @@ -84,7 +84,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, chatRoomId.toString(), @@ -122,7 +122,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, null, @@ -207,7 +207,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) @@ -240,9 +240,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { UUID chatRoomId = UUID.fromString(record.key()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 2edf1f05..77f0cbb9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -5,7 +5,7 @@ import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.extern.slf4j.Slf4j; @@ -20,7 +20,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -32,8 +31,8 @@ public class InfoChannel implements Runnable { private final String infoTopic; private final String dataTopic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final Map chatRoomInfo; private boolean running; @@ -42,16 +41,15 @@ public class InfoChannel implements Runnable public InfoChannel( String infoTopic, String dataTopic, - Producer producer, - Consumer consumer, - ZoneId zoneId) + Producer producer, + Consumer infoChannelConsumer) { log.debug( "Creating InfoChannel for topic {}", infoTopic); this.infoTopic = infoTopic; this.dataTopic = dataTopic; - this.consumer = consumer; + this.consumer = infoChannelConsumer; this.producer = producer; this.chatRoomInfo = new HashMap<>(); } @@ -65,7 +63,7 @@ public class InfoChannel implements Runnable CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( dataTopic, chatRoomId.toString(), @@ -104,7 +102,7 @@ public class InfoChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index a7235b30..a35cee02 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 5fbaa7bc..ed1ccddc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,7 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; @@ -75,8 +75,8 @@ public class KafkaServicesConfiguration @Bean DataChannel chatRoomChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, ZoneId zoneId, Clock clock) { @@ -91,11 +91,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer chatRoomChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, - JsonSerializer messageSerializer) + JsonSerializer messageSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -115,9 +115,9 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatMessageSerializer(String typeMappings) + JsonSerializer chatMessageSerializer(String typeMappings) { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, typeMappings), @@ -126,11 +126,11 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) + JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -153,9 +153,9 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatMessageDeserializer(String typeMappings) + JsonDeserializer chatMessageDeserializer(String typeMappings) { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(), diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java index c96f43bb..1aef3fd1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; +package de.juplo.kafka.chat.backend.implementation.kafka.messages; import lombok.Getter; @@ -6,11 +6,12 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor -public class AbstractDataMessageTo +public class AbstractMessageTo { public enum ToType { COMMAND_CREATE_CHATROOM, EVENT_CHATMESSAGE_RECEIVED, + EVENT_CHATROOM_CREATED, } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java index 00c8b94a..9c8bd757 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.*; @@ -7,7 +8,7 @@ import lombok.*; @Setter @EqualsAndHashCode @ToString -public class CommandCreateChatRoomTo extends AbstractDataMessageTo +public class CommandCreateChatRoomTo extends AbstractMessageTo { private String name; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java index 97090111..d4f6508d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka.messages.data; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.*; @@ -7,7 +8,7 @@ import lombok.*; @Setter @EqualsAndHashCode @ToString -public class EventChatMessageReceivedTo extends AbstractDataMessageTo +public class EventChatMessageReceivedTo extends AbstractMessageTo { private String user; private Long id; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index 27df2aa4..75907e17 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -10,7 +11,7 @@ import lombok.ToString; @Setter @EqualsAndHashCode @ToString -public class EventChatRoomCreated extends AbstractInfoMessageTo +public class EventChatRoomCreated extends AbstractMessageTo { private String id; private String name; -- 2.20.1