From: Kai Moritz Date: Sun, 16 Apr 2023 09:25:26 +0000 (+0200) Subject: NEU X-Git-Tag: kafkadata~20 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=915ed8f85459da3c95f86b6351a3d7129668bc8e;p=demos%2Fkafka%2Fchat NEU --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 177d4f51..4724f6b5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -16,6 +16,7 @@ public class ChatBackendProperties private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); + private KafkaServicesProperties kafka = new KafkaServicesProperties(); @Getter @@ -29,6 +30,14 @@ public class ChatBackendProperties private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } + @Getter + @Setter + public static class KafkaServicesProperties + { + private String topic = "test"; + private int numPartitions = 2; + } + public enum ServiceType { inmemory } public enum StorageStrategyType { files, mongodb } public enum ShardingStrategyType { none, kafkalike } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java deleted file mode 100644 index 84484d90..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java +++ /dev/null @@ -1,68 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.util.UUID; - - -/** - * Derzeit eigentlich einzige aktive Strategie! - * Rückbau?!?! - */ -@RequiredArgsConstructor -@Slf4j -class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy -{ - private final KafkaChatRoomService kafkaChatRoomService; - private final Producer producer; - private final TopicPartition tp; - private final UUID chatRoomId; - private final ZoneOffset zoneOffset; - - - @Override - public Mono handleMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - tp.topic(), - tp.partition(), - timestamp.toEpochSecond(zoneOffset), - chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - { - // Emit new message - Message message = new Message(key, metadata.offset(), timestamp, text); - kafkaChatRoomService.addMessage(message); - } - - sink.success(); - } - else - { - // On send-failure - sink.error(exception); - } - })); - }); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 3ca5b7f7..22cd74b7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -20,14 +20,12 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.stream.IntStream; @Slf4j public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener { - private final ExecutorService executorService; private final Consumer consumer; private final Producer producer; private final String topic; @@ -36,7 +34,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomMaps; + private final Map[] chatrooms; private final KafkaLikeShardingStrategy shardingStrategy; private boolean running; @@ -44,7 +42,6 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer public KafkaChatHomeService( - ExecutorService executorService, Consumer consumer, Producer producer, String topic, @@ -52,7 +49,6 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer int numShards) { log.debug("Creating KafkaChatHomeService"); - this.executorService = executorService; this.consumer = consumer; this.producer = producer; this.topic = topic; @@ -61,7 +57,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomMaps = new Map[numShards]; + this.chatrooms = new Map[numShards]; this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } @@ -167,7 +163,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); - ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId); + ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); KafkaChatRoomService kafkaChatRoomService = (KafkaChatRoomService) chatRoom.getChatRoomService(); @@ -241,6 +237,12 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer } + public void putChatRoom(ChatRoom chatRoom) + { + // TODO: Nachricht senden! + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + } + @Override public Mono getChatRoom(int shard, UUID id) { @@ -250,7 +252,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer } else { - return Mono.justOrEmpty(chatRoomMaps[shard].get(id)); + return Mono.justOrEmpty(chatrooms[shard].get(id)); } } @@ -263,7 +265,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer } else { - return Flux.fromStream(chatRoomMaps[shard].values().stream()); + return Flux.fromStream(chatrooms[shard].values().stream()); } } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index 20d85e80..f81d21f1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -2,16 +2,32 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; +import java.time.Clock; import java.util.UUID; +@RequiredArgsConstructor +@Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { + private final KafkaChatHomeService kafkaChatHomeService; + private final ShardingStrategy shardingStrategy; + private final Clock clock; + private final int bufferSize; + @Override public Mono createChatRoom(UUID id, String name) { - return null; + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + int shard = shardingStrategy.selectShard(id); + KafkaChatRoomService service = new KafkaChatRoomService(kafkaChatHomeService, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + kafkaChatHomeService.putChatRoom(chatRoom); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 3a8c2c6f..16ed3a70 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -1,11 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java new file mode 100644 index 00000000..fd42d9df --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -0,0 +1,78 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ShardedChatHome; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.domain.SimpleChatHome; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.time.Clock; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "inmemory", + matchIfMissing = true) +@Configuration +public class KafkaServicesConfiguration implements ApplicationRunner +{ + @Bean + ChatHome kafkaChatHome( + ChatBackendProperties properties, + InMemoryChatHomeService chatHomeService, + StorageStrategy storageStrategy) + { + int numShards = properties.getInmemory().getNumShards(); + SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + return new ShardedChatHome(chatHomes, strategy); + } + + @Bean + KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties) + { + ShardingStrategyType sharding = + properties.getInmemory().getShardingStrategy(); + int numShards = sharding == ShardingStrategyType.none + ? 1 + : properties.getInmemory().getNumShards(); + int[] ownedShards = sharding == ShardingStrategyType.none + ? new int[] { 0 } + : properties.getInmemory().getOwnedShards(); + return new InMemoryChatHomeService( + numShards, + ownedShards, + storageStrategy.read()); + } + + @Bean + InMemoryChatRoomFactory chatRoomFactory( + InMemoryChatHomeService service, + ShardingStrategy strategy, + Clock clock, + ChatBackendProperties properties) + { + return new InMemoryChatRoomFactory( + service, + strategy, + clock, + properties.getChatroomBufferSize()); + } + + @Bean + ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy( + properties.getKafka().getNumPartitions()); + } +}