From: Kai Moritz Date: Mon, 11 Sep 2023 17:00:51 +0000 (+0200) Subject: WIP:ALIGN X-Git-Tag: rebase--2023-09-13--21-01~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=718cfec3e7cae4341189a0d09aa6ad0ac37aa520;p=demos%2Fkafka%2Fchat WIP:ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index c1067bc7..241ea9b9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -4,8 +4,8 @@ import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -81,7 +81,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, String name) { - CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name); + CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); return Mono.create(sink -> { ProducerRecord record = @@ -128,7 +128,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener null, zdt.toEpochSecond(), chatRoomId.toString(), - EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -251,7 +251,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener case COMMAND_CREATE_CHATROOM: createChatRoom( chatRoomId, - (CommandCreateChatRoomToData) record.value(), + (CommandCreateChatRoomTo) record.value(), record.partition()); break; @@ -262,7 +262,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener chatRoomId, timestamp, record.offset(), - (EventChatDataMessageReceivedTo) record.value(), + (EventChatMessageReceivedTo) record.value(), record.partition()); break; @@ -280,7 +280,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private void createChatRoom( UUID chatRoomId, - CommandCreateChatRoomToData createChatRoomRequestTo, + CommandCreateChatRoomTo createChatRoomRequestTo, Integer partition) { log.info( @@ -318,7 +318,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, LocalDateTime timestamp, long offset, - EventChatDataMessageReceivedTo chatMessageTo, + EventChatMessageReceivedTo chatMessageTo, int partition) { Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 2823d9df..2edf1f05 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -6,8 +6,8 @@ import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -30,40 +30,29 @@ import java.util.stream.IntStream; @Slf4j public class InfoChannel implements Runnable { - private final DataChannel dataChannel; private final String infoTopic; private final String dataTopic; private final Producer producer; private final Consumer consumer; - private final ZoneId zoneId; - private final int bufferSize; - private final Clock clock; private final Map chatRoomInfo; private boolean running; public InfoChannel( - DataChannel dataChannel, String infoTopic, String dataTopic, Producer producer, Consumer consumer, - ZoneId zoneId, - int bufferSize, - Clock clock) + ZoneId zoneId) { log.debug( "Creating InfoChannel for topic {}", infoTopic); - this.dataChannel = dataChannel; this.infoTopic = infoTopic; this.dataTopic = dataTopic; this.consumer = consumer; this.producer = producer; - this.zoneId = zoneId; - this.bufferSize = bufferSize; - this.clock = clock; this.chatRoomInfo = new HashMap<>(); } @@ -73,20 +62,20 @@ public class InfoChannel implements Runnable UUID chatRoomId, String name) { - CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name); + CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( dataTopic, chatRoomId.toString(), - createChatRoomRequestTo); + to); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + log.info("Successfully send chreate-request for chat room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); createChatRoom(chatRoomInfo); sink.success(chatRoomInfo); @@ -105,90 +94,6 @@ public class InfoChannel implements Runnable }); } - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - infoTopic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); - - consumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } @Override public void run() @@ -232,9 +137,9 @@ public class InfoChannel implements Runnable log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { UUID chatRoomId = UUID.fromString(record.key()); @@ -243,7 +148,7 @@ public class InfoChannel implements Runnable case COMMAND_CREATE_CHATROOM: createChatRoom( chatRoomId, - (CommandCreateChatRoomToData) record.value(), + (CommandCreateChatRoomTo) record.value(), record.partition()); break; @@ -254,7 +159,7 @@ public class InfoChannel implements Runnable chatRoomId, timestamp, record.offset(), - (EventChatDataMessageReceivedTo) record.value(), + (EventChatMessageReceivedTo) record.value(), record.partition()); break; @@ -272,7 +177,7 @@ public class InfoChannel implements Runnable private void createChatRoom( UUID chatRoomId, - CommandCreateChatRoomToData createChatRoomRequestTo, + CommandCreateChatRoomTo createChatRoomRequestTo, Integer partition) { log.info( @@ -310,7 +215,7 @@ public class InfoChannel implements Runnable UUID chatRoomId, LocalDateTime timestamp, long offset, - EventChatDataMessageReceivedTo chatMessageTo, + EventChatMessageReceivedTo chatMessageTo, int partition) { Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index ecd44195..c7d5115b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,8 +3,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -139,8 +139,8 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomToData.class.getCanonicalName() + "," + - "event_chatmessage_received:" + EventChatDataMessageReceivedTo.class.getCanonicalName(); + "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java index ee5fe0f8..00c8b94a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java @@ -7,20 +7,20 @@ import lombok.*; @Setter @EqualsAndHashCode @ToString -public class CommandCreateChatRoomToData extends AbstractDataMessageTo +public class CommandCreateChatRoomTo extends AbstractDataMessageTo { private String name; - public CommandCreateChatRoomToData() + public CommandCreateChatRoomTo() { super(ToType.COMMAND_CREATE_CHATROOM); } - public static CommandCreateChatRoomToData of(String name) + public static CommandCreateChatRoomTo of(String name) { - CommandCreateChatRoomToData to = new CommandCreateChatRoomToData(); + CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); to.name = name; return to; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java index 20a0f5ca..97090111 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java @@ -7,22 +7,22 @@ import lombok.*; @Setter @EqualsAndHashCode @ToString -public class EventChatDataMessageReceivedTo extends AbstractDataMessageTo +public class EventChatMessageReceivedTo extends AbstractDataMessageTo { private String user; private Long id; private String text; - public EventChatDataMessageReceivedTo() + public EventChatMessageReceivedTo() { super(ToType.EVENT_CHATMESSAGE_RECEIVED); } - public static EventChatDataMessageReceivedTo of(String user, Long id, String text) + public static EventChatMessageReceivedTo of(String user, Long id, String text) { - EventChatDataMessageReceivedTo to = new EventChatDataMessageReceivedTo(); + EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); to.user = user; to.id = id; to.text = text; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index c2311de6..27df2aa4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -17,8 +17,8 @@ public class EventChatRoomCreated extends AbstractInfoMessageTo private Integer shard; - public EventChatRoomCreated(ToType type) + public EventChatRoomCreated() { - super(type); + super(ToType.EVENT_CHATROOM_CREATED); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java index 64995750..044bf1dc 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java @@ -29,7 +29,7 @@ public class CommandCreateChatRoomToTest @Test public void testDeserialization() throws Exception { - CommandCreateChatRoomToData message = mapper.readValue(json, CommandCreateChatRoomToData.class); + CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); assertThat(message.getName()).isEqualTo("Foo-Room!"); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java index 41d7bc12..72ade064 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java @@ -31,7 +31,7 @@ public class EventChatMessageReceivedToTest @Test public void testDeserialization() throws Exception { - EventChatDataMessageReceivedTo message = mapper.readValue(json, EventChatDataMessageReceivedTo.class); + EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); assertThat(message.getId()).isEqualTo(1l); assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); assertThat(message.getUser()).isEqualTo("Peter");