From: Kai Moritz Date: Wed, 6 Sep 2023 21:48:39 +0000 (+0200) Subject: refactor: Renamed `persistence` into `implementation` - Moved classes X-Git-Tag: rebase--2023-09-13--21-01~21 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=477b86622b0beeb59849abaecfe3247980a34743;p=demos%2Fkafka%2Fchat refactor: Renamed `persistence` into `implementation` - Moved classes --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/ShardingStrategy.java new file mode 100644 index 00000000..f7350f74 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/ShardingStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.persistence; + +import java.util.UUID; + + +public interface ShardingStrategy +{ + int selectShard(UUID chatRoomId); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java new file mode 100644 index 00000000..cbf55a4c --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import reactor.core.publisher.Flux; + +import java.util.UUID; + + +public interface StorageStrategy +{ + default void write(ChatHomeService chatHomeService) + { + writeChatRoomInfo( + chatHomeService + .getChatRoomInfo() + .doOnNext(chatRoomInfo -> + writeChatRoomData( + chatRoomInfo.getId(), + chatHomeService + .getChatRoomData(chatRoomInfo.getId()) + .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); + } + + void writeChatRoomInfo(Flux chatRoomInfoFlux); + Flux readChatRoomInfo(); + void writeChatRoomData(UUID chatRoomId, Flux messageFlux); + Flux readChatRoomData(UUID chatRoomId); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java new file mode 100644 index 00000000..e1d5a5e3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java @@ -0,0 +1,55 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +public class InMemoryChatRoomService implements ChatRoomService +{ + private final LinkedHashMap messages; + + + public InMemoryChatRoomService(Flux messageFlux) + { + log.debug("Creating InMemoryChatRoomService"); + messages = new LinkedHashMap<>(); + messageFlux.subscribe(message -> messages.put(message.getKey(), message)); + } + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + Message message = new Message(key, (long)messages.size(), timestamp, text); + messages.put(message.getKey(), message); + return Mono.just(message); + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java new file mode 100644 index 00000000..263a2d5f --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -0,0 +1,84 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.time.Clock; +import java.util.stream.IntStream; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "inmemory", + matchIfMissing = true) +@Configuration +public class InMemoryServicesConfiguration +{ + @Bean + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "none", + matchIfMissing = true) + ChatHomeService noneShardingChatHome( + ChatBackendProperties properties, + StorageStrategy storageStrategy, + Clock clock) + { + return new SimpleChatHomeService( + storageStrategy, + clock, + properties.getChatroomBufferSize()); + } + + @Bean + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "kafkalike") + ChatHomeService kafkalikeShardingChatHome( + ChatBackendProperties properties, + StorageStrategy storageStrategy, + Clock clock) + { + int numShards = properties.getInmemory().getNumShards(); + SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards]; + IntStream + .of(properties.getInmemory().getOwnedShards()) + .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( + shard, + storageStrategy, + clock, + properties.getChatroomBufferSize())); + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + return new ShardedChatHomeService(chatHomes, strategy); + } + + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "none", + matchIfMissing = true) + @Bean + ShardingStrategy defaultShardingStrategy() + { + return chatRoomId -> 0; + } + + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "kafkalike") + @Bean + ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy( + properties.getInmemory().getNumShards()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/KafkaLikeShardingStrategy.java new file mode 100644 index 00000000..e20dab71 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/KafkaLikeShardingStrategy.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.utils.Utils; + +import java.util.UUID; + + +@RequiredArgsConstructor +public class KafkaLikeShardingStrategy implements ShardingStrategy +{ + private final int numPartitions; + + @Override + public int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java new file mode 100644 index 00000000..09e4684d --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -0,0 +1,106 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + + +@Slf4j +public class ShardedChatHomeService implements ChatHomeService +{ + private final SimpleChatHomeService[] chatHomes; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; + + + public ShardedChatHomeService( + SimpleChatHomeService[] chatHomes, + ShardingStrategy shardingStrategy) + { + this.chatHomes = chatHomes; + this.shardingStrategy = shardingStrategy; + this.ownedShards = new HashSet<>(); + for (int shard = 0; shard < chatHomes.length; shard++) + if(chatHomes[shard] != null) + this.ownedShards.add(shard); + log.info( + "Created ShardedChatHome for shards: {}", + ownedShards + .stream() + .map(String::valueOf) + .collect(Collectors.joining(", "))); + } + + + @Override + public Mono createChatRoom(UUID id, String name) + { + int shard = shardingStrategy.selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard].createChatRoom(id, name); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + int shard = selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard] + .getChatRoomInfo(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); + } + + @Override + public Flux getChatRoomInfo() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRoomInfo()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard] + .getChatRoomData(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); + } + + public Flux getChatRoomData() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRoomData()); + } + + + + private int selectShard(UUID chatroomId) + { + return shardingStrategy.selectShard(chatroomId); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java new file mode 100644 index 00000000..d9c31546 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -0,0 +1,122 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.util.*; + + +@Slf4j +public class SimpleChatHomeService implements ChatHomeService +{ + private final Integer shard; + private final Map chatRoomInfo; + private final Map chatRoomData; + private final Clock clock; + private final int bufferSize; + + + + public SimpleChatHomeService( + StorageStrategy storageStrategy, + Clock clock, + int bufferSize) + { + this( + null, + storageStrategy, + clock, + bufferSize); + } + + public SimpleChatHomeService( + Integer shard, + StorageStrategy storageStrategy, + Clock clock, + int bufferSize) + { + log.info("Created SimpleChatHome for shard {}", shard); +; + this.shard = shard; + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + storageStrategy + .readChatRoomInfo() + .filter(info -> + { + if (shard == null || info.getShard() == shard) + { + return true; + } + else + { + log.info( + "SimpleChatHome for shard {} ignores not owned chat-room {}", + shard, + info); + return false; + } + }) + .toStream() + .forEach(info -> + { + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); + chatRoomData.put( + info.getId(), + new ChatRoomData( + clock, + new InMemoryChatRoomService(messageFlux), + bufferSize)); + }); + this.clock = clock; + this.bufferSize = bufferSize; + } + + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + this.chatRoomInfo.put(id, chatRoomInfo); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + return Mono + .justOrEmpty(chatRoomInfo.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + @Override + public Flux getChatRoomInfo() + { + return Flux.fromIterable(chatRoomInfo.values()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + return Mono + .justOrEmpty(chatRoomData.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRoomData() + { + return Flux.fromIterable(chatRoomData.values()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java new file mode 100644 index 00000000..7e95c648 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java @@ -0,0 +1,436 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.*; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ZoneId zoneId; + private final int numShards; + private final int bufferSize; + private final Clock clock; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatRoomInfo; + private final Map[] chatRoomData; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatRoomChannel( + String topic, + Producer producer, + Consumer consumer, + ZoneId zoneId, + int numShards, + int bufferSize, + Clock clock) + { + log.debug( + "Creating ChatRoomChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatRoomInfo = new Map[numShards]; + this.chatRoomData = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> + { + this.chatRoomInfo[shard] = new HashMap<>(); + this.chatRoomData[shard] = new HashMap<>(); + }); + } + + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + null, + zdt.toEpochSecond(), + chatRoomId.toString(), + EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadChatRoom(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + private void loadChatRoom(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + + switch (record.value().getType()) + { + case COMMAND_CREATE_CHATROOM: + createChatRoom( + chatRoomId, + (CommandCreateChatRoomTo) record.value(), + record.partition()); + break; + + case EVENT_CHATMESSAGE_RECEIVED: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (EventChatMessageReceivedTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); + } + + nextOffset[record.partition()] = record.offset() + 1; + } + } + + private void createChatRoom( + UUID chatRoomId, + CommandCreateChatRoomTo createChatRoomRequestTo, + Integer partition) + { + log.info( + "Loading ChatRoom {} for shard {} with buffer-size {}", + chatRoomId, + partition, + bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoomData chatRoomData = new ChatRoomData( + clock, + service, + bufferSize); + putChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + chatRoomData); + } + + + private void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + putChatRoom( + chatRoomInfo.getId(), + chatRoomInfo.getName(), + chatRoomInfo.getShard(), + chatRoomData); + } + + private void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + EventChatMessageReceivedTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoomData.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + + private boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + } + + private void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + + private void putChatRoom( + UUID chatRoomId, + String name, + Integer partition, + ChatRoomData chatRoomData) + { + if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + { + log.warn( + "Ignoring existing chat-room for {}: {}", + partition, + chatRoomId); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoomData); + + this.chatRoomInfo[partition].put( + chatRoomId, + new ChatRoomInfo(chatRoomId, name, partition)); + this.chatRoomData[partition].put(chatRoomId, chatRoomData); + } + } + + int[] getOwnedShards() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .toArray(); + } + + Mono getChatRoomData(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomData[shard].get(id)); + } + + Flux getChatRoomInfo() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + } + + Mono getChatRoomInfo(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + } + + Flux getChatRoomData() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values())); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java new file mode 100644 index 00000000..d3321d71 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -0,0 +1,72 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.utils.Utils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatHomeService implements ChatHomeService +{ + private final int numPartitions; + private final ChatRoomChannel chatRoomChannel; + + + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Sending create-command for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + int shard = selectShard(id); + return chatRoomChannel + .getChatRoomInfo(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()))); + } + + @Override + public Flux getChatRoomInfo() + { + return chatRoomChannel.getChatRoomInfo(); + } + + @Override + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatRoomChannel + .getChatRoomData(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()))); + } + + public Flux getChatRoomData() + { + return chatRoomChannel.getChatRoomData(); + } + + int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..7b2cc0b1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java @@ -0,0 +1,58 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatRoomService implements ChatRoomService +{ + private final ChatRoomChannel chatRoomChannel; + private final UUID chatRoomId; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + return chatRoomChannel + .sendChatMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); + } + + void persistMessage(Message message) + { + messages.put (message.getKey(), message); + } + + @Override + synchronized public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + synchronized public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java new file mode 100644 index 00000000..8474239b --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -0,0 +1,68 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Component +@Slf4j +public class KafkaServicesApplicationRunner implements ApplicationRunner +{ + @Autowired + ChatBackendProperties properties; + + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatRoomChannel chatRoomChannel; + @Autowired + Consumer chatRoomChannelConsumer; + + CompletableFuture chatRoomChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception + { + List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); + chatRoomChannelConsumer.subscribe(topics, chatRoomChannel); + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java new file mode 100644 index 00000000..c4479ce5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -0,0 +1,177 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.time.Clock; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Configuration +public class KafkaServicesConfiguration +{ + @Bean + ChatHomeService kafkaChatHome( + ChatBackendProperties properties, + ChatRoomChannel chatRoomChannel) + { + return new KafkaChatHomeService( + properties.getKafka().getNumPartitions(), + chatRoomChannel); + } + + @Bean + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ZoneId zoneId, + Clock clock) + { + return new ChatRoomChannel( + properties.getKafka().getChatRoomChannelTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + zoneId, + properties.getKafka().getNumPartitions(), + properties.getChatroomBufferSize(), + clock); + } + + @Bean + Producer chatRoomChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer(String typeMappings) + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, typeMappings), + false); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chatroom_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer(String typeMappings) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(), + JsonDeserializer.TYPE_MAPPINGS, typeMappings), + false ); + return deserializer; + } + + @Bean + String typeMappings () + { + return + "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); + } + + @Bean + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + return properties; + } + + @Bean + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); + return properties; + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java new file mode 100644 index 00000000..7cc7541e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class AbstractMessageTo +{ + public enum ToType { + COMMAND_CREATE_CHATROOM, + EVENT_CHATMESSAGE_RECEIVED, + } + + @Getter + private final ToType type; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java new file mode 100644 index 00000000..1a134f31 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java @@ -0,0 +1,27 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import lombok.*; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class CommandCreateChatRoomTo extends AbstractMessageTo +{ + private String name; + + + public CommandCreateChatRoomTo() + { + super(ToType.COMMAND_CREATE_CHATROOM); + } + + + public static CommandCreateChatRoomTo of(String name) + { + CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); + to.name = name; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java new file mode 100644 index 00000000..2297b949 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java @@ -0,0 +1,48 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; + +import java.time.LocalDateTime; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventChatMessageReceivedTo extends AbstractMessageTo +{ + private String user; + private Long id; + private String text; + + + public EventChatMessageReceivedTo() + { + super(ToType.EVENT_CHATMESSAGE_RECEIVED); + } + + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static EventChatMessageReceivedTo from(Message message) + { + return EventChatMessageReceivedTo.of( + message.getUsername(), + message.getId(), + message.getMessageText()); + } + + + public static EventChatMessageReceivedTo of(String user, Long id, String text) + { + EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); + to.user = user; + to.id = id; + to.text = text; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/ShardingStrategy.java deleted file mode 100644 index f7350f74..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/ShardingStrategy.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import java.util.UUID; - - -public interface ShardingStrategy -{ - int selectShard(UUID chatRoomId); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java deleted file mode 100644 index cbf55a4c..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import reactor.core.publisher.Flux; - -import java.util.UUID; - - -public interface StorageStrategy -{ - default void write(ChatHomeService chatHomeService) - { - writeChatRoomInfo( - chatHomeService - .getChatRoomInfo() - .doOnNext(chatRoomInfo -> - writeChatRoomData( - chatRoomInfo.getId(), - chatHomeService - .getChatRoomData(chatRoomInfo.getId()) - .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); - } - - void writeChatRoomInfo(Flux chatRoomInfoFlux); - Flux readChatRoomInfo(); - void writeChatRoomData(UUID chatRoomId, Flux messageFlux); - Flux readChatRoomData(UUID chatRoomId); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java deleted file mode 100644 index e1d5a5e3..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.util.LinkedHashMap; - - -@Slf4j -public class InMemoryChatRoomService implements ChatRoomService -{ - private final LinkedHashMap messages; - - - public InMemoryChatRoomService(Flux messageFlux) - { - log.debug("Creating InMemoryChatRoomService"); - messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> messages.put(message.getKey(), message)); - } - - @Override - public Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - Message message = new Message(key, (long)messages.size(), timestamp, text); - messages.put(message.getKey(), message); - return Mono.just(message); - } - - @Override - public Mono getMessage(Message.MessageKey key) - { - return Mono.fromSupplier(() -> messages.get(key)); - } - - @Override - public Flux getMessages(long first, long last) - { - return Flux.fromStream(messages - .values() - .stream() - .filter(message -> - { - long serial = message.getSerialNumber(); - return serial >= first && serial <= last; - })); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java deleted file mode 100644 index 263a2d5f..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.time.Clock; -import java.util.stream.IntStream; - - -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "inmemory", - matchIfMissing = true) -@Configuration -public class InMemoryServicesConfiguration -{ - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "none", - matchIfMissing = true) - ChatHomeService noneShardingChatHome( - ChatBackendProperties properties, - StorageStrategy storageStrategy, - Clock clock) - { - return new SimpleChatHomeService( - storageStrategy, - clock, - properties.getChatroomBufferSize()); - } - - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "kafkalike") - ChatHomeService kafkalikeShardingChatHome( - ChatBackendProperties properties, - StorageStrategy storageStrategy, - Clock clock) - { - int numShards = properties.getInmemory().getNumShards(); - SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards]; - IntStream - .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( - shard, - storageStrategy, - clock, - properties.getChatroomBufferSize())); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHomeService(chatHomes, strategy); - } - - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "none", - matchIfMissing = true) - @Bean - ShardingStrategy defaultShardingStrategy() - { - return chatRoomId -> 0; - } - - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "kafkalike") - @Bean - ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) - { - return new KafkaLikeShardingStrategy( - properties.getInmemory().getNumShards()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java deleted file mode 100644 index e20dab71..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.utils.Utils; - -import java.util.UUID; - - -@RequiredArgsConstructor -public class KafkaLikeShardingStrategy implements ShardingStrategy -{ - private final int numPartitions; - - @Override - public int selectShard(UUID chatRoomId) - { - byte[] serializedKey = chatRoomId.toString().getBytes(); - return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java deleted file mode 100644 index 09e4684d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java +++ /dev/null @@ -1,106 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - - -@Slf4j -public class ShardedChatHomeService implements ChatHomeService -{ - private final SimpleChatHomeService[] chatHomes; - private final Set ownedShards; - private final ShardingStrategy shardingStrategy; - - - public ShardedChatHomeService( - SimpleChatHomeService[] chatHomes, - ShardingStrategy shardingStrategy) - { - this.chatHomes = chatHomes; - this.shardingStrategy = shardingStrategy; - this.ownedShards = new HashSet<>(); - for (int shard = 0; shard < chatHomes.length; shard++) - if(chatHomes[shard] != null) - this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); - } - - - @Override - public Mono createChatRoom(UUID id, String name) - { - int shard = shardingStrategy.selectShard(id); - return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard].createChatRoom(id, name); - } - - @Override - public Mono getChatRoomInfo(UUID id) - { - int shard = selectShard(id); - return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard] - .getChatRoomInfo(id) - .onErrorMap(throwable -> throwable instanceof UnknownChatroomException - ? new UnknownChatroomException( - id, - shard, - ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) - : throwable); - } - - @Override - public Flux getChatRoomInfo() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRoomInfo()); - } - - @Override - public Mono getChatRoomData(UUID id) - { - int shard = selectShard(id); - return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard] - .getChatRoomData(id) - .onErrorMap(throwable -> throwable instanceof UnknownChatroomException - ? new UnknownChatroomException( - id, - shard, - ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) - : throwable); - } - - public Flux getChatRoomData() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRoomData()); - } - - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java deleted file mode 100644 index d9c31546..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java +++ /dev/null @@ -1,122 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.util.*; - - -@Slf4j -public class SimpleChatHomeService implements ChatHomeService -{ - private final Integer shard; - private final Map chatRoomInfo; - private final Map chatRoomData; - private final Clock clock; - private final int bufferSize; - - - - public SimpleChatHomeService( - StorageStrategy storageStrategy, - Clock clock, - int bufferSize) - { - this( - null, - storageStrategy, - clock, - bufferSize); - } - - public SimpleChatHomeService( - Integer shard, - StorageStrategy storageStrategy, - Clock clock, - int bufferSize) - { - log.info("Created SimpleChatHome for shard {}", shard); -; - this.shard = shard; - this.chatRoomInfo = new HashMap<>(); - this.chatRoomData = new HashMap<>(); - storageStrategy - .readChatRoomInfo() - .filter(info -> - { - if (shard == null || info.getShard() == shard) - { - return true; - } - else - { - log.info( - "SimpleChatHome for shard {} ignores not owned chat-room {}", - shard, - info); - return false; - } - }) - .toStream() - .forEach(info -> - { - UUID chatRoomId = info.getId(); - chatRoomInfo.put(chatRoomId, info); - Flux messageFlux = - storageStrategy.readChatRoomData(chatRoomId); - chatRoomData.put( - info.getId(), - new ChatRoomData( - clock, - new InMemoryChatRoomService(messageFlux), - bufferSize)); - }); - this.clock = clock; - this.bufferSize = bufferSize; - } - - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); - ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); - this.chatRoomInfo.put(id, chatRoomInfo); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - this.chatRoomData.put(id, chatRoomData); - return Mono.just(chatRoomInfo); - } - - @Override - public Mono getChatRoomInfo(UUID id) - { - return Mono - .justOrEmpty(chatRoomInfo.get(id)) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - @Override - public Flux getChatRoomInfo() - { - return Flux.fromIterable(chatRoomInfo.values()); - } - - @Override - public Mono getChatRoomData(UUID id) - { - return Mono - .justOrEmpty(chatRoomData.get(id)) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - public Flux getChatRoomData() - { - return Flux.fromIterable(chatRoomData.values()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java deleted file mode 100644 index 7e95c648..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ /dev/null @@ -1,436 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.*; -import java.util.*; -import java.util.stream.IntStream; - - -@Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener -{ - private final String topic; - private final Producer producer; - private final Consumer consumer; - private final ZoneId zoneId; - private final int numShards; - private final int bufferSize; - private final Clock clock; - private final boolean[] isShardOwned; - private final long[] currentOffset; - private final long[] nextOffset; - private final Map[] chatRoomInfo; - private final Map[] chatRoomData; - - private boolean running; - @Getter - private volatile boolean loadInProgress; - - - public ChatRoomChannel( - String topic, - Producer producer, - Consumer consumer, - ZoneId zoneId, - int numShards, - int bufferSize, - Clock clock) - { - log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", - topic, - numShards); - this.topic = topic; - this.consumer = consumer; - this.producer = producer; - this.zoneId = zoneId; - this.numShards = numShards; - this.bufferSize = bufferSize; - this.clock = clock; - this.isShardOwned = new boolean[numShards]; - this.currentOffset = new long[numShards]; - this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; - this.chatRoomData = new Map[numShards]; - IntStream - .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); - } - - - - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - chatRoomId.toString(), - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); - sink.success(chatRoomInfo); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); - - consumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } - - @Override - public void run() - { - running = true; - - while (running) - { - try - { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } - } - catch (WakeupException e) - { - log.info("Received WakeupException, exiting!"); - running = false; - } - } - - log.info("Exiting normally"); - } - - private void loadChatRoom(ConsumerRecords records) - { - for (ConsumerRecord record : records) - { - UUID chatRoomId = UUID.fromString(record.key()); - - switch (record.value().getType()) - { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - - case EVENT_CHATMESSAGE_RECEIVED: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (EventChatMessageReceivedTo) record.value(), - record.partition()); - break; - - default: - log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, - record.offset(), - record.value()); - } - - nextOffset[record.partition()] = record.offset() + 1; - } - } - - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - - private void createChatRoom(ChatRoomInfo chatRoomInfo) - { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - - private void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - EventChatMessageReceivedTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoomData.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - private boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } - - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } - - - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) - { - log.warn( - "Ignoring existing chat-room for {}: {}", - partition, - chatRoomId); - } - else - { - log.info( - "Adding new chat-room to partition {}: {}", - partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - - int[] getOwnedShards() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .toArray(); - } - - Mono getChatRoomData(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomData[shard].get(id)); - } - - Flux getChatRoomInfo() - { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); - } - - Mono getChatRoomInfo(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); - } - - Flux getChatRoomData() - { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values())); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java deleted file mode 100644 index d3321d71..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ /dev/null @@ -1,72 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoomData; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.utils.Utils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class KafkaChatHomeService implements ChatHomeService -{ - private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; - - - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); - } - - @Override - public Mono getChatRoomInfo(UUID id) - { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomInfo(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); - } - - @Override - public Flux getChatRoomInfo() - { - return chatRoomChannel.getChatRoomInfo(); - } - - @Override - public Mono getChatRoomData(UUID id) - { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomData(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); - } - - public Flux getChatRoomData() - { - return chatRoomChannel.getChatRoomData(); - } - - int selectShard(UUID chatRoomId) - { - byte[] serializedKey = chatRoomId.toString().getBytes(); - return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java deleted file mode 100644 index 7b2cc0b1..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ /dev/null @@ -1,58 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.util.LinkedHashMap; -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class KafkaChatRoomService implements ChatRoomService -{ - private final ChatRoomChannel chatRoomChannel; - private final UUID chatRoomId; - - private final LinkedHashMap messages = new LinkedHashMap<>(); - - - @Override - public Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - return chatRoomChannel - .sendChatMessage(chatRoomId, key, timestamp, text) - .doOnSuccess(message -> persistMessage(message)); - } - - void persistMessage(Message message) - { - messages.put (message.getKey(), message); - } - - @Override - synchronized public Mono getMessage(Message.MessageKey key) - { - return Mono.fromSupplier(() -> messages.get(key)); - } - - @Override - synchronized public Flux getMessages(long first, long last) - { - return Flux.fromStream(messages - .values() - .stream() - .filter(message -> - { - long serial = message.getSerialNumber(); - return serial >= first && serial <= last; - })); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java deleted file mode 100644 index 8474239b..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ /dev/null @@ -1,68 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.concurrent.CompletableFuture; - - -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "kafka") -@Component -@Slf4j -public class KafkaServicesApplicationRunner implements ApplicationRunner -{ - @Autowired - ChatBackendProperties properties; - - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - ConfigurableApplicationContext context; - - @Autowired - ChatRoomChannel chatRoomChannel; - @Autowired - Consumer chatRoomChannelConsumer; - - CompletableFuture chatRoomChannelConsumerJob; - - - @Override - public void run(ApplicationArguments args) throws Exception - { - List topics = List.of(properties.getKafka().getChatRoomChannelTopic()); - chatRoomChannelConsumer.subscribe(topics, chatRoomChannel); - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); - } - - @PreDestroy - public void joinChatRoomChannelConsumerJob() - { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java deleted file mode 100644 index c4479ce5..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ /dev/null @@ -1,177 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.time.Clock; -import java.time.ZoneId; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - - -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "kafka") -@Configuration -public class KafkaServicesConfiguration -{ - @Bean - ChatHomeService kafkaChatHome( - ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) - { - return new KafkaChatHomeService( - properties.getKafka().getNumPartitions(), - chatRoomChannel); - } - - @Bean - ChatRoomChannel chatRoomChannel( - ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, - ZoneId zoneId, - Clock clock) - { - return new ChatRoomChannel( - properties.getKafka().getChatRoomChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, - zoneId, - properties.getKafka().getNumPartitions(), - properties.getChatroomBufferSize(), - clock); - } - - @Bean - Producer chatRoomChannelProducer( - Properties defaultProducerProperties, - ChatBackendProperties chatBackendProperties, - StringSerializer stringSerializer, - JsonSerializer messageSerializer) - { - Map properties = new HashMap<>(); - defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); - return new KafkaProducer<>( - properties, - stringSerializer, - messageSerializer); - } - - @Bean - StringSerializer stringSerializer() - { - return new StringSerializer(); - } - - @Bean - JsonSerializer chatMessageSerializer(String typeMappings) - { - JsonSerializer serializer = new JsonSerializer<>(); - serializer.configure( - Map.of( - JsonSerializer.TYPE_MAPPINGS, typeMappings), - false); - return serializer; - } - - @Bean - Consumer chatRoomChannelConsumer( - Properties defaultConsumerProperties, - ChatBackendProperties chatBackendProperties, - StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) - { - Map properties = new HashMap<>(); - defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); - properties.put( - ConsumerConfig.GROUP_ID_CONFIG, - "chatroom_channel"); - return new KafkaConsumer<>( - properties, - stringDeserializer, - messageDeserializer); - } - - @Bean - StringDeserializer stringDeserializer() - { - return new StringDeserializer(); - } - - @Bean - JsonDeserializer chatMessageDeserializer(String typeMappings) - { - JsonDeserializer deserializer = new JsonDeserializer<>(); - deserializer.configure( - Map.of( - JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(), - JsonDeserializer.TYPE_MAPPINGS, typeMappings), - false ); - return deserializer; - } - - @Bean - String typeMappings () - { - return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + - "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); - } - - @Bean - Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) - { - Properties properties = new Properties(); - properties.setProperty( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers()); - return properties; - } - - @Bean - Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) - { - Properties properties = new Properties(); - properties.setProperty( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers()); - properties.setProperty( - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - "false"); - properties.setProperty( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - "earliest"); - return properties; - } - - @Bean - ZoneId zoneId() - { - return ZoneId.systemDefault(); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java deleted file mode 100644 index 7cc7541e..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka.messages; - - -import lombok.Getter; -import lombok.RequiredArgsConstructor; - - -@RequiredArgsConstructor -public class AbstractMessageTo -{ - public enum ToType { - COMMAND_CREATE_CHATROOM, - EVENT_CHATMESSAGE_RECEIVED, - } - - @Getter - private final ToType type; -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java deleted file mode 100644 index 1a134f31..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java +++ /dev/null @@ -1,27 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka.messages; - -import lombok.*; - - -@Getter -@Setter -@EqualsAndHashCode -@ToString -public class CommandCreateChatRoomTo extends AbstractMessageTo -{ - private String name; - - - public CommandCreateChatRoomTo() - { - super(ToType.COMMAND_CREATE_CHATROOM); - } - - - public static CommandCreateChatRoomTo of(String name) - { - CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); - to.name = name; - return to; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java deleted file mode 100644 index 2297b949..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java +++ /dev/null @@ -1,48 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka.messages; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.*; - -import java.time.LocalDateTime; - - -@Getter -@Setter -@EqualsAndHashCode -@ToString -public class EventChatMessageReceivedTo extends AbstractMessageTo -{ - private String user; - private Long id; - private String text; - - - public EventChatMessageReceivedTo() - { - super(ToType.EVENT_CHATMESSAGE_RECEIVED); - } - - - public Message toMessage(long offset, LocalDateTime timestamp) - { - return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); - } - - public static EventChatMessageReceivedTo from(Message message) - { - return EventChatMessageReceivedTo.of( - message.getUsername(), - message.getId(), - message.getMessageText()); - } - - - public static EventChatMessageReceivedTo of(String user, Long id, String text) - { - EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); - to.user = user; - to.id = id; - to.text = text; - return to; - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java new file mode 100644 index 00000000..4c922ce6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java @@ -0,0 +1,65 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import java.nio.file.Paths; +import java.time.Clock; +import java.util.stream.IntStream; + +public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest +{ + @TestConfiguration + static class Configuration + { + @Bean + ShardedChatHomeService chatHome( + StorageStrategy storageStrategy, + Clock clock) + { + SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS]; + + IntStream + .of(ownedShards()) + .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( + shard, + storageStrategy, + clock, + bufferSize())); + + ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); + + return new ShardedChatHomeService(chatHomes, strategy); + } + + @Bean + public FilesStorageStrategy storageStrategy(Clock clock) + { + return new FilesStorageStrategy( + Paths.get("target", "test-classes", "data", "files"), + new KafkaLikeShardingStrategy(NUM_SHARDS), + new ObjectMapper()); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + int[] ownedShards() + { + return new int[] { OWNED_SHARD }; + } + + int bufferSize() + { + return 8; + } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java new file mode 100644 index 00000000..fd5a83b8 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java @@ -0,0 +1,50 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import java.nio.file.Paths; +import java.time.Clock; + + +public class SimpleChatHomeServiceTest extends ChatHomeServiceTest +{ + @TestConfiguration + static class Configuration + { + @Bean + SimpleChatHomeService chatHome( + StorageStrategy storageStrategy, + Clock clock) + { + return new SimpleChatHomeService( + storageStrategy, + clock, + bufferSize()); + } + + @Bean + public FilesStorageStrategy storageStrategy(Clock clock) + { + return new FilesStorageStrategy( + Paths.get("target", "test-classes", "data", "files"), + chatRoomId -> 0, + new ObjectMapper()); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + int bufferSize() + { + return 8; + } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java new file mode 100644 index 00000000..3633aa5f --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -0,0 +1,113 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.time.Clock; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; +import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC; + + +@SpringBootTest( + classes = { + KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class, + KafkaServicesConfiguration.class, + KafkaAutoConfiguration.class, + TaskExecutionAutoConfiguration.class, + }, + properties = { + "chat.backend.services=kafka", + "chat.backend.kafka.client-id-PREFIX=TEST", + "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.num-partitions=" + NUM_SHARDS, +}) +@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@Slf4j +public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest +{ + final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; + + static CompletableFuture CONSUMER_JOB; + + + @TestConfiguration + @EnableConfigurationProperties(ChatBackendProperties.class) + static class KafkaChatHomeTestConfiguration + { + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + } + + + @BeforeAll + public static void sendAndLoadStoredData( + @Autowired KafkaTemplate messageTemplate, + @Autowired Consumer chatRoomChannelConsumer, + @Autowired ThreadPoolTaskExecutor taskExecutor, + @Autowired ChatRoomChannel chatRoomChannel) + { + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + + List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); + chatRoomChannelConsumer.assign(assignedPartitions); + chatRoomChannel.onPartitionsAssigned(assignedPartitions); + CONSUMER_JOB = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + SendResult result = kafkaTemplate.send(record).join(); + log.info( + "Sent {}={} to {}", + key, + value, + new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); + } + + @AfterAll + static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + CONSUMER_JOB.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java new file mode 100644 index 00000000..5ef12efe --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class CommandCreateChatRoomToTest +{ + final String json = """ + { + "name": "Foo-Room!" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java new file mode 100644 index 00000000..33a78277 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class EventChatMessageReceivedToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java deleted file mode 100644 index 4c922ce6..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; - -import java.nio.file.Paths; -import java.time.Clock; -import java.util.stream.IntStream; - -public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest -{ - @TestConfiguration - static class Configuration - { - @Bean - ShardedChatHomeService chatHome( - StorageStrategy storageStrategy, - Clock clock) - { - SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS]; - - IntStream - .of(ownedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( - shard, - storageStrategy, - clock, - bufferSize())); - - ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); - - return new ShardedChatHomeService(chatHomes, strategy); - } - - @Bean - public FilesStorageStrategy storageStrategy(Clock clock) - { - return new FilesStorageStrategy( - Paths.get("target", "test-classes", "data", "files"), - new KafkaLikeShardingStrategy(NUM_SHARDS), - new ObjectMapper()); - } - - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - - int[] ownedShards() - { - return new int[] { OWNED_SHARD }; - } - - int bufferSize() - { - return 8; - } - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java deleted file mode 100644 index fd5a83b8..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; - -import java.nio.file.Paths; -import java.time.Clock; - - -public class SimpleChatHomeServiceTest extends ChatHomeServiceTest -{ - @TestConfiguration - static class Configuration - { - @Bean - SimpleChatHomeService chatHome( - StorageStrategy storageStrategy, - Clock clock) - { - return new SimpleChatHomeService( - storageStrategy, - clock, - bufferSize()); - } - - @Bean - public FilesStorageStrategy storageStrategy(Clock clock) - { - return new FilesStorageStrategy( - Paths.get("target", "test-classes", "data", "files"), - chatRoomId -> 0, - new ObjectMapper()); - } - - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - - int bufferSize() - { - return 8; - } - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java deleted file mode 100644 index 3633aa5f..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java +++ /dev/null @@ -1,113 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; -import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.time.Clock; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; -import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC; - - -@SpringBootTest( - classes = { - KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class, - KafkaServicesConfiguration.class, - KafkaAutoConfiguration.class, - TaskExecutionAutoConfiguration.class, - }, - properties = { - "chat.backend.services=kafka", - "chat.backend.kafka.client-id-PREFIX=TEST", - "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, - "chat.backend.kafka.num-partitions=" + NUM_SHARDS, -}) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) -@Slf4j -public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest -{ - final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; - - static CompletableFuture CONSUMER_JOB; - - - @TestConfiguration - @EnableConfigurationProperties(ChatBackendProperties.class) - static class KafkaChatHomeTestConfiguration - { - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - } - - - @BeforeAll - public static void sendAndLoadStoredData( - @Autowired KafkaTemplate messageTemplate, - @Autowired Consumer chatRoomChannelConsumer, - @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) - { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - - List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); - chatRoomChannelConsumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); - CONSUMER_JOB = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); - } - - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) - { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); - record.headers().add("__TypeId__", typeId.getBytes()); - SendResult result = kafkaTemplate.send(record).join(); - log.info( - "Sent {}={} to {}", - key, - value, - new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); - } - - @AfterAll - static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) - { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - CONSUMER_JOB.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java deleted file mode 100644 index 5ef12efe..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka.messages; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - - -public class CommandCreateChatRoomToTest -{ - final String json = """ - { - "name": "Foo-Room!" - }"""; - - ObjectMapper mapper; - - @BeforeEach - public void setUp() - { - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } - - @Test - public void testDeserialization() throws Exception - { - CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); - assertThat(message.getName()).isEqualTo("Foo-Room!"); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java deleted file mode 100644 index 33a78277..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka.messages; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - - -public class EventChatMessageReceivedToTest -{ - final String json = """ - { - "id": 1, - "text": "Hallo, ich heiße Peter!", - "user": "Peter" - }"""; - - ObjectMapper mapper; - - @BeforeEach - public void setUp() - { - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } - - @Test - public void testDeserialization() throws Exception - { - EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); - assertThat(message.getId()).isEqualTo(1l); - assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); - assertThat(message.getUser()).isEqualTo("Peter"); - } -}