Verbesserungen aus 'stored-state' nach 'rebalance-listener' gemerged
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
index 76d0c8a..7a0a8ad 100644 (file)
@@ -17,17 +17,37 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public EndlessConsumer endlessConsumer(
-      KafkaConsumer<String, String> kafkaConsumer,
+  public KeyCountingRecordHandler keyCountingRecordHandler()
+  {
+    return new KeyCountingRecordHandler();
+  }
+
+  @Bean
+  public KeyCountingRebalanceListener keyCountingRebalanceListener(
+      KeyCountingRecordHandler keyCountingRecordHandler,
+      ApplicationProperties properties)
+  {
+    return new KeyCountingRebalanceListener(
+        keyCountingRecordHandler,
+        properties.getClientId());
+  }
+
+  @Bean
+  public EndlessConsumer<String, Long> endlessConsumer(
+      KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
+      KeyCountingRebalanceListener keyCountingRebalanceListener,
+      KeyCountingRecordHandler keyCountingRecordHandler,
       ApplicationProperties properties)
   {
     return
-        new EndlessConsumer(
+        new EndlessConsumer<>(
             executor,
             properties.getClientId(),
             properties.getTopic(),
-            kafkaConsumer);
+            kafkaConsumer,
+            keyCountingRebalanceListener,
+            keyCountingRecordHandler);
   }
 
   @Bean
@@ -37,14 +57,16 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
+    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", LongDeserializer.class.getName());