@EnableConfigurationProperties(SplitterApplicationProperties.class)
public class SplitterApplication
{
- @Bean
- KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
- {
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- return new KafkaConsumer<>(props);
- }
-
- @Bean
- KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
- {
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaProducer<>(props);
- }
-
@Bean
Clock clock()
{
package de.juplo.kafka.wordcount.splitter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Duration;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
private final KafkaProducer<String, String> producer;
private final Clock clock;
private final int commitInterval;
+ private final Lock running = new ReentrantLock();
private boolean stopped = false;
private long[] offsets;
public SplitterStreamProcessor(
SplitterApplicationProperties properties,
- KafkaConsumer<String, String> consumer,
- KafkaProducer<String,String> producer,
Clock clock)
{
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
- this.consumer = consumer;
- this.producer = producer;
-
this.clock = clock;
this.commitInterval = properties.getCommitInterval();
+
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+ Properties props;
+
+ props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumer = new KafkaConsumer<>(props);
+
+ props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producer = new KafkaProducer<>(props);
}
@Override
public void run(ApplicationArguments args)
{
- log.info("Initializing transaction");
- producer.initTransactions();
+ running.lock();
try
{
+ log.info("Initializing transaction");
+ producer.initTransactions();
+
log.info("Subscribing to topic {}", inputTopic);
consumer.subscribe(
Arrays.asList(inputTopic),
}
finally
{
- log.info("Closing consumer");
- consumer.close();
- log.info("Closing producer");
- producer.close();
- log.info("Exiting!");
+ try
+ {
+ log.info("Closing consumer");
+ consumer.close();
+ log.info("Closing producer");
+ producer.close();
+ log.info("Exiting!");
+ }
+ finally
+ {
+ running.unlock();
+ }
}
}
}
producer.sendOffsetsToTransaction(
offsetsToSend,
- consumer.groupMetadata());log.info("Committing transaction");
+ consumer.groupMetadata());
+ log.info("Committing transaction");
producer.commitTransaction();
}
@PreDestroy
public void stop()
{
- log.info("Stopping Consumer");
+ log.info("Shutdown requested...");
stopped = true;
consumer.wakeup();
+ running.lock();
+ log.info("Shutdown completed!");
}
}