splitter: 1.0.0-spring-integration - Initial implementation (incomplete)
author Kai Moritz <kai@juplo.de>
Sun, 26 Jun 2022 12:05:11 +0000 (14:05 +0200)
committer Kai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 18:43:18 +0000 (20:43 +0200)
* "Implementing" the splitter with Spring Integration mainly is a
  configuration task.
* This version does not yet send the messages with the correct key.
* Managment of transactions is not yet considered.
* The test fails, because the key is missing.
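
In short, the integration flow configured in this commit is:

    KafkaMessageSource (input topic)
        -> channel "recordings" -> @Splitter -> channel "words"
        -> KafkaProducerMessageHandler (output topic)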

pom.xml
src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java [deleted file]
src/main/resources/application.properties
src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 1938fd9..d201639 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,9 +10,9 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>splitter</artifactId>
-       <version>1.0.0-vanilla-kafka</version>
+       <version>1.0.0-spring-integration</version>
        <name>Wordcount-Splitter</name>
-       <description>A version of the stream-processor for the multi-user wordcount-example, that splits the sentences up into single words, that is implemented in Vanilla-Kafka (without Kafka Streams)</description>
+       <description>A version of the stream-processor for the multi-user wordcount-example that splits sentences into single words, implemented with Spring Integration</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
                <java.version>11</java.version>
@@ -27,8 +27,8 @@
                        <artifactId>spring-boot-starter-web</artifactId>
                </dependency>
                <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka-clients</artifactId>
+                       <groupId>org.springframework.integration</groupId>
+                       <artifactId>spring-integration-kafka</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.hibernate.validator</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
-               <dependency>
-                       <groupId>org.springframework.kafka</groupId>
-                       <artifactId>spring-kafka</artifactId>
-                       <scope>test</scope>
-               </dependency>
                <dependency>
                        <groupId>org.springframework.kafka</groupId>
                        <artifactId>spring-kafka-test</artifactId>
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
index 0665f3e..6128f0a 100644
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.splitter;
 
+import org.springframework.integration.annotation.Splitter;
 import org.springframework.stereotype.Component;
 
 import java.util.regex.Pattern;
@@ -10,6 +11,7 @@ public class MessageSplitter
 {
   final static Pattern PATTERN = Pattern.compile("\\W+");
 
+  @Splitter(inputChannel = "recordings", outputChannel = "words")
   String[] split(String message)
   {
     return PATTERN.split(message);
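
Since the splitting logic stays a plain method, it can be unit-tested without
any Spring context. A minimal sketch (the test class is illustrative, not part
of this commit; it has to live in the same package, because split() is
package-private, and uses JUnit 5 and AssertJ as shipped with
spring-boot-starter-test):

    package de.juplo.kafka.wordcount.splitter;

    import static org.assertj.core.api.Assertions.assertThat;

    import org.junit.jupiter.api.Test;

    public class MessageSplitterTest
    {
      @Test
      void splitsASentenceIntoSingleWords()
      {
        MessageSplitter splitter = new MessageSplitter();
        // "\W+" matches any run of non-word characters, so punctuation is dropped
        assertThat(splitter.split("Hello, world!"))
            .containsExactly("Hello", "world");
      }
    }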
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
index d46a7cd..412f429 100644
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
@@ -4,20 +4,43 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
-
-import java.time.Clock;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.integration.annotation.InboundChannelAdapter;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.messaging.MessageHandler;
 
 
 @SpringBootApplication
 @EnableConfigurationProperties(SplitterApplicationProperties.class)
+@EnableIntegration
 public class SplitterApplication
 {
+       @InboundChannelAdapter(channel = "recordings")
        @Bean
-       Clock clock()
+       KafkaMessageSource<String, String> source(
+                       ConsumerFactory<String, String> cf,
+                       SplitterApplicationProperties properties)
        {
-               return Clock.systemDefaultZone();
+               return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
        }
 
+       @Bean
+       @ServiceActivator(inputChannel = "words")
+       MessageHandler handler(
+                       KafkaTemplate<String, String> kafkaTemplate,
+                       SplitterApplicationProperties properties)
+       {
+               KafkaProducerMessageHandler<String, String> handler =
+                               new KafkaProducerMessageHandler<>(kafkaTemplate);
+               handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+               return handler;
+       }
 
        public static void main(String[] args)
        {
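
The missing key could later be fixed on the producer side: a minimal sketch,
assuming the inbound KafkaMessageSource stores the consumed record's key in
the kafka_receivedMessageKey header (KafkaHeaders.RECEIVED_MESSAGE_KEY in
this spring-kafka generation):

    import org.springframework.expression.spel.standard.SpelExpressionParser;

    @Bean
    @ServiceActivator(inputChannel = "words")
    MessageHandler handler(
        KafkaTemplate<String, String> kafkaTemplate,
        SplitterApplicationProperties properties)
    {
      KafkaProducerMessageHandler<String, String> handler =
          new KafkaProducerMessageHandler<>(kafkaTemplate);
      handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
      // Assumption: forward the received key, taken from the inbound
      // message headers, as the key of the produced record
      handler.setMessageKeyExpression(
          new SpelExpressionParser().parseExpression("headers['kafka_receivedMessageKey']"));
      return handler;
    }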
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
index f699f32..4881ea5 100644
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
@@ -13,9 +13,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 @ToString
 public class SplitterApplicationProperties
 {
-  private String bootstrapServer = "localhost:9092";
-  private String groupId = "splitter";
   private String inputTopic = "recordings";
   private String outputTopic = "words";
-  private int commitInterval = 1000;
 }
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
deleted file mode 100644
index 0eafbda..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.core.task.TaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import javax.annotation.PreDestroy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-
-@Component
-@Slf4j
-public class SplitterStreamProcessor implements Runnable
-{
-  private final MessageSplitter splitter;
-  private final String inputTopic;
-  private final String outputTopic;
-  private final KafkaConsumer<String, String> consumer;
-  private final KafkaProducer<String, String> producer;
-  private final Clock clock;
-  private final int commitInterval;
-  private final Lock running = new ReentrantLock();
-
-  private boolean stopped = false;
-  private long[] offsets;
-  private Optional<Integer>[] leaderEpochs;
-  private long lastCommit;
-
-  public SplitterStreamProcessor(
-      MessageSplitter splitter,
-      SplitterApplicationProperties properties,
-      Clock clock,
-      TaskExecutor executor)
-  {
-    this.splitter = splitter;
-
-    this.inputTopic = properties.getInputTopic();
-    this.outputTopic = properties.getOutputTopic();
-
-    this.clock = clock;
-    this.commitInterval = properties.getCommitInterval();
-
-    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-    Properties props;
-
-    props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    consumer = new KafkaConsumer<>(props);
-
-    props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    producer = new KafkaProducer<>(props);
-
-    executor.execute(this);
-  }
-
-  public void run()
-  {
-    running.lock();
-
-    try
-    {
-      log.info("Initializing transaction");
-      producer.initTransactions();
-
-      log.info("Subscribing to topic {}", inputTopic);
-      consumer.subscribe(
-          Arrays.asList(inputTopic),
-          new TransactionalConsumerRebalanceListener());
-
-      while (!stopped)
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
-        records.forEach(inputRecord ->
-        {
-          log.debug(
-              "Received a recording of {}, partition={}, offset={}, epoch={}",
-              inputRecord.key(),
-              inputRecord.partition(),
-              inputRecord.offset(),
-              inputRecord.leaderEpoch());
-
-          offsets[inputRecord.partition()] = inputRecord.offset();
-          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
-
-          String[] words = splitter.split(inputRecord.value());
-          for (int i = 0; i < words.length; i++)
-          {
-            ProducerRecord<String, String> outputRecord =
-                new ProducerRecord<>(
-                    outputTopic,
-                    inputRecord.key(),
-                    words[i].trim());
-
-            producer.send(outputRecord, (metadata, exception) ->
-            {
-              if (exception == null)
-              {
-                // HANDLE SUCCESS
-                log.debug(
-                    "Sent {}={}, partition={}, offset={}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    metadata.partition(),
-                    metadata.offset());
-              }
-              else
-              {
-                // HANDLE ERROR
-                log.error(
-                    "Could not send {}={}: {}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    exception.toString());
-              }
-            });
-          }
-
-          long delta = clock.millis() - lastCommit;
-          if (delta > commitInterval)
-          {
-            log.info("Elapsed time since last commit: {}ms", delta);
-            commitTransaction();
-            beginTransaction();
-          }
-        });
-      }
-    }
-    catch (WakeupException e)
-    {
-      log.info("Waking up from exception!");
-      // Not necessary, since consumer.close() triggers onPartitionsRevoked()!
-      // commitTransaction();
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception!", e);
-      producer.abortTransaction();
-    }
-    finally
-    {
-      try
-      {
-        log.info("Closing consumer");
-        consumer.close();
-        log.info("Closing producer");
-        producer.close();
-        log.info("Exiting!");
-      }
-      finally
-      {
-        running.unlock();
-      }
-    }
-  }
-
-  private void beginTransaction()
-  {
-    log.info("Beginning new transaction");
-    lastCommit = clock.millis();
-    producer.beginTransaction();
-  }
-
-  private void commitTransaction()
-  {
-    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
-    for (int i = 0; i < offsets.length; i++)
-    {
-      if (offsets[i] > 0)
-      {
-        offsetsToSend.put(
-            new TopicPartition(inputTopic, i),
-            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
-      }
-    }
-    producer.sendOffsetsToTransaction(
-        offsetsToSend,
-        consumer.groupMetadata());
-    log.info("Committing transaction");
-    producer.commitTransaction();
-  }
-
-  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
-  {
-    @Override
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-    {
-      log.info("Assigned partitions: {}", toString(partitions));
-
-      // Compute the length of an array that can be used to store the offsets
-      // (We can ignore the topic, since we only read from a single one!)
-      int length =
-          partitions
-              .stream()
-              .reduce(
-                  0,
-                  (i, v) -> i < v.partition() ? v.partition() : i,
-                  (l, r) -> l < r ? r : l) + 1;
-      offsets = new long[length];
-      leaderEpochs = new Optional[length];
-
-      beginTransaction();
-    }
-
-    @Override
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-    {
-      log.info("Revoked partitions: {}", toString(partitions));
-      commitTransaction();
-    }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions)
-    {
-      log.info("Lost partitions: {}", toString(partitions));
-      producer.abortTransaction();
-    }
-
-    String toString(Collection<TopicPartition> partitions)
-    {
-      return
-          partitions
-              .stream()
-              .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", "));
-    }
-  }
-
-  @PreDestroy
-  public void stop()
-  {
-    log.info("Shutdown requested...");
-    if (stopped)
-    {
-      log.warn("Ignoring request: already stopped!");
-      return;
-    }
-    stopped = true;
-    consumer.wakeup();
-    running.lock();
-    log.info("Shutdown completed!");
-  }
-}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 0d2624f..94e7492 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,3 +1,10 @@
 server.port=8086
 management.endpoints.web.exposure.include=*
 logging.level.de.juplo.kafka.wordcount.splitter=DEBUG
+spring.kafka.consumer.auto-offset-reset=earliest
+spring.kafka.consumer.group-id=splitter
+spring.kafka.bootstrap-servers=localhost:9092
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
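
Transaction management, the other open point, could later start from Spring
Boot's standard Kafka properties; a sketch of one possible direction (not
part of this commit, and the integration flow would still have to be wired
into these transactions explicitly):

    spring.kafka.producer.transaction-id-prefix=splitter-
    spring.kafka.consumer.isolation-level=read_committed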
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
index 775d3bd..113501f 100644
--- a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
@@ -39,8 +39,6 @@ class ApplicationTests
        final static String TOPIC_IN = "in";
        final static String TOPIC_OUT = "out";
 
-       @Autowired
-       SplitterStreamProcessor splitter;
        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;
        @Autowired
@@ -57,7 +55,6 @@ class ApplicationTests
        @Test
        void contextLoads()
        {
-               splitter.stop();
        }
 
        @Test