From: Kai Moritz Date: Sun, 22 Jan 2023 17:13:40 +0000 (+0100) Subject: Kafka X-Git-Tag: rebase--2023-08-20~16 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a60e1a9000ac4a8034e0e4d9797facec6805cf2c;p=demos%2Fkafka%2Fchat Kafka Besser: - Create-Requests für ChatRoom's auch in den Message-Channel schreiben - Dann, wenn sie dort gelesen werden, _zusätzlich nachträglich_ in den Chatroom-Channel. - Grund: Dann fällt das hier übrig gebliebene _nicht-triviale_ Problem weg, bzw. löst sich in Wohlgefallen auf, da die Create-Requests automatisch in der richtigen Reihenfolge (also vor allen Messages, für einen bestimmten ChatRoom) in dem Message-Channel gelesen werden Außerdem: - Der Chatroom-Channel wird ("später") auch als allgemeiner Info-Channel benötigt, in den die Instanzen _insbesondere_ auch veröffentlichen, welche Partitionen ihnen gerade zugeordnet sind. - Der Chatroom-Channel sollte daher auf Dauer Info-Channel heißen und der Message-Channel eher allgemeiner Chatroom-Channel (im Sinne von hier alles zum Thema ChatRoom und den daran veröffentlichten Nachrichten...) --- diff --git a/README.txt b/README.txt new file mode 100644 index 00000000..8aaea7be --- /dev/null +++ b/README.txt @@ -0,0 +1,72 @@ +Aktuelle Idee für die Kafka-Anbindung +===================================== + +- *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst + durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen + `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können. + - D.h., das Locking, das während einem Rebalance nötig ist, kann + *vollständig* in `KafkaChatHomeService` umgesetzt werden. + - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen, + ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz + fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht + zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)` + abgelehnt werden! + - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die + Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen + für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking + in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich* + für den `ChatRoom` zuständig ist. + - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`) + erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale + Kontrolle auf doppelte Nachrichten logisch gültig ist. +- *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt. + - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen + durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen + `ChatRoom`-Instanzen *unterbunden* werden können. +- *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle + Zugriffe eine `ShardNotOwnedException` erzeugt. + - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen, + die *während* einem Rebalance auflaufen +- *Lade-Modus - Initialisierung und Abschluss-Bedingung:* + - Wenn durch einen Rebalance in den Lade-Modus gewechselt wird, muss die + *tatsächliche* Offset-Position der zuletzt geschriebenen Nachrichten + für die zugeordneten Partitionen ermittelt werden. + - Anschließend wird ein Seek auf die Offset-Position 0 (später: auf die + in den lokalen Daten gespeicherte Offset-Position) durchgeführt. + - Der Lade-Modus ist abgeschlossen, wenn für alle zugeordneten Partitionen + der zum Rebalance-Zeitpunkt ermittelte Offset der aktuellsten Nachrichten + erreicht ist. + - Wenn ein weiterer Rebalance erfolgt, während der Lade-Modus bereits + aktiv ist, sollte es genügen, die Informationen über die zugeordneten + Partitionen zu aktualisieren und die Aktuellen Offsets für diese neu + zu ermitteln. +- *Lade-Modus vs. Default-Modus:* + - Nur während des Lade-Modus *liest* die `KafkaChatRoomServcie`-Instanz + tatsächlich die Nachrichten aus den zugeordneten Partitionen. + - Im Default-Modus *schreibt* sie die Nachrichten nur in die Partitionen + und speichert sie lokal ab, sobald die *Bestätigung durch den `Producer`* + erfolgt. + - D.h. insbesondere, dass der `KafkaConsumer` im Default-Modus für alle + zugeordneten Partitionen *pausiert* wird! + - Damit die Offset-Positon nicht unnötig zurückfällt, sollte ggf. + regelmäßig für alle zugeordneten Partitionen ein Seek auf die zuletzt + vom Producer bestätigt geschriebenen Offsets durchgeführt werden. + - *Beachte:_ Dies ist nicht nötig, wenn die Offsets eh in den lokal + gespeicherten Daten gehalten und aus diesen wiederhergestellt werden! +- *Umsetzungs-Details:* + - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden + in einem zeitkritischem Setting laufen, muss das eigentliche Laden der + `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an + den `KafkaConsumer` zurückgegeben werden kann. + - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus + wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch + den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt + wurden. + - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser + gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn + die Instanz in den normalen Modus zurückwechselt. + - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_ + Anfragen mit `ShardNotOwnedException` abgelehnt werden! + - Eine besondere Herausforderung sind *erneute* Rebalances, die + Auftreten, *während* der `KafkaChatRoomService` sich noch in einem + durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet! diff --git a/pom.xml b/pom.xml index 2819be45..23fcec3f 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-test @@ -93,6 +97,11 @@ awaitility test + + org.springframework.kafka + spring-kafka-test + test + diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index def9de17..0bcfc9e5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -16,6 +16,7 @@ public class ChatBackendProperties private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); + private KafkaServicesProperties kafka = new KafkaServicesProperties(); @Getter @@ -29,7 +30,18 @@ public class ChatBackendProperties private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } - public enum ServiceType { inmemory } + @Getter + @Setter + public static class KafkaServicesProperties + { + private String clientIdPrefix; + private String bootstrapServers = ":9092"; + private String chatroomChannelTopic = "chatroom_channel"; + private String messageChannelTopic = "message_channel"; + private int numPartitions = 2; + } + + public enum ServiceType { inmemory, kafka } public enum StorageStrategyType { none, files, mongodb } public enum ShardingStrategyType { none, kafkalike } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cffc0ad0..b9463095 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -80,6 +80,11 @@ public class ChatRoom extends ChatRoomInfo } + public ChatRoomService getChatRoomService() + { + return service; + } + public Mono getMessage(String username, Long messageId) { Message.MessageKey key = Message.MessageKey.of(username, messageId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 9872ccb1..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -21,7 +21,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java new file mode 100644 index 00000000..85a194e9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class AbstractMessageTo +{ + public enum ToType { + CREATE_CHATROOM_COMMAND, + CHATMESSAGE_EVENT, + } + + @Getter + private final ToType type; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java new file mode 100644 index 00000000..08975a9d --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -0,0 +1,373 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.*; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ZoneId zoneId; + private final int numShards; + private final int bufferSize; + private final Clock clock; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatrooms; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatMessageChannel( + String topic, + Producer producer, + Consumer consumer, + ZoneId zoneId, + int numShards, + int bufferSize, + Clock clock) + { + log.debug( + "Creating ChatMessageChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + } + + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + null, + zdt.toEpochSecond(), + chatRoomId.toString(), + ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + consumer.subscribe(List.of(topic), this); + + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadChatRoom(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + void loadChatRoom(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + + switch (record.value().getType()) + { + case CREATE_CHATROOM_COMMAND: + createChatRoom( + chatRoomId, + (CreateChatRoomCommandTo) record.value(), + record.partition()); + break; + + case CHATMESSAGE_EVENT: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageReceivedEventTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); + } + + nextOffset[record.partition()] = record.offset() + 1; + } + } + + void createChatRoom( + UUID chatRoomId, + CreateChatRoomCommandTo createChatRoomRequestTo, + int partition) + { + log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoom chatRoom = new ChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + clock, + service, + bufferSize); + putChatRoom(chatRoom); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); + } + + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageReceivedEventTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + + boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + } + + void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + + private void putChatRoom(ChatRoom chatRoom) + { + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoom); + + chatrooms[partition].put(chatRoomId, chatRoom); + } + } + + Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + Flux getChatRooms() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java new file mode 100644 index 00000000..2bc7a034 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java @@ -0,0 +1,48 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; + +import java.time.LocalDateTime; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class ChatMessageReceivedEventTo extends AbstractMessageTo +{ + private String user; + private Long id; + private String text; + + + public ChatMessageReceivedEventTo() + { + super(ToType.CHATMESSAGE_EVENT); + } + + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static ChatMessageReceivedEventTo from(Message message) + { + return ChatMessageReceivedEventTo.of( + message.getUsername(), + message.getId(), + message.getMessageText()); + } + + + public static ChatMessageReceivedEventTo of(String user, Long id, String text) + { + ChatMessageReceivedEventTo to = new ChatMessageReceivedEventTo(); + to.user = user; + to.id = id; + to.text = text; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java new file mode 100644 index 00000000..b2601fb8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java @@ -0,0 +1,27 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import lombok.*; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class CreateChatRoomCommandTo extends AbstractMessageTo +{ + private String name; + + + public CreateChatRoomCommandTo() + { + super(ToType.CREATE_CHATROOM_COMMAND); + } + + + public static CreateChatRoomCommandTo of(String name) + { + CreateChatRoomCommandTo to = new CreateChatRoomCommandTo(); + to.name = name; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java new file mode 100644 index 00000000..77790bd6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -0,0 +1,43 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatHome implements ChatHome +{ + private final KafkaLikeShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChanel; + + + @Override + public Mono getChatRoom(UUID id) + { + int shard = shardingStrategy.selectShard(id); + if (chatMessageChanel.isLoadInProgress()) + { + throw new LoadInProgressException(shard); + } + else + { + return chatMessageChanel.getChatRoom(shard, id); + } + } + + @Override + public Flux getChatRooms() + { + return chatMessageChanel.getChatRooms(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java new file mode 100644 index 00000000..c46529d8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -0,0 +1,24 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatRoomFactory implements ChatRoomFactory +{ + private final ChatMessageChannel chatMessageChannel; + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Sending create-request for chat rooom: id={}, name={}"); + return chatMessageChannel.sendCreateChatRoomRequest(id, name); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..77ecf1ca --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -0,0 +1,58 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatRoomService implements ChatRoomService +{ + private final ChatMessageChannel chatMessageChannel; + private final UUID chatRoomId; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + return chatMessageChannel + .sendChatMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); + } + + void persistMessage(Message message) + { + messages.put (message.getKey(), message); + } + + @Override + synchronized public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + synchronized public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java new file mode 100644 index 00000000..f0dc3155 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -0,0 +1,60 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Component +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatMessageChannel chatMessageChannel; + @Autowired + Consumer chatMessageChannelConsumer; + + CompletableFuture chatMessageChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatMessageChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java new file mode 100644 index 00000000..84919c71 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -0,0 +1,185 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.time.Clock; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Configuration +public class KafkaServicesConfiguration +{ + @Bean + ChatHome kafkaChatHome( + KafkaLikeShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel) + { + return new KafkaChatHome(shardingStrategy, chatMessageChannel); + } + + @Bean + KafkaChatRoomFactory chatRoomFactory(ChatMessageChannel chatMessageChannel) + { + return new KafkaChatRoomFactory(chatMessageChannel); + } + + @Bean + KafkaLikeShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + + @Bean + ChatMessageChannel chatMessageChannel( + ChatBackendProperties properties, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, + ZoneId zoneId, + Clock clock) + { + return new ChatMessageChannel( + properties.getKafka().getMessageChannelTopic(), + chatMessageChannelProducer, + chatMessageChannelConsumer, + zoneId, + properties.getKafka().getNumPartitions(), + properties.getChatroomBufferSize(), + clock); + } + + @Bean + Producer chatMessageChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer(String typeMappings) + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, typeMappings), + false); + return serializer; + } + + @Bean + Consumer chatMessageChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_message_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer(String typeMappings) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(), + JsonDeserializer.TYPE_MAPPINGS, typeMappings), + false ); + return deserializer; + } + + @Bean + String typeMappings () + { + return + "create:" + CreateChatRoomCommandTo.class.getCanonicalName() + "," + + "message:" + ChatMessageReceivedEventTo.class.getCanonicalName(); + } + + @Bean + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + return properties; + } + + @Bean + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); + return properties; + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java new file mode 100644 index 00000000..83e06bd1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; + + +public class LoadInProgressException extends ShardNotOwnedException +{ + public LoadInProgressException() + { + this(-1); + } + + public LoadInProgressException(int shard) + { + super(shard); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java new file mode 100644 index 00000000..38c170e7 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -0,0 +1,53 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.util.UUID; + +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "chat.backend.services=kafka", + "chat.backend.kafka.client-id-PREFIX=TEST", + "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC, + "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC, + "chat.backend.kafka.num-partitions=3" }) +@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3) +class KafkaConfigurationIT extends AbstractConfigurationIT +{ + final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL"; + final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL"; + + @BeforeAll + public static void test( + @Autowired ShardingStrategy shardingStrategy, + @Autowired KafkaTemplate messageTemplate) + { + UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + int shard = shardingStrategy.selectShard(chatRoomId); + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message"); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(MESSAGES_TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + kafkaTemplate.send(record); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java new file mode 100644 index 00000000..be612c8b --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class ChatMessageReceivedEventToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + ChatMessageReceivedEventTo message = mapper.readValue(json, ChatMessageReceivedEventTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java new file mode 100644 index 00000000..71a29715 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class CreateChatRoomCommandToTest +{ + final String json = """ + { + "name": "Foo-Room!" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + CreateChatRoomCommandTo message = mapper.readValue(json, CreateChatRoomCommandTo.class); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + } +}