WIP notnagel
authorKai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 22:23:33 +0000 (00:23 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 22:23:33 +0000 (00:23 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java

index 0459e3c..b9e3c9c 100644 (file)
@@ -20,36 +20,6 @@ import java.util.Properties;
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
 public class SplitterApplication
 {
-       @Bean
-       KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
-       {
-               Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-               Properties props = new Properties();
-               props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
-               props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-               return new KafkaConsumer<>(props);
-       }
-
-       @Bean
-       KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
-       {
-               Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-               Properties props = new Properties();
-               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-               props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
-               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-               return new KafkaProducer<>(props);
-       }
-
        @Bean
        Clock clock()
        {
index 07dfb5e..51d22b3 100644 (file)
@@ -1,22 +1,27 @@
 package de.juplo.kafka.wordcount.splitter;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import javax.annotation.PreDestroy;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -33,6 +38,7 @@ public class SplitterStreamProcessor implements ApplicationRunner
   private final KafkaProducer<String, String> producer;
   private final Clock clock;
   private final int commitInterval;
+  private final Lock running = new ReentrantLock();
 
   private boolean stopped = false;
   private long[] offsets;
@@ -41,28 +47,45 @@ public class SplitterStreamProcessor implements ApplicationRunner
 
   public SplitterStreamProcessor(
       SplitterApplicationProperties properties,
-      KafkaConsumer<String, String> consumer,
-      KafkaProducer<String,String> producer,
       Clock clock)
   {
     this.inputTopic = properties.getInputTopic();
     this.outputTopic = properties.getOutputTopic();
 
-    this.consumer = consumer;
-    this.producer = producer;
-
     this.clock = clock;
     this.commitInterval = properties.getCommitInterval();
+
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props;
+
+    props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    consumer = new KafkaConsumer<>(props);
+
+    props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producer = new KafkaProducer<>(props);
   }
 
   @Override
   public void run(ApplicationArguments args)
   {
-    log.info("Initializing transaction");
-    producer.initTransactions();
+    running.lock();
 
     try
     {
+      log.info("Initializing transaction");
+      producer.initTransactions();
+
       log.info("Subscribing to topic {}", inputTopic);
       consumer.subscribe(
           Arrays.asList(inputTopic),
@@ -140,11 +163,18 @@ public class SplitterStreamProcessor implements ApplicationRunner
     }
     finally
     {
-      log.info("Closing consumer");
-      consumer.close();
-      log.info("Closing producer");
-      producer.close();
-      log.info("Exiting!");
+      try
+      {
+        log.info("Closing consumer");
+        consumer.close();
+        log.info("Closing producer");
+        producer.close();
+        log.info("Exiting!");
+      }
+      finally
+      {
+        running.unlock();
+      }
     }
   }
 
@@ -169,7 +199,8 @@ public class SplitterStreamProcessor implements ApplicationRunner
     }
     producer.sendOffsetsToTransaction(
         offsetsToSend,
-        consumer.groupMetadata());log.info("Committing transaction");
+        consumer.groupMetadata());
+    log.info("Committing transaction");
     producer.commitTransaction();
   }
 
@@ -222,8 +253,10 @@ public class SplitterStreamProcessor implements ApplicationRunner
   @PreDestroy
   public void stop()
   {
-    log.info("Stopping Consumer");
+    log.info("Shutdown requested...");
     stopped = true;
     consumer.wakeup();
+    running.lock();
+    log.info("Shutdown completed!");
   }
 }