WIP
authorKai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 21:57:56 +0000 (23:57 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 21:57:56 +0000 (23:57 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
src/main/resources/application.properties

index b7a94dd..0459e3c 100644 (file)
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -19,7 +20,7 @@ import java.util.Properties;
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
 public class SplitterApplication
 {
-       @Bean(destroyMethod = "close")
+       @Bean
        KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
        {
                Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
@@ -28,13 +29,13 @@ public class SplitterApplication
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
                props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
                return new KafkaConsumer<>(props);
        }
 
-       @Bean(destroyMethod = "close")
+       @Bean
        KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
        {
                Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
index dab15f8..07dfb5e 100644 (file)
@@ -74,6 +74,13 @@ public class SplitterStreamProcessor implements ApplicationRunner
 
         records.forEach(inputRecord ->
         {
+          log.debug(
+              "Received a recording of {}, partition={}, offset={}, epoch={}",
+              inputRecord.key(),
+              inputRecord.partition(),
+              inputRecord.offset(),
+              inputRecord.leaderEpoch());
+
           offsets[inputRecord.partition()] = inputRecord.offset();
           leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
 
@@ -91,8 +98,8 @@ public class SplitterStreamProcessor implements ApplicationRunner
               if (exception == null)
               {
                 // HANDLE SUCCESS
-                log.info(
-                    "sent {}={}, partition={}, offset={}",
+                log.debug(
+                    "Sent {}={}, partition={}, offset={}",
                     outputRecord.key(),
                     outputRecord.value(),
                     metadata.partition(),
@@ -102,28 +109,29 @@ public class SplitterStreamProcessor implements ApplicationRunner
               {
                 // HANDLE ERROR
                 log.error(
-                    "could not send {}={}: {}",
+                    "Could not send {}={}: {}",
                     outputRecord.key(),
                     outputRecord.value(),
                     exception.toString());
               }
             });
           }
-        });
 
-        long delta = lastCommit - clock.millis();
-        if (delta > commitInterval)
-        {
-          log.info("Elapsed time since last commit: {}ms", delta);
-          commitTransaction();
-          beginTransaction();
-        }
+          long delta = clock.millis() - lastCommit;
+          if (delta > commitInterval)
+          {
+            log.info("Elapsed time since last commit: {}ms", delta);
+            commitTransaction();
+            beginTransaction();
+          }
+        });
       }
     }
     catch (WakeupException e)
     {
-      log.info("Waking up from exception!", e);
-      commitTransaction();
+      log.info("Waking up from exception!");
+      // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst!
+      // commitTransaction();
     }
     catch (Exception e)
     {
index 3046fc3..0d2624f 100644 (file)
@@ -1,2 +1,3 @@
 server.port=8086
 management.endpoints.web.exposure.include=*
+logging.level.de.juplo.kafka.wordcount.splitter=DEBUG