FROM openjdk:11-jre-slim
COPY target/*.jar /opt/app.jar
-EXPOSE 8081
+EXPOSE 8086
ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
CMD []
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
- <artifactId>kafka-splitter</artifactId>
- <version>1.0.0</version>
+ <artifactId>splitter</artifactId>
+ <version>1.0.0-vanilla-kafka</version>
<name>Wordcount-Splitter</name>
- <description>Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words</description>
+ <description>A version of the stream-processor for the multi-user wordcount-example that splits sentences into single words, implemented in vanilla Kafka (without Kafka Streams)</description>
<properties>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
<java.version>11</java.version>
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.time.Clock;
-import java.util.Properties;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(SplitterApplicationProperties.class)
-public class SplitterApplication
-{
- @Bean(destroyMethod = "close")
- KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
- {
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaConsumer<>(props);
- }
-
- @Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
- {
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaProducer<>(props);
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
-
-
- public static void main(String[] args)
- {
- SpringApplication.run(SplitterApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.splitter")
-@Getter
-@Setter
-@ToString
-public class SplitterApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String groupId = "splitter";
- private String inputTopic = "recordings";
- private String outputTopic = "words";
- private int commitInterval = 1000;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.*;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-
-@Component
-@Slf4j
-public class SplitterStreamProcessor implements ApplicationRunner
-{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
- private final String inputTopic;
- private final String outputTopic;
- private final KafkaConsumer<String, String> consumer;
- private final KafkaProducer<String, String> producer;
- private final Clock clock;
- private final int commitInterval;
-
- private boolean stopped = false;
- private long[] offsets;
- private Optional<Integer>[] leaderEpochs;
- private long lastCommit;
-
- public SplitterStreamProcessor(
- SplitterApplicationProperties properties,
- KafkaConsumer<String, String> consumer,
- KafkaProducer<String,String> producer,
- Clock clock)
- {
- this.inputTopic = properties.getInputTopic();
- this.outputTopic = properties.getOutputTopic();
-
- this.consumer = consumer;
- this.producer = producer;
-
- this.clock = clock;
- this.commitInterval = properties.getCommitInterval();
- }
-
- @Override
- public void run(ApplicationArguments args)
- {
- log.info("Initializing transaction");
- producer.initTransactions();
-
- try
- {
- log.info("Subscribing to topic {}", inputTopic);
- consumer.subscribe(
- Arrays.asList(inputTopic),
- new TransactionalConsumerRebalanceListener());
-
- while (!stopped)
- {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
- records.forEach(inputRecord ->
- {
- offsets[inputRecord.partition()] = inputRecord.offset();
- leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
-
- String[] words = PATTERN.split(inputRecord.value());
- for (int i = 0; i < words.length; i++)
- {
- ProducerRecord<String, String> outputRecord =
- new ProducerRecord<>(
- outputTopic,
- inputRecord.key(),
- words[i].trim());
-
- producer.send(outputRecord, (metadata, exception) ->
- {
- if (exception == null)
- {
- // HANDLE SUCCESS
- log.info(
- "sent {}={}, partition={}, offset={}",
- outputRecord.key(),
- outputRecord.value(),
- metadata.partition(),
- metadata.offset());
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "could not send {}={}: {}",
- outputRecord.key(),
- outputRecord.value(),
- exception.toString());
- }
- });
- }
- });
-
- long delta = lastCommit - clock.millis();
- if (delta > commitInterval)
- {
- log.info("Elapsed time since last commit: {}ms", delta);
- commitTransaction();
- beginTransaction();
- }
- }
- }
- catch (WakeupException e)
- {
- log.info("Waking up from exception!", e);
- commitTransaction();
- }
- catch (Exception e)
- {
- log.error("Unexpected exception!", e);
- producer.abortTransaction();
- }
- finally
- {
- log.info("Closing consumer");
- consumer.close();
- log.info("Closing producer");
- producer.close();
- log.info("Exiting!");
- }
- }
-
- private void beginTransaction()
- {
- log.info("Beginning new transaction");
- lastCommit = clock.millis();
- producer.beginTransaction();
- }
-
- private void commitTransaction()
- {
- Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
- for (int i = 0; i < offsets.length; i++)
- {
- if (offsets[i] > 0)
- {
- offsetsToSend.put(
- new TopicPartition(inputTopic, i),
- new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
- }
- }
- producer.sendOffsetsToTransaction(
- offsetsToSend,
- consumer.groupMetadata());log.info("Committing transaction");
- producer.commitTransaction();
- }
-
- class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
- {
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- log.info("Assigned partitions: {}", toString(partitions));
-
- // Compote the length of an array, that can be used to store the offsets
- // (We can ignore the topic, since we only read from a single one!)
- int length =
- partitions
- .stream()
- .reduce(
- 0,
- (i, v) -> i < v.partition() ? v.partition() : i,
- (l, r) -> l < r ? r : l) + 1;
- offsets = new long[length];
- leaderEpochs = new Optional[length];
-
- beginTransaction();
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- log.info("Revoked partitions: {}", toString(partitions));
- commitTransaction();
- }
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions)
- {
- log.info("Lost partitions: {}", toString(partitions));
- producer.abortTransaction();
- }
-
- String toString(Collection<TopicPartition> partitions)
- {
- return
- partitions
- .stream()
- .map(tp -> tp.topic() + "-" + tp.partition())
- .collect(Collectors.joining(", "));
- }
- }
-
- @PreDestroy
- public void stop()
- {
- log.info("Stopping Consumer");
- stopped = true;
- consumer.wakeup();
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.time.Clock;
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(SplitterApplicationProperties.class)
+public class SplitterApplication
+{
+ @Bean(destroyMethod = "close")
+ KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
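+ // Auto-commit is disabled: offsets are committed through the transactional producer instead (see SplitterStreamProcessor)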
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new KafkaConsumer<>(props);
+ }
+
+ @Bean(destroyMethod = "close")
+ KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
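+ // Setting a transactional.id switches the producer into transactional mode; a second producer with the same fixed ID fences this one off, so only one instance may run at a time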
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new KafkaProducer<>(props);
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(SplitterApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.splitter")
+@Getter
+@Setter
+@ToString
+public class SplitterApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String groupId = "splitter";
+ private String inputTopic = "recordings";
+ private String outputTopic = "words";
+ private int commitInterval = 1000; // milliseconds between transaction commits
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
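+ // Matches any run of non-word characters; used to split each sentence into single words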
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+ private final String inputTopic;
+ private final String outputTopic;
+ private final KafkaConsumer<String, String> consumer;
+ private final KafkaProducer<String, String> producer;
+ private final Clock clock;
+ private final int commitInterval;
+
+ private boolean stopped = false;
+ private long[] offsets;
+ private Optional<Integer>[] leaderEpochs;
+ private long lastCommit;
+
+ public SplitterStreamProcessor(
+ SplitterApplicationProperties properties,
+ KafkaConsumer<String, String> consumer,
+ KafkaProducer<String,String> producer,
+ Clock clock)
+ {
+ this.inputTopic = properties.getInputTopic();
+ this.outputTopic = properties.getOutputTopic();
+
+ this.consumer = consumer;
+ this.producer = producer;
+
+ this.clock = clock;
+ this.commitInterval = properties.getCommitInterval();
+ }
+
+ @Override
+ public void run(ApplicationArguments args)
+ {
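+ // initTransactions() registers the transactional.id with the transaction coordinator and aborts any transaction a previous instance with the same ID left open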
+ log.info("Initializing transaction");
+ producer.initTransactions();
+
+ try
+ {
+ log.info("Subscribing to topic {}", inputTopic);
+ consumer.subscribe(
+ Arrays.asList(inputTopic),
+ new TransactionalConsumerRebalanceListener());
+
+ while (!stopped)
+ {
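+ // poll() blocks for up to one second; consumer.wakeup(), called from stop(), makes it throw a WakeupException and thereby ends the loop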
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+ records.forEach(inputRecord ->
+ {
+ // Record the offset of the *next* record to read: committed offsets denote the next position, not the last processed one
+ offsets[inputRecord.partition()] = inputRecord.offset() + 1;
+ leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+ String[] words = PATTERN.split(inputRecord.value());
+ for (int i = 0; i < words.length; i++)
+ {
+ ProducerRecord<String, String> outputRecord =
+ new ProducerRecord<>(
+ outputTopic,
+ inputRecord.key(),
+ words[i].trim());
+
+ producer.send(outputRecord, (metadata, exception) ->
+ {
+ if (exception == null)
+ {
+ // Success: log the partition and offset the word was written to
+ log.info(
+ "sent {}={}, partition={}, offset={}",
+ outputRecord.key(),
+ outputRecord.value(),
+ metadata.partition(),
+ metadata.offset());
+ }
+ else
+ {
+ // Error: log the failure; a failed transactional send puts the producer into an error state, so the commit will fail
+ log.error(
+ "could not send {}={}: {}",
+ outputRecord.key(),
+ outputRecord.value(),
+ exception.toString());
+ }
+ });
+ }
+ });
+
+ long delta = clock.millis() - lastCommit;
+ // lastCommit > 0 guards against committing before the first transaction was begun (lastCommit is only set in beginTransaction())
+ if (lastCommit > 0 && delta > commitInterval)
+ {
+ log.info("Elapsed time since last commit: {}ms", delta);
+ commitTransaction();
+ beginTransaction();
+ }
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Waking up from exception!", e);
+ commitTransaction();
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception!", e);
+ producer.abortTransaction();
+ }
+ finally
+ {
+ log.info("Closing consumer");
+ consumer.close();
+ log.info("Closing producer");
+ producer.close();
+ log.info("Exiting!");
+ }
+ }
+
+ private void beginTransaction()
+ {
+ log.info("Beginning new transaction");
+ lastCommit = clock.millis();
+ producer.beginTransaction();
+ }
+
+ private void commitTransaction()
+ {
+ Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+ for (int i = 0; i < offsets.length; i++)
+ {
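+ // offsets[i] > 0 means that at least one record has been consumed from partition i since the assignment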
+ if (offsets[i] > 0)
+ {
+ offsetsToSend.put(
+ new TopicPartition(inputTopic, i),
+ new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+ }
+ }
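+ // Sending the consumed offsets through the producer makes the offset commit part of the transaction: output records and input offsets become visible atomically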
+ producer.sendOffsetsToTransaction(
+ offsetsToSend,
+ consumer.groupMetadata());
+ log.info("Committing transaction");
+ producer.commitTransaction();
+ }
+
+ class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+ {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("Assigned partitions: {}", toString(partitions));
+
+ // Compute the length of an array that can be used to store the offsets
+ // (We can ignore the topic, since we only read from a single one!)
+ int length =
+ partitions
+ .stream()
+ .mapToInt(TopicPartition::partition)
+ .max()
+ .orElse(-1) + 1;
+ offsets = new long[length];
+ leaderEpochs = new Optional[length];
+
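+ // A transaction has to be open before the first record from the newly assigned partitions is processed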
+ beginTransaction();
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ log.info("Revoked partitions: {}", toString(partitions));
+ commitTransaction();
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.info("Lost partitions: {}", toString(partitions));
+ producer.abortTransaction();
+ }
+
+ String toString(Collection<TopicPartition> partitions)
+ {
+ return
+ partitions
+ .stream()
+ .map(tp -> tp.topic() + "-" + tp.partition())
+ .collect(Collectors.joining(", "));
+ }
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ log.info("Stopping Consumer");
+ stopped = true;
+ consumer.wakeup();
+ }
+}
-server.port=8081
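+# Must match the port exposed in the Dockerfile (EXPOSE 8086)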
+server.port=8086
management.endpoints.web.exposure.include=*
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class ApplicationTests
-{
- @Test
- void contextLoads()
- {
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ApplicationTests
+{
+ @Test
+ void contextLoads()
+ {
+ }
+}