From e7af512057440075a779ff5a5401dd11fc962741 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Jan 2023 18:13:40 +0100 Subject: [PATCH] feat: first runnable implementation, that is based on Kafka --- pom.xml | 9 + .../chat/backend/ChatBackendProperties.java | 13 +- .../kafka/chat/backend/domain/ChatRoom.java | 5 + .../chat/backend/domain/ChatRoomFactory.java | 2 +- .../inmemory/InMemoryChatRoomFactory.java | 2 +- .../persistence/kafka/ChatRoomChannel.java | 385 ++++++++++++++++++ .../persistence/kafka/KafkaChatHome.java | 40 ++ .../kafka/KafkaChatRoomFactory.java | 24 ++ .../kafka/KafkaChatRoomService.java | 58 +++ .../kafka/KafkaServicesApplicationRunner.java | 68 ++++ .../kafka/KafkaServicesConfiguration.java | 183 +++++++++ .../kafka/LoadInProgressException.java | 17 + .../kafka/messages/AbstractMessageTo.java | 18 + .../messages/CommandCreateChatRoomTo.java | 27 ++ .../messages/EventChatMessageReceivedTo.java | 48 +++ .../chat/backend/KafkaConfigurationIT.java | 92 +++++ .../messages/CommandCreateChatRoomToTest.java | 35 ++ .../EventChatMessageReceivedToTest.java | 39 ++ 18 files changed, 1062 insertions(+), 3 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java diff --git a/pom.xml b/pom.xml index 2819be45..23fcec3f 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-test @@ -93,6 +97,11 @@ awaitility test + + org.springframework.kafka + spring-kafka-test + test + diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index def9de17..d772fc9a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -16,6 +16,7 @@ public class ChatBackendProperties private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); + private KafkaServicesProperties kafka = new KafkaServicesProperties(); @Getter @@ -29,7 +30,17 @@ public class ChatBackendProperties private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } - public enum ServiceType { inmemory } + @Getter + @Setter + public static class KafkaServicesProperties + { + private String clientIdPrefix = "DEV"; + private String bootstrapServers = ":9092"; + private String chatRoomChannelTopic = "message_channel"; + private int numPartitions = 2; + } + + public enum ServiceType { inmemory, kafka } public enum StorageStrategyType { none, files, mongodb } public enum ShardingStrategyType { none, kafkalike } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cffc0ad0..b9463095 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -80,6 +80,11 @@ public class ChatRoom extends ChatRoomInfo } + public ChatRoomService getChatRoomService() + { + return service; + } + public Mono getMessage(String username, Long messageId) { Message.MessageKey key = Message.MessageKey.of(username, messageId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 9872ccb1..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -21,7 +21,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java new file mode 100644 index 00000000..7f93ad8b --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -0,0 +1,385 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.*; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ZoneId zoneId; + private final int numShards; + private final int bufferSize; + private final Clock clock; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatrooms; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatRoomChannel( + String topic, + Producer producer, + Consumer consumer, + ZoneId zoneId, + int numShards, + int bufferSize, + Clock clock) + { + log.debug( + "Creating ChatRoomChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + } + + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + null, + zdt.toEpochSecond(), + chatRoomId.toString(), + EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadChatRoom(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + private void loadChatRoom(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + + switch (record.value().getType()) + { + case COMMAND_CREATE_CHATROOM: + createChatRoom( + chatRoomId, + (CommandCreateChatRoomTo) record.value(), + record.partition()); + break; + + case EVENT_CHATMESSAGE_RECEIVED: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (EventChatMessageReceivedTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); + } + + nextOffset[record.partition()] = record.offset() + 1; + } + } + + private void createChatRoom( + UUID chatRoomId, + CommandCreateChatRoomTo createChatRoomRequestTo, + int partition) + { + log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoom chatRoom = new ChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + clock, + service, + bufferSize); + putChatRoom(chatRoom); + } + + + private void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); + } + + private void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + EventChatMessageReceivedTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + + private boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + } + + private void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + + private void putChatRoom(ChatRoom chatRoom) + { + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoom); + + chatrooms[partition].put(chatRoomId, chatRoom); + } + } + + Mono getChatRoom(int shard, UUID id) + { + if (loadInProgress) + { + throw new LoadInProgressException(shard); + } + + if (!isShardOwned[shard]) + { + throw new ShardNotOwnedException(shard); + } + + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + Flux getChatRooms() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java new file mode 100644 index 00000000..67990409 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -0,0 +1,40 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.utils.Utils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatHome implements ChatHome +{ + private final int numPartitions; + private final ChatRoomChannel chatRoomChannel; + + + @Override + public Mono getChatRoom(UUID id) + { + int shard = selectShard(id); + return chatRoomChannel.getChatRoom(shard, id); + } + + int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } + + @Override + public Flux getChatRooms() + { + return chatRoomChannel.getChatRooms(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java new file mode 100644 index 00000000..6a1dc78a --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -0,0 +1,24 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatRoomFactory implements ChatRoomFactory +{ + private final ChatRoomChannel chatRoomChannel; + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Sending create-command for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..7b2cc0b1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -0,0 +1,58 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatRoomService implements ChatRoomService +{ + private final ChatRoomChannel chatRoomChannel; + private final UUID chatRoomId; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + return chatRoomChannel + .sendChatMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); + } + + void persistMessage(Message message) + { + messages.put (message.getKey(), message); + } + + @Override + synchronized public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + synchronized public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java new file mode 100644 index 00000000..8474239b --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -0,0 +1,68 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Component +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + @Autowired + ChatBackendProperties properties; + + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatRoomChannel chatRoomChannel; + @Autowired + Consumer chatRoomChannelConsumer; + + CompletableFuture chatRoomChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); + chatRoomChannelConsumer.subscribe(topics, chatRoomChannel); + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java new file mode 100644 index 00000000..1cd41b53 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -0,0 +1,183 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.time.Clock; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Configuration +public class KafkaServicesConfiguration +{ + @Bean + ChatHome kafkaChatHome( + ChatBackendProperties properties, + ChatRoomChannel chatRoomChannel) + { + return new KafkaChatHome( + properties.getKafka().getNumPartitions(), + chatRoomChannel); + } + + @Bean + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) + { + return new KafkaChatRoomFactory(chatRoomChannel); + } + + @Bean + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ZoneId zoneId, + Clock clock) + { + return new ChatRoomChannel( + properties.getKafka().getChatRoomChannelTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + zoneId, + properties.getKafka().getNumPartitions(), + properties.getChatroomBufferSize(), + clock); + } + + @Bean + Producer chatRoomChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer(String typeMappings) + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, typeMappings), + false); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chatroom_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer(String typeMappings) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(), + JsonDeserializer.TYPE_MAPPINGS, typeMappings), + false ); + return deserializer; + } + + @Bean + String typeMappings () + { + return + "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); + } + + @Bean + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + return properties; + } + + @Bean + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); + return properties; + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java new file mode 100644 index 00000000..83e06bd1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; + + +public class LoadInProgressException extends ShardNotOwnedException +{ + public LoadInProgressException() + { + this(-1); + } + + public LoadInProgressException(int shard) + { + super(shard); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java new file mode 100644 index 00000000..7cc7541e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class AbstractMessageTo +{ + public enum ToType { + COMMAND_CREATE_CHATROOM, + EVENT_CHATMESSAGE_RECEIVED, + } + + @Getter + private final ToType type; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java new file mode 100644 index 00000000..1a134f31 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java @@ -0,0 +1,27 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import lombok.*; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class CommandCreateChatRoomTo extends AbstractMessageTo +{ + private String name; + + + public CommandCreateChatRoomTo() + { + super(ToType.COMMAND_CREATE_CHATROOM); + } + + + public static CommandCreateChatRoomTo of(String name) + { + CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); + to.name = name; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java new file mode 100644 index 00000000..2297b949 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java @@ -0,0 +1,48 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; + +import java.time.LocalDateTime; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventChatMessageReceivedTo extends AbstractMessageTo +{ + private String user; + private Long id; + private String text; + + + public EventChatMessageReceivedTo() + { + super(ToType.EVENT_CHATMESSAGE_RECEIVED); + } + + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static EventChatMessageReceivedTo from(Message message) + { + return EventChatMessageReceivedTo.of( + message.getUsername(), + message.getId(), + message.getMessageText()); + } + + + public static EventChatMessageReceivedTo of(String user, Long id, String text) + { + EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); + to.user = user; + to.id = id; + to.text = text; + return to; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java new file mode 100644 index 00000000..961275e8 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -0,0 +1,92 @@ +package de.juplo.kafka.chat.backend; + +import de.juplo.kafka.chat.backend.persistence.kafka.ChatRoomChannel; +import de.juplo.kafka.chat.backend.persistence.kafka.KafkaServicesApplicationRunner; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "chat.backend.services=kafka", + "chat.backend.kafka.client-id-PREFIX=TEST", + "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.num-partitions=10", + }) +@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@Slf4j +class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT +{ + final static String TOPIC = "TEST_CHATROOM_CHANNEL"; + + static CompletableFuture CONSUMER_JOB; + + @MockBean + KafkaServicesApplicationRunner kafkaServicesApplicationRunner; + + @BeforeAll + public static void sendAndLoadStoredData( + @Autowired KafkaTemplate messageTemplate, + @Autowired Consumer chatRoomChannelConsumer, + @Autowired ThreadPoolTaskExecutor taskExecutor, + @Autowired ChatRoomChannel chatRoomChannel) + { + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + + List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); + chatRoomChannelConsumer.assign(assignedPartitions); + chatRoomChannel.onPartitionsAssigned(assignedPartitions); + CONSUMER_JOB = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + SendResult result = kafkaTemplate.send(record).join(); + log.info( + "Sent {}={} to {}", + key, + value, + new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); + } + + @AfterAll + static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + CONSUMER_JOB.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java new file mode 100644 index 00000000..5ef12efe --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class CommandCreateChatRoomToTest +{ + final String json = """ + { + "name": "Foo-Room!" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java new file mode 100644 index 00000000..33a78277 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class EventChatMessageReceivedToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +} -- 2.20.1