exit
fi
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka cli mongo express
if [[
$(docker image ls -q $IMAGE) == "" ||
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d producer
-docker-compose up consumer &
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
-sleep 5
-docker-compose exec -T cli bash << 'EOF'
-echo "Writing poison pill into topic test..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
-# end::poisonpill[]
-EOF
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
-http -v :8081/actuator/health
-echo "Restarting consumer"
-http -v post :8081/start
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
-http -v :8081/actuator/health
-http -v post :8081/actuator/shutdown
-docker-compose stop producer
+docker-compose up -d producer peter beate
+
+sleep 15
+
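+# Test scenario (ports as mapped in docker-compose.yml below: peter -> 8081,
+# beate -> 8082): stop beate gracefully, let peter continue alone, then kill
+# peter hard with SIGKILL and restart beate, which has to take over all
+# partitions and resume from the offsets stored in MongoDB.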
+http -v post :8082/stop
+sleep 10
+docker-compose kill -s 9 peter
+http -v post :8082/start
+sleep 60
+
+docker-compose stop producer peter beate
+docker-compose logs beate
+docker-compose logs --tail=10 peter
depends_on:
- zookeeper
+ mongo:
+ image: mongo:4.4.13
+ ports:
+ - 27017:27017
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: juplo
+ MONGO_INITDB_ROOT_PASSWORD: training
+
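+ # Web UI for inspecting the statistics and offsets stored in MongoDB
+ # (reachable on http://localhost:8090)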
+ express:
+ image: mongo-express
+ ports:
+ - 8090:8081
+ environment:
+ ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
+ ME_CONFIG_MONGODB_ADMINPASSWORD: training
+ ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+ depends_on:
+ - mongo
+
setup:
image: juplo/toolbox
command: >
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.throttle-ms: 200
+ producer.throttle-ms: 500
- consumer:
+ peter:
image: juplo/endless-consumer:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
server.port: 8080
consumer.bootstrap-server: kafka:9092
consumer.group-id: my-group
- consumer.client-id: consumer
+ consumer.client-id: peter
+ consumer.topic: test
+ spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+ spring.data.mongodb.database: juplo
+
+ beate:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
+ ports:
+ - 8082:8080
+ environment:
+ server.port: 8080
+ consumer.bootstrap-server: kafka:9092
+ consumer.group-id: my-group
+ consumer.client-id: beate
consumer.topic: test
+ spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+ spring.data.mongodb.database: juplo
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-mongodb</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.PreDestroy;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
}
@PreDestroy
- public void stopExecutor()
+ public void shutdown()
{
+ try
+ {
+ log.info("Stopping EndlessConsumer");
+ endlessConsumer.stop();
+ }
+ catch (IllegalStateException e)
+ {
+ log.info("Was already stopped: {}", e.toString());
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception while stopping EndlessConsumer", e);
+ }
+
try
{
log.info("Shutting down the ExecutorService.");
}
catch (InterruptedException e)
{
- log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+ log.error("Exception while waiting for the termination of the ExecutorService", e);
}
finally
{
package de.juplo.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.time.Clock;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Consumer;
@Configuration
public class ApplicationConfiguration
{
@Bean
- public Consumer<ConsumerRecord<String, Long>> consumer()
+ public KeyCountingRecordHandler keyCountingRecordHandler()
{
- return (record) ->
- {
- // Handle record
- };
+ return new KeyCountingRecordHandler();
+ }
+
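+ // Persists the counted statistics together with the consumer offsets in
+ // MongoDB and restores both when partitions are assigned.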
+ @Bean
+ public KeyCountingRebalanceListener keyCountingRebalanceListener(
+ KeyCountingRecordHandler keyCountingRecordHandler,
+ PartitionStatisticsRepository repository,
+ Consumer<String, Long> consumer,
+ ApplicationProperties properties)
+ {
+ return new KeyCountingRebalanceListener(
+ keyCountingRecordHandler,
+ repository,
+ properties.getClientId(),
+ properties.getTopic(),
+ Clock.systemDefaultZone(),
+ properties.getCommitInterval(),
+ consumer);
}
@Bean
public EndlessConsumer<String, Long> endlessConsumer(
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- Consumer<ConsumerRecord<String, Long>> handler,
+ KeyCountingRebalanceListener keyCountingRebalanceListener,
+ KeyCountingRecordHandler keyCountingRecordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- handler);
+ keyCountingRebalanceListener,
+ keyCountingRecordHandler);
}
@Bean
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("group.id", properties.getGroupId());
props.put("client.id", properties.getClientId());
+ props.put("enable.auto.commit", false);
props.put("auto.offset.reset", properties.getAutoOffsetReset());
props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
props.put("metadata.max.age.ms", "1000");
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class DriverController
{
private final EndlessConsumer consumer;
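+ // The per-key statistics are queried from the record handler.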
+ private final KeyCountingRecordHandler keyCountingRecordHandler;
@PostMapping("start")
@GetMapping("seen")
public Map<Integer, Map<String, Long>> seen()
{
- return consumer.getSeen();
+ return keyCountingRecordHandler.getSeen();
}
@Slf4j
@RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements Runnable
{
private final ExecutorService executor;
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+ private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
+ private final RecordHandler<K, V> handler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Exception exception;
private long consumed = 0;
- private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
- private final Map<Integer, Long> offsets = new HashMap<>();
-
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long newOffset = consumer.position(tp);
- Long oldOffset = offsets.remove(partition);
- log.info(
- "{} - removing partition: {}, consumed {} records (offset {} -> {})",
- id,
- partition,
- newOffset - oldOffset,
- oldOffset,
- newOffset);
- Map<String, Long> removed = seen.remove(partition);
- for (String key : removed.keySet())
- {
- log.info(
- "{} - Seen {} messages for partition={}|key={}",
- id,
- removed.get(key),
- partition,
- key);
- }
- });
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long offset = consumer.position(tp);
- log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- offsets.put(partition, offset);
- seen.put(partition, new HashMap<>());
- });
- }
@Override
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), this);
+ consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
while (true)
{
handler.accept(record);
consumed++;
-
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
}
+
+ pollIntervalAwareRebalanceListener.beforeNextPoll();
}
}
catch(WakeupException e)
{
log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
- consumer.commitSync();
shutdown();
}
catch(RecordDeserializationException e)
offset,
e.getCause().toString());
- consumer.commitSync();
shutdown(e);
}
catch(Exception e)
}
}
- public Map<Integer, Map<String, Long>> getSeen()
- {
- return seen;
- }
-
public void start()
{
lock.lock();
}
}
- public synchronized void stop() throws ExecutionException, InterruptedException
+ public synchronized void stop() throws InterruptedException
{
lock.lock();
try
public void destroy() throws ExecutionException, InterruptedException
{
log.info("{} - Destroy!", id);
- try
- {
- stop();
- }
- catch (IllegalStateException e)
- {
- log.info("{} - Was already stopped", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
- }
- finally
- {
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
public boolean running()
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
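+/**
+ * Persists the per-partition counting statistics together with the offset
+ * of the next message in MongoDB and restores both when partitions are
+ * assigned.
+ */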
+@RequiredArgsConstructor
+@Slf4j
+public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+ private final KeyCountingRecordHandler handler;
+ private final PartitionStatisticsRepository repository;
+ private final String id;
+ private final String topic;
+ private final Clock clock;
+ private final Duration commitInterval;
+ private final Consumer<String, Long> consumer;
+
+ private Instant lastCommit = Instant.EPOCH;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long offset = consumer.position(tp);
+ log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+ StatisticsDocument document =
+ repository
+ .findById(Integer.toString(partition))
+ .orElse(new StatisticsDocument(partition));
+ if (document.offset >= 0)
+ {
+ // Only seek, if a stored offset was found
+ // Otherwise: Use initial offset, generated by Kafka
+ consumer.seek(tp, document.offset);
+ }
+ handler.addPartition(partition, document.statistics);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long newOffset = consumer.position(tp);
+ log.info(
+ "{} - removing partition: {}, offset of next message: {}",
+ id,
+ partition,
+ newOffset);
+ Map<String, Long> removed = handler.removePartition(partition);
+ repository.save(new StatisticsDocument(partition, removed, newOffset));
+ });
+ }
+
+
+ @Override
+ public void beforeNextPoll()
+ {
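+ // Called from the poll-loop after each processed batch: store the
+ // statistics and offsets at most once per commit interval.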
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ handler.getSeen().forEach((partition, statistics) -> repository.save(
+ new StatisticsDocument(
+ partition,
+ statistics,
+ consumer.position(new TopicPartition(topic, partition)))));
+ lastCommit = clock.instant();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class KeyCountingRecordHandler implements RecordHandler<String, Long>
+{
+ private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+
+
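+ // Counts the consumed records per partition and key.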
+ @Override
+ public void accept(ConsumerRecord<String, Long> record)
+ {
+ Integer partition = record.partition();
+ String key = record.key() == null ? "NULL" : record.key().toString();
+ Map<String, Long> byKey = seen.get(partition);
+
+ if (!byKey.containsKey(key))
+ byKey.put(key, 0L);
+
+ long seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
+ }
+
+ public void addPartition(Integer partition, Map<String, Long> statistics)
+ {
+ seen.put(partition, statistics);
+ }
+
+ public Map<String, Long> removePartition(Integer partition)
+ {
+ return seen.remove(partition);
+ }
+
+
+ public Map<Integer, Map<String, Long>> getSeen()
+ {
+ return seen;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
+{
+ public Optional<StatisticsDocument> findById(String partition);
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
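+/**
+ * A ConsumerRebalanceListener with an additional hook that the poll-loop
+ * calls after each processed batch of records.
+ */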
+public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+ default void beforeNextPoll() {}
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.function.Consumer;
+
+
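+/**
+ * Strategy interface for the processing of the consumed records.
+ */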
+public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+{
+ default void beforeNextPoll() {}
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
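+/**
+ * One document per partition: the counted keys plus the offset of the
+ * next message to consume; -1 means that no offset was stored yet.
+ */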
+@Document(collection = "statistics")
+@ToString
+public class StatisticsDocument
+{
+ @Id
+ public String id;
+ public long offset = -1L;
+ public Map<String, Long> statistics;
+
+ public StatisticsDocument()
+ {
+ }
+
+ public StatisticsDocument(Integer partition)
+ {
+ this.id = Integer.toString(partition);
+ this.statistics = new HashMap<>();
+ }
+
+ public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+ {
+ this.id = Integer.toString(partition);
+ this.statistics = statistics;
+ this.offset = offset;
+ }
+}
group-id: ${consumer.group-id}
topic: ${consumer.topic}
auto-offset-reset: ${consumer.auto-offset-reset}
+spring:
+ data:
+ mongodb:
+ uri: mongodb://juplo:training@localhost:27017
+ database: juplo
logging:
level:
root: INFO
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
properties = {
"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
"consumer.topic=" + TOPIC,
- "consumer.commit-interval=1s" })
+ "consumer.commit-interval=1s",
+ "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
@Slf4j
class ApplicationTests
{
@Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
+ PartitionStatisticsRepository partitionStatisticsRepository;
+ @Autowired
ApplicationProperties properties;
@Autowired
ExecutorService executor;
+ @Autowired
+ KeyCountingRebalanceListener keyCountingRebalanceListener;
+ @Autowired
+ KeyCountingRecordHandler keyCountingRecordHandler;
- Consumer<ConsumerRecord<String, Long>> testHandler;
EndlessConsumer<String, Long> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
void seekToEnd()
{
offsetConsumer.assign(partitions());
- offsetConsumer.seekToEnd(partitions());
partitions().forEach(tp ->
{
- // seekToEnd() works lazily: it only takes effect on poll()/position()
Long offset = offsetConsumer.position(tp);
log.info("New position for {}: {}", tp, offset);
+ Integer partition = tp.partition();
+ StatisticsDocument document =
+ partitionStatisticsRepository
+ .findById(partition.toString())
+ .orElse(new StatisticsDocument(partition));
+ document.offset = offset;
+ partitionStatisticsRepository.save(document);
});
- // The new positions must be commited!
- offsetConsumer.commitSync();
offsetConsumer.unsubscribe();
}
void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
{
- offsetConsumer.assign(partitions());
- partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
- offsetConsumer.unsubscribe();
- }
+ partitions().forEach(tp ->
+ {
+ String partition = Integer.toString(tp.partition());
+ Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+ consumer.accept(tp, offset.orElse(0L));
+ });
+ }
List<TopicPartition> partitions()
{
@BeforeEach
public void init()
{
- testHandler = record -> {} ;
-
seekToEnd();
oldOffsets = new HashMap<>();
newOffsets.put(tp, offset - 1);
});
- Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
- record ->
- {
- newOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
- testHandler.accept(record);
+ TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+ @Override
+ public void onNewRecord(ConsumerRecord<String, Long> record)
+ {
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ }
};
endlessConsumer =
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
+ keyCountingRebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", "OFFSET-CONSUMER");
- props.put("group.id", properties.getGroupId());
+ props.put("enable.auto.commit", false);
+ props.put("auto.offset.reset", "latest");
props.put("key.deserializer", BytesDeserializer.class.getName());
props.put("value.deserializer", BytesDeserializer.class.getName());
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+
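+/**
+ * Wraps a RecordHandler, so that tests can observe every record before it
+ * is handed on to the wrapped handler.
+ */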
+@RequiredArgsConstructor
+public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+{
+ private final RecordHandler<K, V> handler;
+
+
+ public abstract void onNewRecord(ConsumerRecord<K, V> record);
+
+
+ @Override
+ public void accept(ConsumerRecord<K, V> record)
+ {
+ this.onNewRecord(record);
+ handler.accept(record);
+ }
+
+ @Override
+ public void beforeNextPoll()
+ {
+ handler.beforeNextPoll();
+ }
+}