From: Kai Moritz Date: Fri, 18 Aug 2023 14:19:22 +0000 (+0200) Subject: NG - kompiliert! X-Git-Tag: rebase--2023-08-18-abends~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=40e558dd182fa405ab724a8127b5d7754d3216e5;p=demos%2Fkafka%2Fchat NG - kompiliert! --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 71893624..77790bd6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -21,15 +21,6 @@ public class KafkaChatHome implements ChatHome private final ChatMessageChannel chatMessageChanel; - public KafkaChatHome( - int numPartitions, - ChatMessageChannel chatMessageChannel) - { - this.shardingStrategy = new KafkaLikeShardingStrategy(numPartitions); - this.chatMessageChanel = chatMessageChannel; - } - - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 9e1f75e7..4ebed8df 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -36,35 +36,16 @@ public class KafkaServicesConfiguration { @Bean ChatHome kafkaChatHome( - ShardingStrategy shardingStrategy, + KafkaLikeShardingStrategy shardingStrategy, ChatMessageChannel chatMessageChannel) { return new KafkaChatHome(shardingStrategy, chatMessageChannel); } @Bean - KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) + KafkaChatRoomFactory chatRoomFactory(ChatMessageChannel chatMessageChannel) { - return new KafkaChatRoomFactory(chatRoomChannel); - } - - @Bean - ChatRoomChannel chatRoomChannel( - ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, - ShardingStrategy shardingStrategy, - ChatMessageChannel chatMessageChannel, - Clock clock) - { - return new ChatRoomChannel( - properties.getKafka().getChatroomChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, - shardingStrategy, - chatMessageChannel, - clock, - properties.getChatroomBufferSize()); + return new KafkaChatRoomFactory(chatMessageChannel); } @Bean @@ -86,63 +67,7 @@ public class KafkaServicesConfiguration } @Bean - IntegerSerializer integerSerializer() - { - return new IntegerSerializer(); - } - - @Bean - JsonSerializer chatRoomSerializer() - { - JsonSerializer serializer = new JsonSerializer<>(); - serializer.configure( - Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), - false); - return serializer; - } - - @Bean - Consumer chatRoomChannelConsumer( - Properties defaultConsumerProperties, - ChatBackendProperties chatBackendProperties, - IntegerDeserializer integerDeserializer, - JsonDeserializer chatRoomDeserializer) - { - Map properties = new HashMap<>(); - defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); - properties.put( - ConsumerConfig.GROUP_ID_CONFIG, - "chat_room_channel"); - return new KafkaConsumer<>( - properties, - integerDeserializer, - chatRoomDeserializer); - } - - @Bean - IntegerDeserializer integerDeserializer() - { - return new IntegerDeserializer(); - } - - @Bean - JsonDeserializer chatRoomDeserializer() - { - JsonDeserializer deserializer = new JsonDeserializer<>(); - deserializer.configure( - Map.of( - JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class, - JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), - false ); - return deserializer; - } - - @Bean - ShardingStrategy shardingStrategy(ChatBackendProperties properties) + KafkaLikeShardingStrategy shardingStrategy(ChatBackendProperties properties) { return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); } @@ -152,14 +77,17 @@ public class KafkaServicesConfiguration ChatBackendProperties properties, Producer chatMessageChannelProducer, Consumer chatMessageChannelConsumer, - ZoneId zoneId) + ZoneId zoneId, + Clock clock) { return new ChatMessageChannel( properties.getKafka().getMessageChannelTopic(), chatMessageChannelProducer, chatMessageChannelConsumer, zoneId, - properties.getKafka().getNumPartitions()); + properties.getKafka().getNumPartitions(), + properties.getChatroomBufferSize(), + clock); } @Bean