From 07823608c1a26b9c3dd6654dc3f21ba7fb856d1e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 28 Feb 2023 18:54:37 +0100 Subject: [PATCH] NEU --- README.txt | 72 +++++ pom.xml | 9 + .../chat/backend/ChatBackendProperties.java | 14 +- .../kafka/chat/backend/domain/ChatRoom.java | 5 + .../chat/backend/domain/ChatRoomFactory.java | 2 +- .../inmemory/InMemoryChatRoomFactory.java | 2 +- .../persistence/kafka/ChatHomeLoader.java | 73 ----- .../persistence/kafka/ChatMessageChannel.java | 278 ++++++++++++++++++ ...ChatRoomActiveMessageHandlingStrategy.java | 69 ----- .../persistence/kafka/ChatRoomChannel.java | 110 +++++++ ...hatRoomLoadingMessageHandlingStrategy.java | 44 --- .../backend/persistence/kafka/ChatRoomTo.java | 30 ++ ...atroomInactiveMessageHandlingStrategy.java | 25 -- .../persistence/kafka/KafkaChatHome.java | 49 +++ .../kafka/KafkaChatHomeService.java | 115 -------- .../kafka/KafkaChatRoomFactory.java | 13 +- .../kafka/KafkaChatRoomService.java | 65 +--- .../kafka/KafkaServicesApplicationRunner.java | 83 ++++++ .../kafka/KafkaServicesConfiguration.java | 270 +++++++++++++++++ .../kafka/LoadInProgressException.java | 17 ++ .../kafka/MessageHandlingStrategy.java | 15 - .../chat/backend/KafkaConfigurationIT.java | 46 +++ 22 files changed, 1005 insertions(+), 401 deletions(-) create mode 100644 README.txt delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java diff --git a/README.txt b/README.txt new file mode 100644 index 00000000..8aaea7be --- /dev/null +++ b/README.txt @@ -0,0 +1,72 @@ +Aktuelle Idee für die Kafka-Anbindung +===================================== + +- *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst + durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen + `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können. + - D.h., das Locking, das während einem Rebalance nötig ist, kann + *vollständig* in `KafkaChatHomeService` umgesetzt werden. + - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen, + ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz + fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht + zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)` + abgelehnt werden! + - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die + Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen + für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking + in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich* + für den `ChatRoom` zuständig ist. + - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`) + erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale + Kontrolle auf doppelte Nachrichten logisch gültig ist. +- *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt. + - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen + durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen + `ChatRoom`-Instanzen *unterbunden* werden können. +- *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle + Zugriffe eine `ShardNotOwnedException` erzeugt. + - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen, + die *während* einem Rebalance auflaufen +- *Lade-Modus - Initialisierung und Abschluss-Bedingung:* + - Wenn durch einen Rebalance in den Lade-Modus gewechselt wird, muss die + *tatsächliche* Offset-Position der zuletzt geschriebenen Nachrichten + für die zugeordneten Partitionen ermittelt werden. + - Anschließend wird ein Seek auf die Offset-Position 0 (später: auf die + in den lokalen Daten gespeicherte Offset-Position) durchgeführt. + - Der Lade-Modus ist abgeschlossen, wenn für alle zugeordneten Partitionen + der zum Rebalance-Zeitpunkt ermittelte Offset der aktuellsten Nachrichten + erreicht ist. + - Wenn ein weiterer Rebalance erfolgt, während der Lade-Modus bereits + aktiv ist, sollte es genügen, die Informationen über die zugeordneten + Partitionen zu aktualisieren und die Aktuellen Offsets für diese neu + zu ermitteln. +- *Lade-Modus vs. Default-Modus:* + - Nur während des Lade-Modus *liest* die `KafkaChatRoomServcie`-Instanz + tatsächlich die Nachrichten aus den zugeordneten Partitionen. + - Im Default-Modus *schreibt* sie die Nachrichten nur in die Partitionen + und speichert sie lokal ab, sobald die *Bestätigung durch den `Producer`* + erfolgt. + - D.h. insbesondere, dass der `KafkaConsumer` im Default-Modus für alle + zugeordneten Partitionen *pausiert* wird! + - Damit die Offset-Positon nicht unnötig zurückfällt, sollte ggf. + regelmäßig für alle zugeordneten Partitionen ein Seek auf die zuletzt + vom Producer bestätigt geschriebenen Offsets durchgeführt werden. + - *Beachte:_ Dies ist nicht nötig, wenn die Offsets eh in den lokal + gespeicherten Daten gehalten und aus diesen wiederhergestellt werden! +- *Umsetzungs-Details:* + - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden + in einem zeitkritischem Setting laufen, muss das eigentliche Laden der + `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an + den `KafkaConsumer` zurückgegeben werden kann. + - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus + wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch + den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt + wurden. + - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser + gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn + die Instanz in den normalen Modus zurückwechselt. + - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_ + Anfragen mit `ShardNotOwnedException` abgelehnt werden! + - Eine besondere Herausforderung sind *erneute* Rebalances, die + Auftreten, *während* der `KafkaChatRoomService` sich noch in einem + durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet! diff --git a/pom.xml b/pom.xml index 2819be45..23fcec3f 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-test @@ -93,6 +97,11 @@ awaitility test + + org.springframework.kafka + spring-kafka-test + test + diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 177d4f51..da58b530 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -16,6 +16,7 @@ public class ChatBackendProperties private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); + private KafkaServicesProperties kafka = new KafkaServicesProperties(); @Getter @@ -29,7 +30,18 @@ public class ChatBackendProperties private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } - public enum ServiceType { inmemory } + @Getter + @Setter + public static class KafkaServicesProperties + { + private String clientIdPrefix; + private String bootstrapServers = ":9092"; + private String chatroomChannelTopic = "chatroom_channel"; + private String messageChannelTopic = "message_channel"; + private int numPartitions = 2; + } + + public enum ServiceType { inmemory, kafka } public enum StorageStrategyType { files, mongodb } public enum ShardingStrategyType { none, kafkalike } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cffc0ad0..b9463095 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -80,6 +80,11 @@ public class ChatRoom extends ChatRoomInfo } + public ChatRoomService getChatRoomService() + { + return service; + } + public Mono getMessage(String username, Long messageId) { Message.MessageKey key = Message.MessageKey.of(username, messageId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 9872ccb1..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -21,7 +21,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java deleted file mode 100644 index 465775f2..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java +++ /dev/null @@ -1,73 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.TopicPartition; - -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -class ChatHomeLoader -{ - private final Producer producer; - private final long offsetOfFirstUnseenMessage; - private final ZoneId zoneId; - @Getter - private final Map kafkaChatRoomServiceMap = new HashMap<>(); - - - /** - * Rebuilds the state of the {@link KafkaChatHomeService} by consuming - * all messages, that belong to the partition, that defines the shard this - * service represents. - * The loader signals, that its work is done, if the given end offset is reached. - * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition. - * @return {@code true}, if all messages are consumed. - */ - boolean handleMessage(ConsumerRecord record) - { - TopicPartition topicPartition = - new TopicPartition(record.topic(), record.partition()); - Message.MessageKey messageKey = Message.MessageKey.of( - record.value().getUser(), - record.value().getId()); - - if (record.offset() >= offsetOfFirstUnseenMessage) - { - // All messages consumed: DONE! - log.trace( - "Ignoring unseen message {} on {}, offset={}", - messageKey, - topicPartition, - record.offset()); - return true; - } - - Instant timestamp = Instant.ofEpochMilli(record.timestamp()); - LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId); - - KafkaChatRoomService service = kafkaChatRoomServiceMap - .computeIfAbsent(record.key(), key -> - new KafkaChatRoomService(producer, topicPartition)); - - service.addMessage(new Message( - messageKey, - record.offset(), - time, - record.value().getText())); - - return false; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java new file mode 100644 index 00000000..ac30f1d3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -0,0 +1,278 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.*; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ZoneId zoneId; + private final int numShards; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatrooms; + private final KafkaLikeShardingStrategy shardingStrategy; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatMessageChannel( + String topic, + Producer producer, + Consumer consumer, + ZoneId zoneId, + int numShards) + { + log.debug( + "Creating ChatMessageChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); + } + + + Mono sendMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + TopicPartition tp = new TopicPartition(topic, shard); + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + zdt.toEpochSecond(), + chatRoomId.toString(), + MessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + consumer.subscribe(List.of(topic), this); + + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadMessages(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + void loadMessages(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + nextOffset[record.partition()] = record.offset() + 1; + UUID chatRoomId = UUID.fromString(record.key()); + MessageTo messageTo = record.value(); + + Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + + Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + + ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); + if (chatRoom == null) + { + // Alles pausieren und erst von putChatRoom wieder resumen lassen! + } + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + } + + boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + } + + void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + + void putChatRoom(ChatRoom chatRoom) + { + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoom); + + chatrooms[partition].put(chatRoomId, chatRoom); + } + } + + Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + Flux getChatRooms() + { + return Flux.fromStream(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> Integer.valueOf(shard)) + .flatMap(shard -> chatrooms[shard].values().stream())); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java deleted file mode 100644 index 562e2df8..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java +++ /dev/null @@ -1,69 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.util.UUID; - - -/** - * TODO: - * Actual the only active strategy! - * {@link MessageHandlingStrategy} probably not needed: Refactor! - */ -@RequiredArgsConstructor -@Slf4j -class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy -{ - private final KafkaChatRoomService kafkaChatRoomService; - private final Producer producer; - private final TopicPartition tp; - private final UUID chatRoomId; - private final ZoneOffset zoneOffset; - - - @Override - public Mono handleMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - tp.topic(), - tp.partition(), - timestamp.toEpochSecond(zoneOffset), - chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - { - // Emit new message - Message message = new Message(key, metadata.offset(), timestamp, text); - kafkaChatRoomService.addMessage(message); - } - - sink.success(); - } - else - { - // On send-failure - sink.error(exception); - } - })); - }); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java new file mode 100644 index 00000000..97ee9885 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -0,0 +1,110 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.*; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.List; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class ChatRoomChannel implements Runnable +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChannel; + private final Clock clock; + private final int bufferSize; + + private boolean running; + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + shard, + chatRoomTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", chatRoomTo); + sink.success(chatRoomTo.toChatRoomInfo()); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void run() + { + consumer.assign(List.of(new TopicPartition(topic, 0))); + + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + for (ConsumerRecord record : records) + { + createChatRoom(record.value().toChatRoomInfo()); + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatMessageChannel.putChatRoom(chatRoom); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java deleted file mode 100644 index c7a3c8b3..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java +++ /dev/null @@ -1,44 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -/** - * TODO: - * Not used anywhere - * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment. - * Refactor? - */ -class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy -{ - private final Consumer consumer; - private final TopicPartition tp; - private final long currentOffset; - private final long unseenOffset; - - ChatRoomLoadingMessageHandlingStrategy( - Consumer consumer, - TopicPartition tp, - long currentOffset, - long unseenOffset) - { - this.consumer = consumer; - this.tp = tp; - this.currentOffset = currentOffset; - this.unseenOffset = unseenOffset; - - consumer.seek(tp, unseenOffset); - } - - @Override - public Mono handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text) - { - // TODO - return null; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java new file mode 100644 index 00000000..e5649816 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.UUID; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class ChatRoomTo +{ + private String id; + private String name; + private int shard; + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static ChatRoomTo from(ChatRoom chatRoom) + { + return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java deleted file mode 100644 index be0136a0..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java +++ /dev/null @@ -1,25 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -@RequiredArgsConstructor -class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy -{ - private final int shard; - - @Override - public Mono handleMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - throw new ShardNotOwnedException(shard); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java new file mode 100644 index 00000000..88947a04 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -0,0 +1,49 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatHome implements ChatHome +{ + private final ShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChanel; + + + @Override + public Mono getChatRoom(UUID id) + { + int shard = shardingStrategy.selectShard(id); + if (chatMessageChanel.isLoadInProgress()) + { + throw new LoadInProgressException(shard); + } + else + { + return chatMessageChanel.getChatRoom(shard, id); + } + } + + @Override + public Flux getChatRooms() + { + if (chatMessageChanel.isLoadInProgress()) + { + throw new LoadInProgressException(); + } + else + { + return chatMessageChanel.getChatRooms(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java deleted file mode 100644 index eadd7629..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ /dev/null @@ -1,115 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.ZoneId; -import java.util.*; - - -@Slf4j -public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener -{ - private final Consumer consumer; - private final Producer producer; - private final String topic; - private final ZoneId zoneId; - // private final long[] offsets; Erst mal immer alles neu einlesen - private final ChatHomeLoader[] chatHomeLoaders; - private final Map[] chatRoomMaps; - - - public KafkaChatHomeService( - Consumer consumer, - Producer producer, - String topic, - ZoneId zoneId, - int numShards) - { - log.debug("Creating KafkaChatHomeService"); - this.consumer = consumer; - this.producer = producer; - this.topic = topic; - this.zoneId = zoneId; - // this.offsets = new long[numShards]; - // for (int i=0; i< numShards; i++) - // { - // this.offsets[i] = 0l; - // } - this.chatHomeLoaders = new ChatHomeLoader[numShards]; - this.chatRoomMaps = new Map[numShards]; - } - - - @Override - public void onPartitionsAssigned(Collection partitions) - { - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - if (!topicPartition.topic().equals(topic)) - { - log.warn("Ignoring partition from unwanted topic: {}", topicPartition); - return; - } - - int partition = topicPartition.partition(); - long unseenOffset = 0; // offsets[partition]; - - log.info( - "Loading messages from partition {}: start-offset={} -> current-offset={}", - partition, - unseenOffset, - currentOffset); - - // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]! - consumer.seek(topicPartition, unseenOffset); - chatHomeLoaders[partition] = new ChatHomeLoader( - producer, - currentOffset, - zoneId); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - if (!topicPartition.topic().equals(topic)) - { - log.warn("Ignoring partition from unwanted topic: {}", topicPartition); - return; - } - - int partition = topicPartition.partition(); - // long unseenOffset = offsets[partition]; TODO: Offset merken...? - }); - log.info("Revoked partitions: {}", partitions); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } - - @Override - public Mono getChatRoom(int shard, UUID id) - { - return Mono.justOrEmpty(chatRoomMaps[shard].get(id)); - } - - @Override - public Flux getChatRooms(int shard) - { - return Flux.fromStream(chatRoomMaps[shard].values().stream()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index 20d85e80..825f16eb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -1,17 +1,24 @@ package de.juplo.kafka.chat.backend.persistence.kafka; -import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; import java.util.UUID; +@RequiredArgsConstructor +@Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { + private final ChatRoomChannel chatRoomChannel; + @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { - return null; + log.info("Sending create-request for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 794c5c54..09861946 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -1,88 +1,45 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; import java.util.LinkedHashMap; +import java.util.UUID; +@RequiredArgsConstructor @Slf4j public class KafkaChatRoomService implements ChatRoomService { - private final Producer producer; - private final TopicPartition tp; + private final ChatMessageChannel chatMessageChannel; + private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); - private volatile MessageHandlingStrategy strategy; - - - public KafkaChatRoomService( - Producer producer, - TopicPartition tp) - { - this.producer = producer; - this.tp = tp; - this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp.partition()); - } - @Override - synchronized public Mono persistMessage( + public Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text) { - return strategy.handleMessage(key, timestamp, text); + return chatMessageChannel + .sendMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); } - /** - * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über - * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)} - */ - protected void addMessage(Message message) throws MessageMutationException + void persistMessage(Message message) { - Message existingMessage = messages.get(message.getKey()); - - // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel - // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht, - // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde, - // fängt dies bereits der ChatRoom ab. - // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden, - // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ?? - // doppelt aufschlägt... - if (existingMessage == null) - { - messages.put(message.getKey(), message); - } - else - { - if (!existingMessage.getMessageText().equals(message.getMessageText())) - { - throw new MessageMutationException(existingMessage, message.getMessageText()); - } - - // Warn and emit existing message - log.warn( - "Keeping existing message with {}@{} for {}", - existingMessage.getSerialNumber(), - existingMessage.getTimestamp(), - existingMessage.getKey()); - } + messages.put(message.getKey(), message); } @Override synchronized public Mono getMessage(Message.MessageKey key) { - // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen - // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird? return Mono.fromSupplier(() -> messages.get(key)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java new file mode 100644 index 00000000..ee5834e5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -0,0 +1,83 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Component +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatRoomChannel chatRoomChannel; + @Autowired + Consumer chatRoomChannelConsumer; + @Autowired + ChatMessageChannel chatMessageChannel; + @Autowired + Consumer chatMessageChannelConsumer; + + CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture chatMessageChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatMessageChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java new file mode 100644 index 00000000..43507792 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -0,0 +1,270 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.time.Clock; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Configuration +public class KafkaServicesConfiguration +{ + @Bean + ChatHome kafkaChatHome( + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel) + { + return new KafkaChatHome(shardingStrategy, chatMessageChannel); + } + + @Bean + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) + { + return new KafkaChatRoomFactory(chatRoomChannel); + } + + @Bean + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel, + Clock clock) + { + return new ChatRoomChannel( + properties.getKafka().getChatroomChannelTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + shardingStrategy, + chatMessageChannel, + clock, + properties.getChatroomBufferSize()); + } + + @Bean + Producer chatRoomChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + IntegerSerializer integerSerializer, + JsonSerializer chatRoomSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + integerSerializer, + chatRoomSerializer); + } + + @Bean + IntegerSerializer integerSerializer() + { + return new IntegerSerializer(); + } + + @Bean + JsonSerializer chatRoomSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + IntegerDeserializer integerDeserializer, + JsonDeserializer chatRoomDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_room_channel"); + return new KafkaConsumer<>( + properties, + integerDeserializer, + chatRoomDeserializer); + } + + @Bean + IntegerDeserializer integerDeserializer() + { + return new IntegerDeserializer(); + } + + @Bean + JsonDeserializer chatRoomDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); + return deserializer; + } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + + @Bean + ChatMessageChannel chatMessageChannel( + ChatBackendProperties properties, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, + ZoneId zoneId) + { + return new ChatMessageChannel( + properties.getKafka().getMessageChannelTopic(), + chatMessageChannelProducer, + chatMessageChannelConsumer, + zoneId, + properties.getKafka().getNumPartitions()); + } + + @Bean + Producer chatMessageChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); + return serializer; + } + + @Bean + Consumer chatMessageChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_message_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); + return deserializer; + } + + @Bean + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + return properties; + } + + @Bean + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); + return properties; + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java new file mode 100644 index 00000000..83e06bd1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; + + +public class LoadInProgressException extends ShardNotOwnedException +{ + public LoadInProgressException() + { + this(-1); + } + + public LoadInProgressException(int shard) + { + super(shard); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java deleted file mode 100644 index 1fb4c47d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -interface MessageHandlingStrategy -{ - Mono handleMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text); -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java new file mode 100644 index 00000000..fc2b7c89 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -0,0 +1,46 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.util.UUID; + +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "chat.backend.services=kafka", + "chat.backend.kafka.client-id-PREFIX=TEST", + "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC, + "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC, + "chat.backend.kafka.num-partitions=3" }) +@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3) +class KafkaConfigurationIT extends AbstractConfigurationIT +{ + final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL"; + final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL"; + + @BeforeAll + public static void test( + @Autowired ShardingStrategy shardingStrategy, + @Autowired KafkaTemplate messageTemplate, + @Autowired KafkaTemplate chatRoomTemplate) + { + UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + int shard = shardingStrategy.selectShard(chatRoomId); + chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }"); + messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }"); + messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }"); + messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }"); + messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }"); + } +} -- 2.20.1