From 6125dc1a824c4549f8fb45183f2e5f8d7f7ea5b1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Sep 2023 19:47:35 +0200 Subject: [PATCH] WIP:ALIGN --- .../implementation/kafka/DataChannel.java | 36 ++++--------------- .../implementation/kafka/InfoChannel.java | 26 +++++++------- .../kafka/KafkaChatHomeService.java | 2 +- .../messages/info/EventChatRoomCreated.java | 3 ++ 4 files changed, 22 insertions(+), 45 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4c9abd1e..85984642 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -212,13 +212,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - case EVENT_CHATMESSAGE_RECEIVED: Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); @@ -242,31 +235,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) + void createChatRoom(ChatRoomInfo chatRoomInfo) { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - + if (!isShardOwned[chatRoomInfo.getShard()]) + { + log.debug("Ignoring not owned chat-room {}", chatRoomInfo); + return; + } - private void createChatRoom(ChatRoomInfo chatRoomInfo) - { UUID id = chatRoomInfo.getId(); log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); KafkaChatMessageService service = new KafkaChatMessageService(this, id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 77f0cbb9..97739b53 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -29,8 +30,7 @@ import java.util.stream.IntStream; @Slf4j public class InfoChannel implements Runnable { - private final String infoTopic; - private final String dataTopic; + private final String topic; private final Producer producer; private final Consumer consumer; private final Map chatRoomInfo; @@ -39,16 +39,14 @@ public class InfoChannel implements Runnable public InfoChannel( - String infoTopic, - String dataTopic, + String topic, Producer producer, Consumer infoChannelConsumer) { log.debug( "Creating InfoChannel for topic {}", - infoTopic); - this.infoTopic = infoTopic; - this.dataTopic = dataTopic; + topic); + this.topic = topic; this.consumer = infoChannelConsumer; this.producer = producer; this.chatRoomInfo = new HashMap<>(); @@ -56,16 +54,16 @@ public class InfoChannel implements Runnable - Mono sendCreateChatRoomRequest( + Mono sendChatRoomCreatedEvent( UUID chatRoomId, String name) { - CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name); + EventChatRoomCreated to = EventChatRoomCreated.of(name); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( - dataTopic, + topic<, chatRoomId.toString(), to); @@ -135,15 +133,15 @@ public class InfoChannel implements Runnable log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { UUID chatRoomId = UUID.fromString(record.key()); switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: + case EVENT_CHATROOM_CREATED: createChatRoom( chatRoomId, (CommandCreateChatRoomTo) record.value(), @@ -239,7 +237,7 @@ public class InfoChannel implements Runnable consumer.pause(IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(infoTopic, shard)) + .mapToObj(shard -> new TopicPartition(topic, shard)) .toList()); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 73990e64..707e843b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -27,7 +27,7 @@ public class KafkaChatHomeService implements ChatHomeService public Mono createChatRoom(UUID id, String name) { log.info("Sending create-command for chat rooom: id={}, name={}"); - return infoChannel.sendCreateChatRoomRequest(id, name); + return infoChannel.sendChatRoomCreatedEvent(id, name); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index 75907e17..b5cf458b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -22,4 +22,7 @@ public class EventChatRoomCreated extends AbstractMessageTo { super(ToType.EVENT_CHATROOM_CREATED); } + + + public static EventChatRoomCreated of(String ) } -- 2.20.1