<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
private int chatroomBufferSize = 8;
private ServiceType services = ServiceType.inmemory;
private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
+ private KafkaServicesProperties kafka = new KafkaServicesProperties();
@Getter
private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
}
- public enum ServiceType { inmemory }
+ @Getter
+ @Setter
+ public static class KafkaServicesProperties
+ {
+ private String clientIdPrefix;
+ private String bootstrapServers = ":9092";
+ private String chatRoomChannelTopic = "message_channel";
+ private int numPartitions = 2;
+ }
+
+ public enum ServiceType { inmemory, kafka }
public enum StorageStrategyType { none, files, mongodb }
public enum ShardingStrategyType { none, kafkalike }
}
}
+ public ChatRoomService getChatRoomService()
+ {
+ return service;
+ }
+
public Mono<Message> getMessage(String username, Long messageId)
{
Message.MessageKey key = Message.MessageKey.of(username, messageId);
public interface ChatRoomFactory
{
- Mono<ChatRoom> createChatRoom(UUID id, String name);
+ Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
}
@Override
- public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
int shard = shardingStrategy.selectShard(id);
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.*;
+import java.util.*;
+import java.util.stream.IntStream;
+
+
+@Slf4j
+public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+{
+ private final String topic;
+ private final Producer<String, AbstractMessageTo> producer;
+ private final Consumer<String, AbstractMessageTo> consumer;
+ private final ZoneId zoneId;
+ private final int numShards;
+ private final int bufferSize;
+ private final Clock clock;
+ private final boolean[] isShardOwned;
+ private final long[] currentOffset;
+ private final long[] nextOffset;
+ private final Map<UUID, ChatRoom>[] chatrooms;
+
+ private boolean running;
+ @Getter
+ private volatile boolean loadInProgress;
+
+
+ public ChatRoomChannel(
+ String topic,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> consumer,
+ ZoneId zoneId,
+ int numShards,
+ int bufferSize,
+ Clock clock)
+ {
+ log.debug(
+ "Creating ChatRoomChannel for topic {} with {} partitions",
+ topic,
+ numShards);
+ this.topic = topic;
+ this.consumer = consumer;
+ this.producer = producer;
+ this.zoneId = zoneId;
+ this.numShards = numShards;
+ this.bufferSize = bufferSize;
+ this.clock = clock;
+ this.isShardOwned = new boolean[numShards];
+ this.currentOffset = new long[numShards];
+ this.nextOffset = new long[numShards];
+ this.chatrooms = new Map[numShards];
+ IntStream
+ .range(0, numShards)
+ .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+ }
+
+
+
+ Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ UUID chatRoomId,
+ String name)
+ {
+ CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ chatRoomId.toString(),
+ createChatRoomRequestTo);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
+ createChatRoom(chatRoomInfo);
+ sink.success(chatRoomInfo);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send create-request for chat room (id={}, name={}): {}",
+ chatRoomId,
+ name,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ Mono<Message> sendChatMessage(
+ UUID chatRoomId,
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ null,
+ zdt.toEpochSecond(),
+ chatRoomId.toString(),
+ EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ // On successful send
+ Message message = new Message(key, metadata.offset(), timestamp, text);
+ log.info("Successfully send message {}", message);
+ sink.success(message);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+ chatRoomId,
+ key,
+ timestamp,
+ text,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("Newly assigned partitions! Pausing normal operations...");
+ loadInProgress = true;
+
+ consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+ {
+ int partition = topicPartition.partition();
+ isShardOwned[partition] = true;
+ this.currentOffset[partition] = currentOffset;
+
+ log.info(
+ "Partition assigned: {} - loading messages: next={} -> current={}",
+ partition,
+ nextOffset[partition],
+ currentOffset);
+
+ consumer.seek(topicPartition, nextOffset[partition]);
+ });
+
+ consumer.resume(partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(topicPartition ->
+ {
+ int partition = topicPartition.partition();
+ isShardOwned[partition] = false;
+ log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+ });
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.warn("Lost partitions: {}, partitions");
+ // TODO: Muss auf den Verlust anders reagiert werden?
+ onPartitionsRevoked(partitions);
+ }
+
+ @Override
+ public void run()
+ {
+ running = true;
+
+ while (running)
+ {
+ try
+ {
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ log.info("Fetched {} messages", records.count());
+
+ if (loadInProgress)
+ {
+ loadChatRoom(records);
+
+ if (isLoadingCompleted())
+ {
+ log.info("Loading of messages completed! Pausing all owned partitions...");
+ pauseAllOwnedPartions();
+ log.info("Resuming normal operations...");
+ loadInProgress = false;
+ }
+ }
+ else
+ {
+ if (!records.isEmpty())
+ {
+ throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+ }
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Received WakeupException, exiting!");
+ running = false;
+ }
+ }
+
+ log.info("Exiting normally");
+ }
+
+ private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ {
+ for (ConsumerRecord<String, AbstractMessageTo> record : records)
+ {
+ UUID chatRoomId = UUID.fromString(record.key());
+
+ switch (record.value().getType())
+ {
+ case COMMAND_CREATE_CHATROOM:
+ createChatRoom(
+ chatRoomId,
+ (CommandCreateChatRoomTo) record.value(),
+ record.partition());
+ break;
+
+ case EVENT_CHATMESSAGE_RECEIVED:
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (EventChatMessageReceivedTo) record.value(),
+ record.partition());
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for chat-room {} with offset {}: {}",
+ chatRoomId,
+ record.offset(),
+ record.value());
+ }
+
+ nextOffset[record.partition()] = record.offset() + 1;
+ }
+ }
+
+ private void createChatRoom(
+ UUID chatRoomId,
+ CommandCreateChatRoomTo createChatRoomRequestTo,
+ int partition)
+ {
+ log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+ ChatRoom chatRoom = new ChatRoom(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition,
+ clock,
+ service,
+ bufferSize);
+ putChatRoom(chatRoom);
+ }
+
+
+ private void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ String name = chatRoomInfo.getName();
+ int shard = chatRoomInfo.getShard();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ putChatRoom(chatRoom);
+ }
+
+ private void loadChatMessage(
+ UUID chatRoomId,
+ LocalDateTime timestamp,
+ long offset,
+ EventChatMessageReceivedTo chatMessageTo,
+ int partition)
+ {
+ Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+ Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+
+ ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
+
+ kafkaChatRoomService.persistMessage(message);
+ }
+
+ private boolean isLoadingCompleted()
+ {
+ return IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
+ }
+
+ private void pauseAllOwnedPartions()
+ {
+ consumer.pause(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> new TopicPartition(topic, shard))
+ .toList());
+ }
+
+
+ private void putChatRoom(ChatRoom chatRoom)
+ {
+ Integer partition = chatRoom.getShard();
+ UUID chatRoomId = chatRoom.getId();
+ if (chatrooms[partition].containsKey(chatRoomId))
+ {
+ log.warn("Ignoring existing chat-room: " + chatRoom);
+ }
+ else
+ {
+ log.info(
+ "Adding new chat-room to partition {}: {}",
+ partition,
+ chatRoom);
+
+ chatrooms[partition].put(chatRoomId, chatRoom);
+ }
+ }
+
+ Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ {
+ if (loadInProgress)
+ {
+ throw new LoadInProgressException(shard);
+ }
+
+ if (!isShardOwned[shard])
+ {
+ throw new ShardNotOwnedException(shard);
+ }
+
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
+ }
+
+ Flux<ChatRoom> getChatRooms()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.utils.Utils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+ private final int numPartitions;
+ private final ChatRoomChannel chatRoomChannel;
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel.getChatRoom(shard, id);
+ }
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return chatRoomChannel.getChatRooms();
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatRoomFactory implements ChatRoomFactory
+{
+ private final ChatRoomChannel chatRoomChannel;
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Sending create-command for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatRoomService implements ChatRoomService
+{
+ private final ChatRoomChannel chatRoomChannel;
+ private final UUID chatRoomId;
+
+ private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+
+ @Override
+ public Mono<Message> persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ return chatRoomChannel
+ .sendChatMessage(chatRoomId, key, timestamp, text)
+ .doOnSuccess(message -> persistMessage(message));
+ }
+
+ void persistMessage(Message message)
+ {
+ messages.put (message.getKey(), message);
+ }
+
+ @Override
+ synchronized public Mono<Message> getMessage(Message.MessageKey key)
+ {
+ return Mono.fromSupplier(() -> messages.get(key));
+ }
+
+ @Override
+ synchronized public Flux<Message> getMessages(long first, long last)
+ {
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "kafka")
+@Component
+@Slf4j
+public class KafkaServicesApplicationRunner implements ApplicationRunner
+{
+ @Autowired
+ ChatBackendProperties properties;
+
+ @Autowired
+ ThreadPoolTaskExecutor taskExecutor;
+ @Autowired
+ ConfigurableApplicationContext context;
+
+ @Autowired
+ ChatRoomChannel chatRoomChannel;
+ @Autowired
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+
+ CompletableFuture<Void> chatRoomChannelConsumerJob;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+ chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+ log.info("Starting the consumer for the ChatRoomChannel");
+ chatRoomChannelConsumerJob = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ @PreDestroy
+ public void joinChatRoomChannelConsumerJob()
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ chatRoomChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "kafka")
+@Configuration
+public class KafkaServicesConfiguration
+{
+ @Bean
+ ChatHome kafkaChatHome(
+ ChatBackendProperties properties,
+ ChatRoomChannel chatRoomChannel)
+ {
+ return new KafkaChatHome(
+ properties.getKafka().getNumPartitions(),
+ chatRoomChannel);
+ }
+
+ @Bean
+ KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
+ {
+ return new KafkaChatRoomFactory(chatRoomChannel);
+ }
+
+ @Bean
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ ZoneId zoneId,
+ Clock clock)
+ {
+ return new ChatRoomChannel(
+ properties.getKafka().getChatRoomChannelTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ zoneId,
+ properties.getKafka().getNumPartitions(),
+ properties.getChatroomBufferSize(),
+ clock);
+ }
+
+ @Bean
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer(
+ Properties defaultProducerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<AbstractMessageTo> messageSerializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+ return new KafkaProducer<>(
+ properties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
+ {
+ JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
+ serializer.configure(
+ Map.of(
+ JsonSerializer.TYPE_MAPPINGS, typeMappings),
+ false);
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
+ Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<AbstractMessageTo> messageDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chatroom_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
+ {
+ JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(
+ JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
+ JsonDeserializer.TYPE_MAPPINGS, typeMappings),
+ false );
+ return deserializer;
+ }
+
+ @Bean
+ String typeMappings ()
+ {
+ return
+ "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+ "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
+ }
+
+ @Bean
+ Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ return properties;
+ }
+
+ @Bean
+ Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ properties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false");
+ properties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");
+ return properties;
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+
+
+public class LoadInProgressException extends ShardNotOwnedException
+{
+ public LoadInProgressException()
+ {
+ this(-1);
+ }
+
+ public LoadInProgressException(int shard)
+ {
+ super(shard);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractMessageTo
+{
+ public enum ToType {
+ COMMAND_CREATE_CHATROOM,
+ EVENT_CHATMESSAGE_RECEIVED,
+ }
+
+ @Getter
+ private final ToType type;
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import lombok.*;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class CommandCreateChatRoomTo extends AbstractMessageTo
+{
+ private String name;
+
+
+ public CommandCreateChatRoomTo()
+ {
+ super(ToType.COMMAND_CREATE_CHATROOM);
+ }
+
+
+ public static CommandCreateChatRoomTo of(String name)
+ {
+ CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+ to.name = name;
+ return to;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatMessageReceivedTo extends AbstractMessageTo
+{
+ private String user;
+ private Long id;
+ private String text;
+
+
+ public EventChatMessageReceivedTo()
+ {
+ super(ToType.EVENT_CHATMESSAGE_RECEIVED);
+ }
+
+
+ public Message toMessage(long offset, LocalDateTime timestamp)
+ {
+ return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+ }
+
+ public static EventChatMessageReceivedTo from(Message message)
+ {
+ return EventChatMessageReceivedTo.of(
+ message.getUsername(),
+ message.getId(),
+ message.getMessageText());
+ }
+
+
+ public static EventChatMessageReceivedTo of(String user, Long id, String text)
+ {
+ EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+ to.user = user;
+ to.id = id;
+ to.text = text;
+ return to;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.persistence.kafka.ChatRoomChannel;
+import de.juplo.kafka.chat.backend.persistence.kafka.KafkaServicesApplicationRunner;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+ "chat.backend.kafka.num-partitions=10",
+ })
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
+class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
+{
+ final static String TOPIC = "TEST_CHATROOM_CHANNEL";
+
+ static CompletableFuture<Void> CONSUMER_JOB;
+
+ @MockBean
+ KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
+
+ @BeforeAll
+ public static void sendAndLoadStoredData(
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired Consumer chatRoomChannelConsumer,
+ @Autowired ThreadPoolTaskExecutor taskExecutor,
+ @Autowired ChatRoomChannel chatRoomChannel)
+ {
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+ List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+ chatRoomChannelConsumer.assign(assignedPartitions);
+ chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+ CONSUMER_JOB = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ SendResult<String, String> result = kafkaTemplate.send(record).join();
+ log.info(
+ "Sent {}={} to {}",
+ key,
+ value,
+ new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+ }
+
+ @AfterAll
+ static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ CONSUMER_JOB.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CommandCreateChatRoomToTest
+{
+ final String json = """
+ {
+ "name": "Foo-Room!"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class EventChatMessageReceivedToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}