NEU
authorKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 15:36:11 +0000 (17:36 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 15:36:11 +0000 (17:36 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java [deleted file]

index 43ea399..1925cc8 100644 (file)
@@ -154,7 +154,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   @Override
   public void run()
   {
-    consumer.subscribe(List.of(topic));
+    consumer.subscribe(List.of(topic), this);
 
     running = true;
 
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
new file mode 100644 (file)
index 0000000..4350779
--- /dev/null
@@ -0,0 +1,270 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend",
+    name = "services",
+    havingValue = "kafka")
+@Configuration
+public class KafkaServicesConfiguration
+{
+  @Bean
+  ChatHome kafkaChatHome(
+      ShardingStrategy shardingStrategy,
+      ChatMessageChannel chatMessageChannel)
+  {
+    return new KafkaChatHome(shardingStrategy, chatMessageChannel);
+  }
+
+  @Bean
+  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
+  {
+    return new KafkaChatRoomFactory(chatRoomChannel);
+  }
+
+  @Bean
+  ChatRoomChannel chatRoomChannel(
+      ChatBackendProperties properties,
+      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+      ShardingStrategy shardingStrategy,
+      ChatMessageChannel chatMessageChannel,
+      Clock clock)
+  {
+    return new ChatRoomChannel(
+        properties.getKafka().getChatroomChannelTopic(),
+        chatRoomChannelProducer,
+        chatRoomChannelConsumer,
+        shardingStrategy,
+        chatMessageChannel,
+        clock,
+        properties.getChatroomBufferSize());
+  }
+
+  @Bean
+  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
+      IntegerSerializer integerSerializer,
+      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+    return new KafkaProducer<>(
+        properties,
+        integerSerializer,
+        chatRoomSerializer);
+  }
+
+  @Bean
+  IntegerSerializer integerSerializer()
+  {
+    return new IntegerSerializer();
+  }
+
+  @Bean
+  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  {
+    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
+    return serializer;
+  }
+
+  @Bean
+  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      IntegerDeserializer integerDeserializer,
+      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_room_channel");
+    return new KafkaConsumer<>(
+        properties,
+        integerDeserializer,
+        chatRoomDeserializer);
+  }
+
+  @Bean
+  IntegerDeserializer integerDeserializer()
+  {
+    return new IntegerDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  {
+    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
+    return deserializer;
+  }
+
+  @Bean
+  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+  }
+
+  @Bean
+  ChatMessageChannel chatMessageChannel(
+      ChatBackendProperties properties,
+      Producer<String, MessageTo> chatMessageChannelProducer,
+      Consumer<String, MessageTo> chatMessageChannelConsumer,
+      ZoneId zoneId)
+  {
+    return new ChatMessageChannel(
+        properties.getKafka().getMessageChannelTopic(),
+        chatMessageChannelProducer,
+        chatMessageChannelConsumer,
+        zoneId,
+        properties.getKafka().getNumPartitions());
+  }
+
+  @Bean
+  Producer<String, MessageTo>  chatMessageChannelProducer(
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringSerializer stringSerializer,
+      JsonSerializer<MessageTo> messageSerializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
+    return new KafkaProducer<>(
+        properties,
+        stringSerializer,
+        messageSerializer);
+  }
+
+  @Bean
+  StringSerializer stringSerializer()
+  {
+    return new StringSerializer();
+  }
+
+  @Bean
+  JsonSerializer<MessageTo> chatMessageSerializer()
+  {
+    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
+    return serializer;
+  }
+
+  @Bean
+  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<MessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_message_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  StringDeserializer stringDeserializer()
+  {
+    return new StringDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  {
+    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
+    return deserializer;
+  }
+
+  @Bean
+  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    return properties;
+  }
+
+  @Bean
+  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    properties.setProperty(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        "false");
+    properties.setProperty(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        "earliest");
+    return properties;
+  }
+
+  @Bean
+  ZoneId zoneId()
+  {
+    return ZoneId.systemDefault();
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
deleted file mode 100644 (file)
index 4350779..0000000
+++ /dev/null
@@ -1,270 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.time.Clock;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "services",
-    havingValue = "kafka")
-@Configuration
-public class KafkaServicesConfiguration
-{
-  @Bean
-  ChatHome kafkaChatHome(
-      ShardingStrategy shardingStrategy,
-      ChatMessageChannel chatMessageChannel)
-  {
-    return new KafkaChatHome(shardingStrategy, chatMessageChannel);
-  }
-
-  @Bean
-  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
-  {
-    return new KafkaChatRoomFactory(chatRoomChannel);
-  }
-
-  @Bean
-  ChatRoomChannel chatRoomChannel(
-      ChatBackendProperties properties,
-      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
-      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
-      ShardingStrategy shardingStrategy,
-      ChatMessageChannel chatMessageChannel,
-      Clock clock)
-  {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatroomChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
-        shardingStrategy,
-        chatMessageChannel,
-        clock,
-        properties.getChatroomBufferSize());
-  }
-
-  @Bean
-  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
-      Properties defaultProducerProperties,
-      ChatBackendProperties chatBackendProperties,
-      IntegerSerializer integerSerializer,
-      JsonSerializer<ChatRoomTo> chatRoomSerializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
-    return new KafkaProducer<>(
-        properties,
-        integerSerializer,
-        chatRoomSerializer);
-  }
-
-  @Bean
-  IntegerSerializer integerSerializer()
-  {
-    return new IntegerSerializer();
-  }
-
-  @Bean
-  JsonSerializer<ChatRoomTo> chatRoomSerializer()
-  {
-    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
-    serializer.configure(
-        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
-        false);
-    return serializer;
-  }
-
-  @Bean
-  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
-      Properties defaultConsumerProperties,
-      ChatBackendProperties chatBackendProperties,
-      IntegerDeserializer integerDeserializer,
-      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
-    properties.put(
-        ConsumerConfig.GROUP_ID_CONFIG,
-        "chat_room_channel");
-    return new KafkaConsumer<>(
-        properties,
-        integerDeserializer,
-        chatRoomDeserializer);
-  }
-
-  @Bean
-  IntegerDeserializer integerDeserializer()
-  {
-    return new IntegerDeserializer();
-  }
-
-  @Bean
-  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
-  {
-    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
-    deserializer.configure(
-        Map.of(
-            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
-            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
-        false );
-    return deserializer;
-  }
-
-  @Bean
-  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
-  {
-    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
-  }
-
-  @Bean
-  ChatMessageChannel chatMessageChannel(
-      ChatBackendProperties properties,
-      Producer<String, MessageTo> chatMessageChannelProducer,
-      Consumer<String, MessageTo> chatMessageChannelConsumer,
-      ZoneId zoneId)
-  {
-    return new ChatMessageChannel(
-        properties.getKafka().getMessageChannelTopic(),
-        chatMessageChannelProducer,
-        chatMessageChannelConsumer,
-        zoneId,
-        properties.getKafka().getNumPartitions());
-  }
-
-  @Bean
-  Producer<String, MessageTo>  chatMessageChannelProducer(
-      Properties defaultProducerProperties,
-      ChatBackendProperties chatBackendProperties,
-      StringSerializer stringSerializer,
-      JsonSerializer<MessageTo> messageSerializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
-    return new KafkaProducer<>(
-        properties,
-        stringSerializer,
-        messageSerializer);
-  }
-
-  @Bean
-  StringSerializer stringSerializer()
-  {
-    return new StringSerializer();
-  }
-
-  @Bean
-  JsonSerializer<MessageTo> chatMessageSerializer()
-  {
-    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
-    serializer.configure(
-        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
-        false);
-    return serializer;
-  }
-
-  @Bean
-  Consumer<String, MessageTo>  chatMessageChannelConsumer(
-      Properties defaultConsumerProperties,
-      ChatBackendProperties chatBackendProperties,
-      StringDeserializer stringDeserializer,
-      JsonDeserializer<MessageTo> messageDeserializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
-    properties.put(
-        ConsumerConfig.GROUP_ID_CONFIG,
-        "chat_message_channel");
-    return new KafkaConsumer<>(
-        properties,
-        stringDeserializer,
-        messageDeserializer);
-  }
-
-  @Bean
-  StringDeserializer stringDeserializer()
-  {
-    return new StringDeserializer();
-  }
-
-  @Bean
-  JsonDeserializer<MessageTo> chatMessageDeserializer()
-  {
-    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
-    deserializer.configure(
-        Map.of(
-            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
-            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
-        false );
-    return deserializer;
-  }
-
-  @Bean
-  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
-  {
-    Properties properties = new Properties();
-    properties.setProperty(
-        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-        chatBackendProperties.getKafka().getBootstrapServers());
-    return properties;
-  }
-
-  @Bean
-  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
-  {
-    Properties properties = new Properties();
-    properties.setProperty(
-        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-        chatBackendProperties.getKafka().getBootstrapServers());
-    properties.setProperty(
-        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        "false");
-    properties.setProperty(
-        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        "earliest");
-    return properties;
-  }
-
-  @Bean
-  ZoneId zoneId()
-  {
-    return ZoneId.systemDefault();
-  }
-}