--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import java.util.UUID;
+
+
+public interface ShardingStrategy
+{
+ int selectShard(UUID chatRoomId);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+
+public interface StorageStrategy
+{
+ default void write(ChatHomeService chatHomeService)
+ {
+ writeChatRoomInfo(
+ chatHomeService
+ .getChatRoomInfo()
+ .doOnNext(chatRoomInfo ->
+ writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHomeService
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+ }
+
+ void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> readChatRoomInfo();
+ void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> readChatRoomData(UUID chatRoomId);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+@Slf4j
+public class InMemoryChatRoomService implements ChatRoomService
+{
+ private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+ public InMemoryChatRoomService(Flux<Message> messageFlux)
+ {
+ log.debug("Creating InMemoryChatRoomService");
+ messages = new LinkedHashMap<>();
+ messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+ }
+
+ @Override
+ public Mono<Message> persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ Message message = new Message(key, (long)messages.size(), timestamp, text);
+ messages.put(message.getKey(), message);
+ return Mono.just(message);
+ }
+
+ @Override
+ public Mono<Message> getMessage(Message.MessageKey key)
+ {
+ return Mono.fromSupplier(() -> messages.get(key));
+ }
+
+ @Override
+ public Flux<Message> getMessages(long first, long last)
+ {
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "inmemory",
+ matchIfMissing = true)
+@Configuration
+public class InMemoryServicesConfiguration
+{
+ @Bean
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "none",
+ matchIfMissing = true)
+ ChatHomeService noneShardingChatHome(
+ ChatBackendProperties properties,
+ StorageStrategy storageStrategy,
+ Clock clock)
+ {
+ return new SimpleChatHomeService(
+ storageStrategy,
+ clock,
+ properties.getChatroomBufferSize());
+ }
+
+ @Bean
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "kafkalike")
+ ChatHomeService kafkalikeShardingChatHome(
+ ChatBackendProperties properties,
+ StorageStrategy storageStrategy,
+ Clock clock)
+ {
+ int numShards = properties.getInmemory().getNumShards();
+ SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
+ IntStream
+ .of(properties.getInmemory().getOwnedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
+ shard,
+ storageStrategy,
+ clock,
+ properties.getChatroomBufferSize()));
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ return new ShardedChatHomeService(chatHomes, strategy);
+ }
+
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "none",
+ matchIfMissing = true)
+ @Bean
+ ShardingStrategy defaultShardingStrategy()
+ {
+ return chatRoomId -> 0;
+ }
+
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "kafkalike")
+ @Bean
+ ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(
+ properties.getInmemory().getNumShards());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+ private final int numPartitions;
+
+ @Override
+ public int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class ShardedChatHomeService implements ChatHomeService
+{
+ private final SimpleChatHomeService[] chatHomes;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
+
+
+ public ShardedChatHomeService(
+ SimpleChatHomeService[] chatHomes,
+ ShardingStrategy shardingStrategy)
+ {
+ this.chatHomes = chatHomes;
+ this.shardingStrategy = shardingStrategy;
+ this.ownedShards = new HashSet<>();
+ for (int shard = 0; shard < chatHomes.length; shard++)
+ if(chatHomes[shard] != null)
+ this.ownedShards.add(shard);
+ log.info(
+ "Created ShardedChatHome for shards: {}",
+ ownedShards
+ .stream()
+ .map(String::valueOf)
+ .collect(Collectors.joining(", ")));
+ }
+
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard].createChatRoom(id, name);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomInfo(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomData(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomData());
+ }
+
+
+
+ private int selectShard(UUID chatroomId)
+ {
+ return shardingStrategy.selectShard(chatroomId);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.*;
+
+
+@Slf4j
+public class SimpleChatHomeService implements ChatHomeService
+{
+ private final Integer shard;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final Map<UUID, ChatRoomData> chatRoomData;
+ private final Clock clock;
+ private final int bufferSize;
+
+
+
+ public SimpleChatHomeService(
+ StorageStrategy storageStrategy,
+ Clock clock,
+ int bufferSize)
+ {
+ this(
+ null,
+ storageStrategy,
+ clock,
+ bufferSize);
+ }
+
+ public SimpleChatHomeService(
+ Integer shard,
+ StorageStrategy storageStrategy,
+ Clock clock,
+ int bufferSize)
+ {
+ log.info("Created SimpleChatHome for shard {}", shard);
+;
+ this.shard = shard;
+ this.chatRoomInfo = new HashMap<>();
+ this.chatRoomData = new HashMap<>();
+ storageStrategy
+ .readChatRoomInfo()
+ .filter(info ->
+ {
+ if (shard == null || info.getShard() == shard)
+ {
+ return true;
+ }
+ else
+ {
+ log.info(
+ "SimpleChatHome for shard {} ignores not owned chat-room {}",
+ shard,
+ info);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(info ->
+ {
+ UUID chatRoomId = info.getId();
+ chatRoomInfo.put(chatRoomId, info);
+ Flux<Message> messageFlux =
+ storageStrategy.readChatRoomData(chatRoomId);
+ chatRoomData.put(
+ info.getId(),
+ new ChatRoomData(
+ clock,
+ new InMemoryChatRoomService(messageFlux),
+ bufferSize));
+ });
+ this.clock = clock;
+ this.bufferSize = bufferSize;
+ }
+
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ this.chatRoomInfo.put(id, chatRoomInfo);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ this.chatRoomData.put(id, chatRoomData);
+ return Mono.just(chatRoomInfo);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomInfo.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux.fromIterable(chatRoomInfo.values());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomData.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux.fromIterable(chatRoomData.values());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.*;
+import java.util.*;
+import java.util.stream.IntStream;
+
+
+@Slf4j
+public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+{
+ private final String topic;
+ private final Producer<String, AbstractMessageTo> producer;
+ private final Consumer<String, AbstractMessageTo> consumer;
+ private final ZoneId zoneId;
+ private final int numShards;
+ private final int bufferSize;
+ private final Clock clock;
+ private final boolean[] isShardOwned;
+ private final long[] currentOffset;
+ private final long[] nextOffset;
+ private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+ private final Map<UUID, ChatRoomData>[] chatRoomData;
+
+ private boolean running;
+ @Getter
+ private volatile boolean loadInProgress;
+
+
+ public ChatRoomChannel(
+ String topic,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> consumer,
+ ZoneId zoneId,
+ int numShards,
+ int bufferSize,
+ Clock clock)
+ {
+ log.debug(
+ "Creating ChatRoomChannel for topic {} with {} partitions",
+ topic,
+ numShards);
+ this.topic = topic;
+ this.consumer = consumer;
+ this.producer = producer;
+ this.zoneId = zoneId;
+ this.numShards = numShards;
+ this.bufferSize = bufferSize;
+ this.clock = clock;
+ this.isShardOwned = new boolean[numShards];
+ this.currentOffset = new long[numShards];
+ this.nextOffset = new long[numShards];
+ this.chatRoomInfo = new Map[numShards];
+ this.chatRoomData = new Map[numShards];
+ IntStream
+ .range(0, numShards)
+ .forEach(shard ->
+ {
+ this.chatRoomInfo[shard] = new HashMap<>();
+ this.chatRoomData[shard] = new HashMap<>();
+ });
+ }
+
+
+
+ Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ UUID chatRoomId,
+ String name)
+ {
+ CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ chatRoomId.toString(),
+ createChatRoomRequestTo);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
+ createChatRoom(chatRoomInfo);
+ sink.success(chatRoomInfo);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send create-request for chat room (id={}, name={}): {}",
+ chatRoomId,
+ name,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ Mono<Message> sendChatMessage(
+ UUID chatRoomId,
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ null,
+ zdt.toEpochSecond(),
+ chatRoomId.toString(),
+ EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ // On successful send
+ Message message = new Message(key, metadata.offset(), timestamp, text);
+ log.info("Successfully send message {}", message);
+ sink.success(message);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+ chatRoomId,
+ key,
+ timestamp,
+ text,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("Newly assigned partitions! Pausing normal operations...");
+ loadInProgress = true;
+
+ consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+ {
+ int partition = topicPartition.partition();
+ isShardOwned[partition] = true;
+ this.currentOffset[partition] = currentOffset;
+
+ log.info(
+ "Partition assigned: {} - loading messages: next={} -> current={}",
+ partition,
+ nextOffset[partition],
+ currentOffset);
+
+ consumer.seek(topicPartition, nextOffset[partition]);
+ });
+
+ consumer.resume(partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(topicPartition ->
+ {
+ int partition = topicPartition.partition();
+ isShardOwned[partition] = false;
+ log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+ });
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.warn("Lost partitions: {}, partitions");
+ // TODO: Muss auf den Verlust anders reagiert werden?
+ onPartitionsRevoked(partitions);
+ }
+
+ @Override
+ public void run()
+ {
+ running = true;
+
+ while (running)
+ {
+ try
+ {
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ log.info("Fetched {} messages", records.count());
+
+ if (loadInProgress)
+ {
+ loadChatRoom(records);
+
+ if (isLoadingCompleted())
+ {
+ log.info("Loading of messages completed! Pausing all owned partitions...");
+ pauseAllOwnedPartions();
+ log.info("Resuming normal operations...");
+ loadInProgress = false;
+ }
+ }
+ else
+ {
+ if (!records.isEmpty())
+ {
+ throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+ }
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Received WakeupException, exiting!");
+ running = false;
+ }
+ }
+
+ log.info("Exiting normally");
+ }
+
+ private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ {
+ for (ConsumerRecord<String, AbstractMessageTo> record : records)
+ {
+ UUID chatRoomId = UUID.fromString(record.key());
+
+ switch (record.value().getType())
+ {
+ case COMMAND_CREATE_CHATROOM:
+ createChatRoom(
+ chatRoomId,
+ (CommandCreateChatRoomTo) record.value(),
+ record.partition());
+ break;
+
+ case EVENT_CHATMESSAGE_RECEIVED:
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (EventChatMessageReceivedTo) record.value(),
+ record.partition());
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for chat-room {} with offset {}: {}",
+ chatRoomId,
+ record.offset(),
+ record.value());
+ }
+
+ nextOffset[record.partition()] = record.offset() + 1;
+ }
+ }
+
+ private void createChatRoom(
+ UUID chatRoomId,
+ CommandCreateChatRoomTo createChatRoomRequestTo,
+ Integer partition)
+ {
+ log.info(
+ "Loading ChatRoom {} for shard {} with buffer-size {}",
+ chatRoomId,
+ partition,
+ bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+ ChatRoomData chatRoomData = new ChatRoomData(
+ clock,
+ service,
+ bufferSize);
+ putChatRoom(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition,
+ chatRoomData);
+ }
+
+
+ private void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ putChatRoom(
+ chatRoomInfo.getId(),
+ chatRoomInfo.getName(),
+ chatRoomInfo.getShard(),
+ chatRoomData);
+ }
+
+ private void loadChatMessage(
+ UUID chatRoomId,
+ LocalDateTime timestamp,
+ long offset,
+ EventChatMessageReceivedTo chatMessageTo,
+ int partition)
+ {
+ Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+ Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+
+ ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoomData.getChatRoomService();
+
+ kafkaChatRoomService.persistMessage(message);
+ }
+
+ private boolean isLoadingCompleted()
+ {
+ return IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
+ }
+
+ private void pauseAllOwnedPartions()
+ {
+ consumer.pause(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> new TopicPartition(topic, shard))
+ .toList());
+ }
+
+
+ private void putChatRoom(
+ UUID chatRoomId,
+ String name,
+ Integer partition,
+ ChatRoomData chatRoomData)
+ {
+ if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+ {
+ log.warn(
+ "Ignoring existing chat-room for {}: {}",
+ partition,
+ chatRoomId);
+ }
+ else
+ {
+ log.info(
+ "Adding new chat-room to partition {}: {}",
+ partition,
+ chatRoomData);
+
+ this.chatRoomInfo[partition].put(
+ chatRoomId,
+ new ChatRoomInfo(chatRoomId, name, partition));
+ this.chatRoomData[partition].put(chatRoomId, chatRoomData);
+ }
+ }
+
+ int[] getOwnedShards()
+ {
+ return IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .toArray();
+ }
+
+ Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
+ {
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
+ if (!isShardOwned[shard])
+ {
+ return Mono.error(new ShardNotOwnedException(shard));
+ }
+
+ return Mono.justOrEmpty(chatRoomData[shard].get(id));
+ }
+
+ Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+ }
+
+ Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+ {
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
+ if (!isShardOwned[shard])
+ {
+ return Mono.error(new ShardNotOwnedException(shard));
+ }
+
+ return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
+ }
+
+ Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.utils.Utils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHomeService implements ChatHomeService
+{
+ private final int numPartitions;
+ private final ChatRoomChannel chatRoomChannel;
+
+
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Sending create-command for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomInfo(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return chatRoomChannel.getChatRoomInfo();
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomData(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return chatRoomChannel.getChatRoomData();
+ }
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatRoomService implements ChatRoomService
+{
+ private final ChatRoomChannel chatRoomChannel;
+ private final UUID chatRoomId;
+
+ private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+
+ @Override
+ public Mono<Message> persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ return chatRoomChannel
+ .sendChatMessage(chatRoomId, key, timestamp, text)
+ .doOnSuccess(message -> persistMessage(message));
+ }
+
+ void persistMessage(Message message)
+ {
+ messages.put (message.getKey(), message);
+ }
+
+ @Override
+ synchronized public Mono<Message> getMessage(Message.MessageKey key)
+ {
+ return Mono.fromSupplier(() -> messages.get(key));
+ }
+
+ @Override
+ synchronized public Flux<Message> getMessages(long first, long last)
+ {
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "kafka")
+@Component
+@Slf4j
+public class KafkaServicesApplicationRunner implements ApplicationRunner
+{
+ @Autowired
+ ChatBackendProperties properties;
+
+ @Autowired
+ ThreadPoolTaskExecutor taskExecutor;
+ @Autowired
+ ConfigurableApplicationContext context;
+
+ @Autowired
+ ChatRoomChannel chatRoomChannel;
+ @Autowired
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+
+ CompletableFuture<Void> chatRoomChannelConsumerJob;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+ chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+ log.info("Starting the consumer for the ChatRoomChannel");
+ chatRoomChannelConsumerJob = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ @PreDestroy
+ public void joinChatRoomChannelConsumerJob()
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ chatRoomChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+@ConditionalOnProperty(
+ prefix = "chat.backend",
+ name = "services",
+ havingValue = "kafka")
+@Configuration
+public class KafkaServicesConfiguration
+{
+ @Bean
+ ChatHomeService kafkaChatHome(
+ ChatBackendProperties properties,
+ ChatRoomChannel chatRoomChannel)
+ {
+ return new KafkaChatHomeService(
+ properties.getKafka().getNumPartitions(),
+ chatRoomChannel);
+ }
+
+ @Bean
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ ZoneId zoneId,
+ Clock clock)
+ {
+ return new ChatRoomChannel(
+ properties.getKafka().getChatRoomChannelTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ zoneId,
+ properties.getKafka().getNumPartitions(),
+ properties.getChatroomBufferSize(),
+ clock);
+ }
+
+ @Bean
+ Producer<String, AbstractMessageTo> chatRoomChannelProducer(
+ Properties defaultProducerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<AbstractMessageTo> messageSerializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+ return new KafkaProducer<>(
+ properties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
+ {
+ JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
+ serializer.configure(
+ Map.of(
+ JsonSerializer.TYPE_MAPPINGS, typeMappings),
+ false);
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
+ Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<AbstractMessageTo> messageDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chatroom_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
+ {
+ JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(
+ JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
+ JsonDeserializer.TYPE_MAPPINGS, typeMappings),
+ false );
+ return deserializer;
+ }
+
+ @Bean
+ String typeMappings ()
+ {
+ return
+ "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+ "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
+ }
+
+ @Bean
+ Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ return properties;
+ }
+
+ @Bean
+ Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ properties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false");
+ properties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");
+ return properties;
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractMessageTo
+{
+ public enum ToType {
+ COMMAND_CREATE_CHATROOM,
+ EVENT_CHATMESSAGE_RECEIVED,
+ }
+
+ @Getter
+ private final ToType type;
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import lombok.*;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class CommandCreateChatRoomTo extends AbstractMessageTo
+{
+ private String name;
+
+
+ public CommandCreateChatRoomTo()
+ {
+ super(ToType.COMMAND_CREATE_CHATROOM);
+ }
+
+
+ public static CommandCreateChatRoomTo of(String name)
+ {
+ CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+ to.name = name;
+ return to;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatMessageReceivedTo extends AbstractMessageTo
+{
+ private String user;
+ private Long id;
+ private String text;
+
+
+ public EventChatMessageReceivedTo()
+ {
+ super(ToType.EVENT_CHATMESSAGE_RECEIVED);
+ }
+
+
+ public Message toMessage(long offset, LocalDateTime timestamp)
+ {
+ return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+ }
+
+ public static EventChatMessageReceivedTo from(Message message)
+ {
+ return EventChatMessageReceivedTo.of(
+ message.getUsername(),
+ message.getId(),
+ message.getMessageText());
+ }
+
+
+ public static EventChatMessageReceivedTo of(String user, Long id, String text)
+ {
+ EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+ to.user = user;
+ to.id = id;
+ to.text = text;
+ return to;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import java.util.UUID;
-
-
-public interface ShardingStrategy
-{
- int selectShard(UUID chatRoomId);
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Flux;
-
-import java.util.UUID;
-
-
-public interface StorageStrategy
-{
- default void write(ChatHomeService chatHomeService)
- {
- writeChatRoomInfo(
- chatHomeService
- .getChatRoomInfo()
- .doOnNext(chatRoomInfo ->
- writeChatRoomData(
- chatRoomInfo.getId(),
- chatHomeService
- .getChatRoomData(chatRoomInfo.getId())
- .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
- }
-
- void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
- Flux<ChatRoomInfo> readChatRoomInfo();
- void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
- Flux<Message> readChatRoomData(UUID chatRoomId);
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.util.LinkedHashMap;
-
-
-@Slf4j
-public class InMemoryChatRoomService implements ChatRoomService
-{
- private final LinkedHashMap<Message.MessageKey, Message> messages;
-
-
- public InMemoryChatRoomService(Flux<Message> messageFlux)
- {
- log.debug("Creating InMemoryChatRoomService");
- messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> messages.put(message.getKey(), message));
- }
-
- @Override
- public Mono<Message> persistMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- Message message = new Message(key, (long)messages.size(), timestamp, text);
- messages.put(message.getKey(), message);
- return Mono.just(message);
- }
-
- @Override
- public Mono<Message> getMessage(Message.MessageKey key)
- {
- return Mono.fromSupplier(() -> messages.get(key));
- }
-
- @Override
- public Flux<Message> getMessages(long first, long last)
- {
- return Flux.fromStream(messages
- .values()
- .stream()
- .filter(message ->
- {
- long serial = message.getSerialNumber();
- return serial >= first && serial <= last;
- }));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.time.Clock;
-import java.util.stream.IntStream;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend",
- name = "services",
- havingValue = "inmemory",
- matchIfMissing = true)
-@Configuration
-public class InMemoryServicesConfiguration
-{
- @Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "none",
- matchIfMissing = true)
- ChatHomeService noneShardingChatHome(
- ChatBackendProperties properties,
- StorageStrategy storageStrategy,
- Clock clock)
- {
- return new SimpleChatHomeService(
- storageStrategy,
- clock,
- properties.getChatroomBufferSize());
- }
-
- @Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "kafkalike")
- ChatHomeService kafkalikeShardingChatHome(
- ChatBackendProperties properties,
- StorageStrategy storageStrategy,
- Clock clock)
- {
- int numShards = properties.getInmemory().getNumShards();
- SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
- IntStream
- .of(properties.getInmemory().getOwnedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
- shard,
- storageStrategy,
- clock,
- properties.getChatroomBufferSize()));
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHomeService(chatHomes, strategy);
- }
-
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "none",
- matchIfMissing = true)
- @Bean
- ShardingStrategy defaultShardingStrategy()
- {
- return chatRoomId -> 0;
- }
-
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "kafkalike")
- @Bean
- ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
- {
- return new KafkaLikeShardingStrategy(
- properties.getInmemory().getNumShards());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-public class KafkaLikeShardingStrategy implements ShardingStrategy
-{
- private final int numPartitions;
-
- @Override
- public int selectShard(UUID chatRoomId)
- {
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHomeService implements ChatHomeService
-{
- private final SimpleChatHomeService[] chatHomes;
- private final Set<Integer> ownedShards;
- private final ShardingStrategy shardingStrategy;
-
-
- public ShardedChatHomeService(
- SimpleChatHomeService[] chatHomes,
- ShardingStrategy shardingStrategy)
- {
- this.chatHomes = chatHomes;
- this.shardingStrategy = shardingStrategy;
- this.ownedShards = new HashSet<>();
- for (int shard = 0; shard < chatHomes.length; shard++)
- if(chatHomes[shard] != null)
- this.ownedShards.add(shard);
- log.info(
- "Created ShardedChatHome for shards: {}",
- ownedShards
- .stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
- }
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- int shard = shardingStrategy.selectShard(id);
- return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard].createChatRoom(id, name);
- }
-
- @Override
- public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
- {
- int shard = selectShard(id);
- return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard]
- .getChatRoomInfo(id)
- .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
- ? new UnknownChatroomException(
- id,
- shard,
- ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
- : throwable);
- }
-
- @Override
- public Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
- }
-
- @Override
- public Mono<ChatRoomData> getChatRoomData(UUID id)
- {
- int shard = selectShard(id);
- return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard]
- .getChatRoomData(id)
- .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
- ? new UnknownChatroomException(
- id,
- shard,
- ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
- : throwable);
- }
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRoomData());
- }
-
-
-
- private int selectShard(UUID chatroomId)
- {
- return shardingStrategy.selectShard(chatroomId);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHomeService implements ChatHomeService
-{
- private final Integer shard;
- private final Map<UUID, ChatRoomInfo> chatRoomInfo;
- private final Map<UUID, ChatRoomData> chatRoomData;
- private final Clock clock;
- private final int bufferSize;
-
-
-
- public SimpleChatHomeService(
- StorageStrategy storageStrategy,
- Clock clock,
- int bufferSize)
- {
- this(
- null,
- storageStrategy,
- clock,
- bufferSize);
- }
-
- public SimpleChatHomeService(
- Integer shard,
- StorageStrategy storageStrategy,
- Clock clock,
- int bufferSize)
- {
- log.info("Created SimpleChatHome for shard {}", shard);
-;
- this.shard = shard;
- this.chatRoomInfo = new HashMap<>();
- this.chatRoomData = new HashMap<>();
- storageStrategy
- .readChatRoomInfo()
- .filter(info ->
- {
- if (shard == null || info.getShard() == shard)
- {
- return true;
- }
- else
- {
- log.info(
- "SimpleChatHome for shard {} ignores not owned chat-room {}",
- shard,
- info);
- return false;
- }
- })
- .toStream()
- .forEach(info ->
- {
- UUID chatRoomId = info.getId();
- chatRoomInfo.put(chatRoomId, info);
- Flux<Message> messageFlux =
- storageStrategy.readChatRoomData(chatRoomId);
- chatRoomData.put(
- info.getId(),
- new ChatRoomData(
- clock,
- new InMemoryChatRoomService(messageFlux),
- bufferSize));
- });
- this.clock = clock;
- this.bufferSize = bufferSize;
- }
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
- this.chatRoomInfo.put(id, chatRoomInfo);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- this.chatRoomData.put(id, chatRoomData);
- return Mono.just(chatRoomInfo);
- }
-
- @Override
- public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
- {
- return Mono
- .justOrEmpty(chatRoomInfo.get(id))
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- @Override
- public Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux.fromIterable(chatRoomInfo.values());
- }
-
- @Override
- public Mono<ChatRoomData> getChatRoomData(UUID id)
- {
- return Mono
- .justOrEmpty(chatRoomData.get(id))
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return Flux.fromIterable(chatRoomData.values());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.*;
-import java.util.*;
-import java.util.stream.IntStream;
-
-
-@Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
-{
- private final String topic;
- private final Producer<String, AbstractMessageTo> producer;
- private final Consumer<String, AbstractMessageTo> consumer;
- private final ZoneId zoneId;
- private final int numShards;
- private final int bufferSize;
- private final Clock clock;
- private final boolean[] isShardOwned;
- private final long[] currentOffset;
- private final long[] nextOffset;
- private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
- private final Map<UUID, ChatRoomData>[] chatRoomData;
-
- private boolean running;
- @Getter
- private volatile boolean loadInProgress;
-
-
- public ChatRoomChannel(
- String topic,
- Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> consumer,
- ZoneId zoneId,
- int numShards,
- int bufferSize,
- Clock clock)
- {
- log.debug(
- "Creating ChatRoomChannel for topic {} with {} partitions",
- topic,
- numShards);
- this.topic = topic;
- this.consumer = consumer;
- this.producer = producer;
- this.zoneId = zoneId;
- this.numShards = numShards;
- this.bufferSize = bufferSize;
- this.clock = clock;
- this.isShardOwned = new boolean[numShards];
- this.currentOffset = new long[numShards];
- this.nextOffset = new long[numShards];
- this.chatRoomInfo = new Map[numShards];
- this.chatRoomData = new Map[numShards];
- IntStream
- .range(0, numShards)
- .forEach(shard ->
- {
- this.chatRoomInfo[shard] = new HashMap<>();
- this.chatRoomData[shard] = new HashMap<>();
- });
- }
-
-
-
- Mono<ChatRoomInfo> sendCreateChatRoomRequest(
- UUID chatRoomId,
- String name)
- {
- CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- chatRoomId.toString(),
- createChatRoomRequestTo);
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
- ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
- createChatRoom(chatRoomInfo);
- sink.success(chatRoomInfo);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send create-request for chat room (id={}, name={}): {}",
- chatRoomId,
- name,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
- Mono<Message> sendChatMessage(
- UUID chatRoomId,
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- null,
- zdt.toEpochSecond(),
- chatRoomId.toString(),
- EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- Message message = new Message(key, metadata.offset(), timestamp, text);
- log.info("Successfully send message {}", message);
- sink.success(message);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
- chatRoomId,
- key,
- timestamp,
- text,
- exception);
- sink.error(exception);
- }
- }));
- });
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- log.info("Newly assigned partitions! Pausing normal operations...");
- loadInProgress = true;
-
- consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
- {
- int partition = topicPartition.partition();
- isShardOwned[partition] = true;
- this.currentOffset[partition] = currentOffset;
-
- log.info(
- "Partition assigned: {} - loading messages: next={} -> current={}",
- partition,
- nextOffset[partition],
- currentOffset);
-
- consumer.seek(topicPartition, nextOffset[partition]);
- });
-
- consumer.resume(partitions);
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(topicPartition ->
- {
- int partition = topicPartition.partition();
- isShardOwned[partition] = false;
- log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
- });
- }
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions)
- {
- log.warn("Lost partitions: {}, partitions");
- // TODO: Muss auf den Verlust anders reagiert werden?
- onPartitionsRevoked(partitions);
- }
-
- @Override
- public void run()
- {
- running = true;
-
- while (running)
- {
- try
- {
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
- log.info("Fetched {} messages", records.count());
-
- if (loadInProgress)
- {
- loadChatRoom(records);
-
- if (isLoadingCompleted())
- {
- log.info("Loading of messages completed! Pausing all owned partitions...");
- pauseAllOwnedPartions();
- log.info("Resuming normal operations...");
- loadInProgress = false;
- }
- }
- else
- {
- if (!records.isEmpty())
- {
- throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
- }
- }
- }
- catch (WakeupException e)
- {
- log.info("Received WakeupException, exiting!");
- running = false;
- }
- }
-
- log.info("Exiting normally");
- }
-
- private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
- {
- for (ConsumerRecord<String, AbstractMessageTo> record : records)
- {
- UUID chatRoomId = UUID.fromString(record.key());
-
- switch (record.value().getType())
- {
- case COMMAND_CREATE_CHATROOM:
- createChatRoom(
- chatRoomId,
- (CommandCreateChatRoomTo) record.value(),
- record.partition());
- break;
-
- case EVENT_CHATMESSAGE_RECEIVED:
- Instant instant = Instant.ofEpochSecond(record.timestamp());
- LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
- loadChatMessage(
- chatRoomId,
- timestamp,
- record.offset(),
- (EventChatMessageReceivedTo) record.value(),
- record.partition());
- break;
-
- default:
- log.debug(
- "Ignoring message for chat-room {} with offset {}: {}",
- chatRoomId,
- record.offset(),
- record.value());
- }
-
- nextOffset[record.partition()] = record.offset() + 1;
- }
- }
-
- private void createChatRoom(
- UUID chatRoomId,
- CommandCreateChatRoomTo createChatRoomRequestTo,
- Integer partition)
- {
- log.info(
- "Loading ChatRoom {} for shard {} with buffer-size {}",
- chatRoomId,
- partition,
- bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
- ChatRoomData chatRoomData = new ChatRoomData(
- clock,
- service,
- bufferSize);
- putChatRoom(
- chatRoomId,
- createChatRoomRequestTo.getName(),
- partition,
- chatRoomData);
- }
-
-
- private void createChatRoom(ChatRoomInfo chatRoomInfo)
- {
- UUID id = chatRoomInfo.getId();
- log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(this, id);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- putChatRoom(
- chatRoomInfo.getId(),
- chatRoomInfo.getName(),
- chatRoomInfo.getShard(),
- chatRoomData);
- }
-
- private void loadChatMessage(
- UUID chatRoomId,
- LocalDateTime timestamp,
- long offset,
- EventChatMessageReceivedTo chatMessageTo,
- int partition)
- {
- Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
- Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-
- ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoomData.getChatRoomService();
-
- kafkaChatRoomService.persistMessage(message);
- }
-
- private boolean isLoadingCompleted()
- {
- return IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
- }
-
- private void pauseAllOwnedPartions()
- {
- consumer.pause(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(topic, shard))
- .toList());
- }
-
-
- private void putChatRoom(
- UUID chatRoomId,
- String name,
- Integer partition,
- ChatRoomData chatRoomData)
- {
- if (this.chatRoomInfo[partition].containsKey(chatRoomId))
- {
- log.warn(
- "Ignoring existing chat-room for {}: {}",
- partition,
- chatRoomId);
- }
- else
- {
- log.info(
- "Adding new chat-room to partition {}: {}",
- partition,
- chatRoomData);
-
- this.chatRoomInfo[partition].put(
- chatRoomId,
- new ChatRoomInfo(chatRoomId, name, partition));
- this.chatRoomData[partition].put(chatRoomId, chatRoomData);
- }
- }
-
- int[] getOwnedShards()
- {
- return IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .toArray();
- }
-
- Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
- {
- if (loadInProgress)
- {
- return Mono.error(new LoadInProgressException());
- }
-
- if (!isShardOwned[shard])
- {
- return Mono.error(new ShardNotOwnedException(shard));
- }
-
- return Mono.justOrEmpty(chatRoomData[shard].get(id));
- }
-
- Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
- }
-
- Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
- {
- if (loadInProgress)
- {
- return Mono.error(new LoadInProgressException());
- }
-
- if (!isShardOwned[shard])
- {
- return Mono.error(new ShardNotOwnedException(shard));
- }
-
- return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
- }
-
- Flux<ChatRoomData> getChatRoomData()
- {
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.utils.Utils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatHomeService implements ChatHomeService
-{
- private final int numPartitions;
- private final ChatRoomChannel chatRoomChannel;
-
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- log.info("Sending create-command for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
- }
-
- @Override
- public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
- {
- int shard = selectShard(id);
- return chatRoomChannel
- .getChatRoomInfo(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
- }
-
- @Override
- public Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return chatRoomChannel.getChatRoomInfo();
- }
-
- @Override
- public Mono<ChatRoomData> getChatRoomData(UUID id)
- {
- int shard = selectShard(id);
- return chatRoomChannel
- .getChatRoomData(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
- }
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return chatRoomChannel.getChatRoomData();
- }
-
- int selectShard(UUID chatRoomId)
- {
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.util.LinkedHashMap;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatRoomService implements ChatRoomService
-{
- private final ChatRoomChannel chatRoomChannel;
- private final UUID chatRoomId;
-
- private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
-
-
- @Override
- public Mono<Message> persistMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- return chatRoomChannel
- .sendChatMessage(chatRoomId, key, timestamp, text)
- .doOnSuccess(message -> persistMessage(message));
- }
-
- void persistMessage(Message message)
- {
- messages.put (message.getKey(), message);
- }
-
- @Override
- synchronized public Mono<Message> getMessage(Message.MessageKey key)
- {
- return Mono.fromSupplier(() -> messages.get(key));
- }
-
- @Override
- synchronized public Flux<Message> getMessages(long first, long last)
- {
- return Flux.fromStream(messages
- .values()
- .stream()
- .filter(message ->
- {
- long serial = message.getSerialNumber();
- return serial >= first && serial <= last;
- }));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend",
- name = "services",
- havingValue = "kafka")
-@Component
-@Slf4j
-public class KafkaServicesApplicationRunner implements ApplicationRunner
-{
- @Autowired
- ChatBackendProperties properties;
-
- @Autowired
- ThreadPoolTaskExecutor taskExecutor;
- @Autowired
- ConfigurableApplicationContext context;
-
- @Autowired
- ChatRoomChannel chatRoomChannel;
- @Autowired
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
-
- CompletableFuture<Void> chatRoomChannelConsumerJob;
-
-
- @Override
- public void run(ApplicationArguments args) throws Exception
- {
- List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
- chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
- log.info("Starting the consumer for the ChatRoomChannel");
- chatRoomChannelConsumerJob = taskExecutor
- .submitCompletable(chatRoomChannel)
- .exceptionally(e ->
- {
- log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
- return null;
- });
- }
-
- @PreDestroy
- public void joinChatRoomChannelConsumerJob()
- {
- log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- chatRoomChannelConsumerJob.join();
- log.info("Joined the consumer of the ChatRoomChannel");
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.time.Clock;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend",
- name = "services",
- havingValue = "kafka")
-@Configuration
-public class KafkaServicesConfiguration
-{
- @Bean
- ChatHomeService kafkaChatHome(
- ChatBackendProperties properties,
- ChatRoomChannel chatRoomChannel)
- {
- return new KafkaChatHomeService(
- properties.getKafka().getNumPartitions(),
- chatRoomChannel);
- }
-
- @Bean
- ChatRoomChannel chatRoomChannel(
- ChatBackendProperties properties,
- Producer<String, AbstractMessageTo> chatRoomChannelProducer,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
- ZoneId zoneId,
- Clock clock)
- {
- return new ChatRoomChannel(
- properties.getKafka().getChatRoomChannelTopic(),
- chatRoomChannelProducer,
- chatRoomChannelConsumer,
- zoneId,
- properties.getKafka().getNumPartitions(),
- properties.getChatroomBufferSize(),
- clock);
- }
-
- @Bean
- Producer<String, AbstractMessageTo> chatRoomChannelProducer(
- Properties defaultProducerProperties,
- ChatBackendProperties chatBackendProperties,
- StringSerializer stringSerializer,
- JsonSerializer<AbstractMessageTo> messageSerializer)
- {
- Map<String, Object> properties = new HashMap<>();
- defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
- properties.put(
- ProducerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
- return new KafkaProducer<>(
- properties,
- stringSerializer,
- messageSerializer);
- }
-
- @Bean
- StringSerializer stringSerializer()
- {
- return new StringSerializer();
- }
-
- @Bean
- JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
- {
- JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
- serializer.configure(
- Map.of(
- JsonSerializer.TYPE_MAPPINGS, typeMappings),
- false);
- return serializer;
- }
-
- @Bean
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
- Properties defaultConsumerProperties,
- ChatBackendProperties chatBackendProperties,
- StringDeserializer stringDeserializer,
- JsonDeserializer<AbstractMessageTo> messageDeserializer)
- {
- Map<String, Object> properties = new HashMap<>();
- defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
- properties.put(
- ConsumerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
- properties.put(
- ConsumerConfig.GROUP_ID_CONFIG,
- "chatroom_channel");
- return new KafkaConsumer<>(
- properties,
- stringDeserializer,
- messageDeserializer);
- }
-
- @Bean
- StringDeserializer stringDeserializer()
- {
- return new StringDeserializer();
- }
-
- @Bean
- JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
- {
- JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
- deserializer.configure(
- Map.of(
- JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
- JsonDeserializer.TYPE_MAPPINGS, typeMappings),
- false );
- return deserializer;
- }
-
- @Bean
- String typeMappings ()
- {
- return
- "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
- "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
- }
-
- @Bean
- Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
- {
- Properties properties = new Properties();
- properties.setProperty(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- chatBackendProperties.getKafka().getBootstrapServers());
- return properties;
- }
-
- @Bean
- Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
- {
- Properties properties = new Properties();
- properties.setProperty(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- chatBackendProperties.getKafka().getBootstrapServers());
- properties.setProperty(
- ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
- "false");
- properties.setProperty(
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- "earliest");
- return properties;
- }
-
- @Bean
- ZoneId zoneId()
- {
- return ZoneId.systemDefault();
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-public class AbstractMessageTo
-{
- public enum ToType {
- COMMAND_CREATE_CHATROOM,
- EVENT_CHATMESSAGE_RECEIVED,
- }
-
- @Getter
- private final ToType type;
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import lombok.*;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class CommandCreateChatRoomTo extends AbstractMessageTo
-{
- private String name;
-
-
- public CommandCreateChatRoomTo()
- {
- super(ToType.COMMAND_CREATE_CHATROOM);
- }
-
-
- public static CommandCreateChatRoomTo of(String name)
- {
- CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
- to.name = name;
- return to;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.*;
-
-import java.time.LocalDateTime;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class EventChatMessageReceivedTo extends AbstractMessageTo
-{
- private String user;
- private Long id;
- private String text;
-
-
- public EventChatMessageReceivedTo()
- {
- super(ToType.EVENT_CHATMESSAGE_RECEIVED);
- }
-
-
- public Message toMessage(long offset, LocalDateTime timestamp)
- {
- return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
- }
-
- public static EventChatMessageReceivedTo from(Message message)
- {
- return EventChatMessageReceivedTo.of(
- message.getUsername(),
- message.getId(),
- message.getMessageText());
- }
-
-
- public static EventChatMessageReceivedTo of(String user, Long id, String text)
- {
- EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
- to.user = user;
- to.id = id;
- to.text = text;
- return to;
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ ShardedChatHomeService chatHome(
+ StorageStrategy storageStrategy,
+ Clock clock)
+ {
+ SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS];
+
+ IntStream
+ .of(ownedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
+ shard,
+ storageStrategy,
+ clock,
+ bufferSize()));
+
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
+
+ return new ShardedChatHomeService(chatHomes, strategy);
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Clock clock)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ new KafkaLikeShardingStrategy(NUM_SHARDS),
+ new ObjectMapper());
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+ int[] ownedShards()
+ {
+ return new int[] { OWNED_SHARD };
+ }
+
+ int bufferSize()
+ {
+ return 8;
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+
+
+public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ SimpleChatHomeService chatHome(
+ StorageStrategy storageStrategy,
+ Clock clock)
+ {
+ return new SimpleChatHomeService(
+ storageStrategy,
+ clock,
+ bufferSize());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Clock clock)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ chatRoomId -> 0,
+ new ObjectMapper());
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+ int bufferSize()
+ {
+ return 8;
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC;
+
+
+@SpringBootTest(
+ classes = {
+ KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class,
+ KafkaServicesConfiguration.class,
+ KafkaAutoConfiguration.class,
+ TaskExecutionAutoConfiguration.class,
+ },
+ properties = {
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+ "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
+public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+ final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+
+ static CompletableFuture<Void> CONSUMER_JOB;
+
+
+ @TestConfiguration
+ @EnableConfigurationProperties(ChatBackendProperties.class)
+ static class KafkaChatHomeTestConfiguration
+ {
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+
+
+ @BeforeAll
+ public static void sendAndLoadStoredData(
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired Consumer chatRoomChannelConsumer,
+ @Autowired ThreadPoolTaskExecutor taskExecutor,
+ @Autowired ChatRoomChannel chatRoomChannel)
+ {
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+ List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+ chatRoomChannelConsumer.assign(assignedPartitions);
+ chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+ CONSUMER_JOB = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ SendResult<String, String> result = kafkaTemplate.send(record).join();
+ log.info(
+ "Sent {}={} to {}",
+ key,
+ value,
+ new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+ }
+
+ @AfterAll
+ static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ CONSUMER_JOB.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CommandCreateChatRoomToTest
+{
+ final String json = """
+ {
+ "name": "Foo-Room!"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+ assertThat(message.getName()).isEqualTo("Foo-Room!");
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class EventChatMessageReceivedToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.stream.IntStream;
-
-public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
- @TestConfiguration
- static class Configuration
- {
- @Bean
- ShardedChatHomeService chatHome(
- StorageStrategy storageStrategy,
- Clock clock)
- {
- SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS];
-
- IntStream
- .of(ownedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
- shard,
- storageStrategy,
- clock,
- bufferSize()));
-
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
-
- return new ShardedChatHomeService(chatHomes, strategy);
- }
-
- @Bean
- public FilesStorageStrategy storageStrategy(Clock clock)
- {
- return new FilesStorageStrategy(
- Paths.get("target", "test-classes", "data", "files"),
- new KafkaLikeShardingStrategy(NUM_SHARDS),
- new ObjectMapper());
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
-
- int[] ownedShards()
- {
- return new int[] { OWNED_SHARD };
- }
-
- int bufferSize()
- {
- return 8;
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-
-
-public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
-{
- @TestConfiguration
- static class Configuration
- {
- @Bean
- SimpleChatHomeService chatHome(
- StorageStrategy storageStrategy,
- Clock clock)
- {
- return new SimpleChatHomeService(
- storageStrategy,
- clock,
- bufferSize());
- }
-
- @Bean
- public FilesStorageStrategy storageStrategy(Clock clock)
- {
- return new FilesStorageStrategy(
- Paths.get("target", "test-classes", "data", "files"),
- chatRoomId -> 0,
- new ObjectMapper());
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
-
- int bufferSize()
- {
- return 8;
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.time.Clock;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC;
-
-
-@SpringBootTest(
- classes = {
- KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class,
- KafkaServicesConfiguration.class,
- KafkaAutoConfiguration.class,
- TaskExecutionAutoConfiguration.class,
- },
- properties = {
- "chat.backend.services=kafka",
- "chat.backend.kafka.client-id-PREFIX=TEST",
- "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
- "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
-})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
-@Slf4j
-public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
- final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
-
- static CompletableFuture<Void> CONSUMER_JOB;
-
-
- @TestConfiguration
- @EnableConfigurationProperties(ChatBackendProperties.class)
- static class KafkaChatHomeTestConfiguration
- {
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
- }
-
-
- @BeforeAll
- public static void sendAndLoadStoredData(
- @Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired Consumer chatRoomChannelConsumer,
- @Autowired ThreadPoolTaskExecutor taskExecutor,
- @Autowired ChatRoomChannel chatRoomChannel)
- {
- send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
- List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
- chatRoomChannelConsumer.assign(assignedPartitions);
- chatRoomChannel.onPartitionsAssigned(assignedPartitions);
- CONSUMER_JOB = taskExecutor
- .submitCompletable(chatRoomChannel)
- .exceptionally(e ->
- {
- log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
- return null;
- });
- }
-
- static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
- {
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
- record.headers().add("__TypeId__", typeId.getBytes());
- SendResult<String, String> result = kafkaTemplate.send(record).join();
- log.info(
- "Sent {}={} to {}",
- key,
- value,
- new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
- }
-
- @AfterAll
- static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
- {
- log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- CONSUMER_JOB.join();
- log.info("Joined the consumer of the ChatRoomChannel");
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class CommandCreateChatRoomToTest
-{
- final String json = """
- {
- "name": "Foo-Room!"
- }""";
-
- ObjectMapper mapper;
-
- @BeforeEach
- public void setUp()
- {
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-
- @Test
- public void testDeserialization() throws Exception
- {
- CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
- assertThat(message.getName()).isEqualTo("Foo-Room!");
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class EventChatMessageReceivedToTest
-{
- final String json = """
- {
- "id": 1,
- "text": "Hallo, ich heiße Peter!",
- "user": "Peter"
- }""";
-
- ObjectMapper mapper;
-
- @BeforeEach
- public void setUp()
- {
- mapper = new ObjectMapper();
- mapper.registerModule(new JavaTimeModule());
- mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-
- @Test
- public void testDeserialization() throws Exception
- {
- EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
- assertThat(message.getId()).isEqualTo(1l);
- assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
- assertThat(message.getUser()).isEqualTo("Peter");
- }
-}