#!/bin/bash
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
exit
fi
-docker-compose up -d zookeeper kafka cli
+docker-compose rm -svf adder-1 adder-2
+docker-compose rm -svf mongo
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express
if [[
$(docker image ls -q $IMAGE) == "" ||
"$1" = "build"
]]
then
- mvn install || exit
+ docker-compose rm -svf adder-1 adder-2
+ mvn -D skipTests clean install || exit
else
echo "Using image existing images:"
docker image ls $IMAGE
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d
+docker-compose up -d gateway requests-1 requests-2 adder-1 adder-2
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
-while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
-while ! [[ $(http 0:8083/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-3..."; sleep 1; done
-while ! [[ $(http 0:8084/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-4..."; sleep 1; done
-while ! [[ $(http 0:8085/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-5..."; sleep 1; done
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done
+while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
-sleep 5
+docker-compose up -d peter klaus
-docker-compose exec -T cli bash << 'EOF'
-echo "Writing poison pill into topic test..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
-# end::poisonpill[]
-EOF
+while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+http -v --pretty none -S :8091/results
+echo
+while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
+http -v --pretty none -S :8092/results
+echo
+sleep 3
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-1 is still running..."; sleep 1; done
-while [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-2 is still running..."; sleep 1; done
-while [[ $(http 0:8083/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-3 is still running..."; sleep 1; done
-while [[ $(http 0:8084/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-4 is still running..."; sleep 1; done
-while [[ $(http 0:8085/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-5 is still running..."; sleep 1; done
+docker-compose stop adder-1
+sleep 1
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
-http -v :8081/actuator/health
-echo "Restarting consumer-1"
-http -v post :8081/start
+docker-compose stop adder-2
+docker-compose start adder-1
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
-echo "Waiting for consumer-1 to come up"
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
-http -v :8081/actuator/health
+docker-compose start adder-2
+while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
+while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
-echo "Waiting for consumer-1 to crash"
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-1 is still running..."; sleep 1; done
-http -v :8081/actuator/health
+docker-compose kill -s 9 adder-1
-docker-compose stop producer
-docker-compose logs --tail=10 consumer-1
-docker-compose logs --tail=10 consumer-2
-docker-compose logs --tail=10 consumer-3
-docker-compose logs --tail=10 consumer-4
-docker-compose logs --tail=10 consumer-5
+docker-compose start adder-1
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+docker-compose kill -s 9 peter klaus
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
ports:
- 2181:2181
- kafka:
+ kafka-1:
image: confluentinc/cp-kafka:7.1.3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+ KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ ports:
+ - 9081:9081
+ depends_on:
+ - zookeeper
+
+ kafka-2:
+ image: confluentinc/cp-kafka:7.1.3
+ environment:
+ KAFKA_BROKER_ID: 2
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
- KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
ports:
- 9092:9082
- 9082:9082
+ networks:
+ default:
+ aliases:
+ - kafka
+ depends_on:
+ - zookeeper
+
+ kafka-3:
+ image: confluentinc/cp-kafka:7.1.3
+ environment:
+ KAFKA_BROKER_ID: 3
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+ KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ ports:
+ - 9083:9083
depends_on:
- zookeeper
+ mongo:
+ image: mongo:4.4.13
+ ports:
+ - 27017:27017
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: juplo
+ MONGO_INITDB_ROOT_PASSWORD: training
+
+ express:
+ image: mongo-express
+ ports:
+ - 8090:8081
+ environment:
+ ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
+ ME_CONFIG_MONGODB_ADMINPASSWORD: training
+ ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+ depends_on:
+ - mongo
+
setup:
image: juplo/toolbox
command: >
bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
- kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
+ kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic in
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic out
"
cli:
image: juplo/toolbox
command: sleep infinity
- producer:
- image: juplo/endless-long-producer:1.0-SNAPSHOT
+ gateway:
+ image: juplo/sumup-gateway:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer
- producer.topic: test
- producer.throttle-ms: 200
-
+ sumup.gateway.bootstrap-server: kafka:9092
+ sumup.gateway.client-id: gateway
+ sumup.gateway.topic: in
- consumer-1:
- image: juplo/endless-consumer:1.0-SNAPSHOT
+ requests-1:
+ image: juplo/sumup-requests:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
server.port: 8080
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: consumer-1
+ sumup.requests.bootstrap-server: kafka:9092
+ sumup.requests.client-id: requests-1
- consumer-2:
- image: juplo/endless-consumer:1.0-SNAPSHOT
+ requests-2:
+ image: juplo/sumup-requests:1.0-SNAPSHOT
ports:
- 8082:8080
environment:
server.port: 8080
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: consumer-2
+ sumup.requests.bootstrap-server: kafka:9092
+ sumup.requests.client-id: requests-2
- consumer-3:
- image: juplo/endless-consumer:1.0-SNAPSHOT
+ adder-1:
+ image: juplo/sumup-adder:1.0-SNAPSHOT
ports:
- - 8083:8080
+ - 8091:8080
environment:
server.port: 8080
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: consumer-3
+ sumup.adder.bootstrap-server: kafka:9092
+ sumup.adder.client-id: adder-1
+ sumup.adder.commit-interval: 3s
+ sumup.adder.throttle: 3ms
+ spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+ spring.data.mongodb.database: juplo
+ logging.level.org.apache.kafka.clients.consumer: DEBUG
- consumer-4:
- image: juplo/endless-consumer:1.0-SNAPSHOT
+ adder-2:
+ image: juplo/sumup-adder:1.0-SNAPSHOT
ports:
- - 8084:8080
+ - 8092:8080
environment:
server.port: 8080
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: consumer-4
+ sumup.adder.bootstrap-server: kafka:9092
+ sumup.adder.client-id: adder-2
+ sumup.adder.commit-interval: 3s
+ sumup.adder.throttle: 3ms
+ spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+ spring.data.mongodb.database: juplo
+ logging.level.org.apache.kafka.clients.consumer: DEBUG
- consumer-5:
- image: juplo/endless-consumer:1.0-SNAPSHOT
- ports:
- - 8085:8080
- environment:
- server.port: 8080
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: consumer-5
+ peter:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ while [[ true ]];
+ do
+ echo 666 | http -v gateway:8080/peter;
+ sleep 1;
+ done
+ "
+ klaus:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ while [[ true ]];
+ do
+ echo 666 | http -v gateway:8080/klaus;
+ sleep 1;
+ done
+ "
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>endless-consumer</artifactId>
+ <artifactId>sumup-adder</artifactId>
<version>1.0-SNAPSHOT</version>
- <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
+ <name>SumUp Adder</name>
+ <description>Calculates the sum for the send messages</description>
<properties>
<java.version>11</java.version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-mongodb</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
--- /dev/null
+package de.juplo.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+public class AdderBusinessLogic
+{
+ private final Map<String, AdderResult> state;
+
+
+ public AdderBusinessLogic()
+ {
+ this(new HashMap<>());
+ }
+
+ public AdderBusinessLogic(Map<String, AdderResult> state)
+ {
+ this.state = state;
+ }
+
+
+ public synchronized Optional<Long> getSum(String user)
+ {
+ return Optional.ofNullable(state.get(user)).map(result -> result.sum);
+ }
+
+ public synchronized void addToSum(String user, Integer value)
+ {
+ if (value == null || value < 1)
+ throw new IllegalArgumentException("Not a positive number: " + value);
+
+ long sum =
+ Optional
+ .ofNullable(state.get(user))
+ .map(result -> result.sum)
+ .orElse(0l);
+ state.put(user, new AdderResult(value, sum + value));
+ }
+
+ public synchronized AdderResult calculate(String user)
+ {
+ if (!state.containsKey(user))
+ throw new IllegalStateException("No sumation for " + user + " in progress");
+
+ return state.remove(user);
+ }
+
+ protected Map<String, AdderResult> getState()
+ {
+ return state;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+public class AdderResult
+{
+ final int number;
+ final long sum;
+
+ @Override
+ public String toString()
+ {
+ return "sum(" + number + ") = " + sum;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class AdderResults
+{
+ private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
+
+
+ public void addResults(Integer partition, String user, AdderResult result)
+ {
+ Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
+
+ List<AdderResult> results = resultsByUser.get(user);
+ if (results == null)
+ {
+ results = new LinkedList<>();
+ resultsByUser.put(user, results);
+ }
+
+ results.add(result);
+ }
+
+ protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
+ {
+ this.results.put(partition, results);
+ }
+
+ protected Map<String, List<AdderResult>> removePartition(Integer partition)
+ {
+ return this.results.remove(partition);
+ }
+
+ public Map<Integer, Map<String, List<AdderResult>>> getState()
+ {
+ return results;
+ }
+
+ public Map<String, List<AdderResult>> getState(Integer partition)
+ {
+ return results.get(partition);
+ }
+}
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.PreDestroy;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
}
@PreDestroy
- public void stopExecutor()
+ public void shutdown()
{
+ try
+ {
+ log.info("Stopping EndlessConsumer");
+ endlessConsumer.stop();
+ }
+ catch (IllegalStateException e)
+ {
+ log.info("Was already stopped: {}", e.toString());
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
+ }
+
try
{
log.info("Shutting down the ExecutorService.");
}
catch (InterruptedException e)
{
- log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+ log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
}
finally
{
package de.juplo.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ApplicationConfiguration
{
@Bean
- public RecordHandler<String, Long> applicationRecordHandler()
+ public ApplicationRecordHandler recordHandler(
+ AdderResults adderResults,
+ ApplicationProperties properties)
+ {
+ return new ApplicationRecordHandler(
+ adderResults,
+ Optional.ofNullable(properties.getThrottle()),
+ properties.getClientId());
+ }
+
+ @Bean
+ public AdderResults adderResults()
+ {
+ return new AdderResults();
+ }
+
+ @Bean
+ public ApplicationRebalanceListener rebalanceListener(
+ ApplicationRecordHandler recordHandler,
+ AdderResults adderResults,
+ StateRepository stateRepository,
+ Consumer<String, String> consumer,
+ ApplicationProperties properties)
{
- return (record) ->
- {
- // Handle record
- };
+ return new ApplicationRebalanceListener(
+ recordHandler,
+ adderResults,
+ stateRepository,
+ properties.getClientId(),
+ consumer);
}
@Bean
- public EndlessConsumer<String, Long> endlessConsumer(
- KafkaConsumer<String, Long> kafkaConsumer,
+ public EndlessConsumer<String, String> endlessConsumer(
+ KafkaConsumer<String, String> kafkaConsumer,
ExecutorService executor,
- RecordHandler recordHandler,
+ ApplicationRebalanceListener rebalanceListener,
+ ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
+ rebalanceListener,
recordHandler);
}
}
@Bean(destroyMethod = "close")
- public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+ public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("group.id", properties.getGroupId());
props.put("client.id", properties.getClientId());
props.put("auto.offset.reset", properties.getAutoOffsetReset());
props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", LongDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
@RequiredArgsConstructor
public class ApplicationHealthIndicator implements HealthIndicator
{
- private final EndlessConsumer<String, Long> consumer;
+ private final EndlessConsumer<String, String> consumer;
@Override
import java.time.Duration;
-@ConfigurationProperties(prefix = "consumer")
+@ConfigurationProperties(prefix = "sumup.adder")
@Validated
@Getter
@Setter
private String autoOffsetReset;
@NotNull
private Duration commitInterval;
+ private Duration throttle;
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements RebalanceListener
+{
+ private final ApplicationRecordHandler recordHandler;
+ private final AdderResults adderResults;
+ private final StateRepository stateRepository;
+ private final String id;
+ private final Consumer consumer;
+
+ private final Set<Integer> partitions = new HashSet<>();
+
+ private boolean commitsEnabled = true;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ log.info("{} - adding partition: {}", id, partition);
+ this.partitions.add(partition);
+ StateDocument document =
+ stateRepository
+ .findById(Integer.toString(partition))
+ .orElse(new StateDocument(partition));
+ recordHandler.addPartition(partition, document.state);
+ for (String user : document.state.keySet())
+ {
+ log.info(
+ "{} - Restored state for partition={}|user={}: {}",
+ id,
+ partition,
+ user,
+ document.state.get(user));
+ }
+ adderResults.addPartition(partition, document.results);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ if (commitsEnabled)
+ {
+ log.info("{} - Commiting offsets for all previously assigned partitions", id);
+ try
+ {
+ consumer.commitSync();
+ }
+ catch (Exception e)
+ {
+ log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+ }
+ }
+
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ log.info("{} - removing partition: {}", id, partition);
+ this.partitions.remove(partition);
+ Map<String, AdderResult> state = recordHandler.removePartition(partition);
+ for (String user : state.keySet())
+ {
+ log.info(
+ "{} - Saved state for partition={}|user={}: {}",
+ id,
+ partition,
+ user,
+ state.get(user));
+ }
+ Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+ stateRepository.save(new StateDocument(partition, state, results));
+ });
+ }
+
+ @Override
+ public void enableCommits()
+ {
+ commitsEnabled = true;
+ }
+
+ @Override
+ public void disableCommits()
+ {
+ commitsEnabled = false;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, String>
+{
+ private final AdderResults results;
+ private final Optional<Duration> throttle;
+ private final String id;
+
+ private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ Integer partition = record.partition();
+ String user = record.key();
+ String message = record.value();
+
+ if (message.equals("CALCULATE"))
+ {
+ AdderResult result = state.get(partition).calculate(user);
+ log.info("{} - New result for {}: {}", id, user, result);
+ results.addResults(partition, user, result);
+ }
+ else
+ {
+ state.get(partition).addToSum(user, Integer.parseInt(message));
+ }
+
+ if (throttle.isPresent())
+ {
+ try
+ {
+ Thread.sleep(throttle.get().toMillis());
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Intrerrupted while throttling: {}", id, e);
+ }
+ }
+ }
+
+ protected void addPartition(Integer partition, Map<String, AdderResult> state)
+ {
+ this.state.put(partition, new AdderBusinessLogic(state));
+ }
+
+ protected Map<String, AdderResult> removePartition(Integer partition)
+ {
+ return this.state.remove(partition).getState();
+ }
+
+
+ public Map<Integer, AdderBusinessLogic> getState()
+ {
+ return state;
+ }
+
+ public AdderBusinessLogic getState(Integer partition)
+ {
+ return state.get(partition);
+ }
+}
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
@RestController
public class DriverController
{
private final EndlessConsumer consumer;
+ private final ApplicationRecordHandler recordHandler;
+ private final AdderResults results;
@PostMapping("start")
}
- @GetMapping("seen")
- public Map<Integer, Map<String, Long>> seen()
+ @GetMapping("state")
+ public Map<Integer, Map<String, AdderResult>> state()
{
- return consumer.getSeen();
+ return
+ recordHandler
+ .getState()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey(),
+ entry -> entry.getValue().getState()));
+ }
+
+ @GetMapping("state/{user}")
+ public ResponseEntity<Long> state(@PathVariable String user)
+ {
+ for (AdderBusinessLogic adder : recordHandler.getState().values())
+ {
+ Optional<Long> sum = adder.getSum(user);
+ if (sum.isPresent())
+ return ResponseEntity.ok(sum.get());
+ }
+
+ return ResponseEntity.notFound().build();
+ }
+
+ @GetMapping("results")
+ public Map<Integer, Map<String, List<AdderResult>>> results()
+ {
+ return results.getState();
+ }
+
+ @GetMapping("results/{user}")
+ public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+ {
+ for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+ {
+ List<AdderResult> results = resultsByUser.get(user);
+ if (results != null)
+ return ResponseEntity.ok(results);
+ }
+
+ return ResponseEntity.notFound().build();
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@Slf4j
@RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements Runnable
{
private final ExecutorService executor;
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final RecordHandler handler;
+ private final RebalanceListener rebalanceListener;
+ private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Exception exception;
private long consumed = 0;
- private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
- private final Map<Integer, Long> offsets = new HashMap<>();
-
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long newOffset = consumer.position(tp);
- Long oldOffset = offsets.remove(partition);
- log.info(
- "{} - removing partition: {}, consumed {} records (offset {} -> {})",
- id,
- partition,
- newOffset - oldOffset,
- oldOffset,
- newOffset);
- Map<String, Long> removed = seen.remove(partition);
- for (String key : removed.keySet())
- {
- log.info(
- "{} - Seen {} messages for partition={}|key={}",
- id,
- removed.get(key),
- partition,
- key);
- }
- });
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long offset = consumer.position(tp);
- log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- offsets.put(partition, offset);
- seen.put(partition, new HashMap<>());
- });
- }
@Override
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), this);
+ rebalanceListener.enableCommits();
+ consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
{
record.value()
);
- handler.accept(record);
+ recordHandler.accept(record);
consumed++;
-
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
}
}
}
catch(WakeupException e)
{
- log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
- consumer.commitSync();
+ log.info("{} - RIIING! Request to stop consumption.", id);
shutdown();
}
catch(RecordDeserializationException e)
offset,
e.getCause().toString());
- consumer.commitSync();
shutdown(e);
}
catch(Exception e)
{
- log.error("{} - Unexpected error: {}", id, e.toString(), e);
+ log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+ rebalanceListener.disableCommits();
shutdown(e);
}
finally
}
}
- public Map<Integer, Map<String, Long>> getSeen()
- {
- return seen;
- }
-
public void start()
{
lock.lock();
}
}
- public synchronized void stop() throws ExecutionException, InterruptedException
+ public synchronized void stop() throws InterruptedException
{
lock.lock();
try
public void destroy() throws ExecutionException, InterruptedException
{
log.info("{} - Destroy!", id);
- try
- {
- stop();
- }
- catch (IllegalStateException e)
- {
- log.info("{} - Was already stopped", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
- }
- finally
- {
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
public boolean running()
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface RebalanceListener extends ConsumerRebalanceListener
+{
+ void enableCommits();
+ void disableCommits();
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@Document(collection = "state")
+@ToString
+public class StateDocument
+{
+ @Id
+ public String id;
+ public Map<String, AdderResult> state;
+ public Map<String, List<AdderResult>> results;
+
+ public StateDocument()
+ {
+ }
+
+ public StateDocument(Integer partition)
+ {
+ this.id = Integer.toString(partition);
+ this.state = new HashMap<>();
+ this.results = new HashMap<>();
+ }
+
+ public StateDocument(
+ Integer partition,
+ Map<String, AdderResult> state,
+ Map<String, List<AdderResult>> results)
+ {
+ this.id = Integer.toString(partition);
+ this.state = state;
+ this.results = results;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+ public Optional<StateDocument> findById(String partition);
+}
-consumer:
- bootstrap-server: :9092
- group-id: my-group
- client-id: DEV
- topic: test
- auto-offset-reset: earliest
- commit-interval: 5s
+sumup:
+ adder:
+ bootstrap-server: :9092
+ group-id: my-group
+ client-id: DEV
+ topic: out
+ auto-offset-reset: earliest
+ commit-interval: 5s
management:
endpoint:
shutdown:
group-id: ${consumer.group-id}
topic: ${consumer.topic}
auto-offset-reset: ${consumer.auto-offset-reset}
+spring:
+ data:
+ mongodb:
+ uri: mongodb://juplo:training@localhost:27017
+ database: juplo
logging:
level:
root: INFO
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class AdderBusinessLogicTest
+{
+ @Test
+ @DisplayName("An empty Optional should be returned, for a non-existing sum")
+ public void testGetSumReturnsEmptyOptionalForNonExistingSum()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThat(adder.getSum("foo")).isEmpty();
+ }
+
+ @Test
+ @DisplayName("A non-empty Optional should be returned, for an existing sum")
+ public void testGetSumReturnsNonEmptyOptionalForExistingSum()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ assertThat(adder.getSum("foo")).isNotEmpty();
+ }
+
+ @Test
+ @DisplayName("A sum can be calculated, if it does exist")
+ public void testCalculatePossibleIfSumExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
+ }
+
+ @Test
+ @DisplayName("An existing sum is removed, if ended")
+ public void testCalculateRemovesSumIfSumExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ adder.calculate("foo");
+ assertThat(adder.getSum("foo")).isEmpty();
+ }
+
+ @Test
+ @DisplayName("An existing sum returns a non-null value, if calculated")
+ public void testCalculateReturnsNonNullValueIfSumExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ assertThat(adder.calculate("foo")).isNotNull();
+ }
+
+ @Test
+ @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
+ public void testCalculateCausesExceptionIfNotExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
+ }
+
+ @Test
+ @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
+ public void testAddToSumWithNullValueCausesException()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
+ }
+
+ @ParameterizedTest(name = "{index}: Adding {0}")
+ @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
+ @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
+ public void testAddToSumWithNonPositiveValueCausesException(int value)
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
+ }
+
+ @ParameterizedTest(name = "{index}: Adding {0}")
+ @DisplayName("Can add a positive value to a sum")
+ @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
+ public void testAddToSumWithPositiveValuePossible(int value)
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
+ }
+
+ @ParameterizedTest(name = "{index}: Summing up {0}")
+ @DisplayName("Adds up numbers correctly")
+ @MethodSource("numbersProvider")
+ public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
+ {
+ long expectedResult = Arrays.stream(numbers).sum();
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
+ AdderResult result = adder.calculate("foo");
+ assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
+ assertThat(result.sum).isEqualTo(expectedResult);
+ }
+
+ static Stream<Arguments> numbersProvider() {
+ return Stream.of(
+ Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
+ Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
+ Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
+ }
+}
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "consumer.topic=" + TOPIC })
+ "consumer.topic=" + TOPIC,
+ "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC)
@AutoConfigureDataMongo
public class ApplicationIT
package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.ContextConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.*;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
-@ContextConfiguration(classes = ApplicationTests.Configuration.class)
-public class ApplicationTests extends GenericApplicationTests<String, Long>
+@Slf4j
+public class ApplicationTests extends GenericApplicationTests<String, String>
{
+ @Autowired
+ StateRepository stateRepository;
+
+
public ApplicationTests()
{
- super(
- new RecordGenerator()
+ super(new ApplicationTestRecrodGenerator());
+ ((ApplicationTestRecrodGenerator) recordGenerator).tests = this;
+ }
+
+
+ static class ApplicationTestRecrodGenerator implements RecordGenerator
+ {
+ ApplicationTests tests;
+
+ final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
+ final String[] dieWilden13 =
+ IntStream
+ .range(1, 14)
+ .mapToObj(i -> "seeräuber-" + i)
+ .toArray(i -> new String[i]);
+ final StringSerializer stringSerializer = new StringSerializer();
+ final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+
+ int counter = 0;
+
+ Map<String, List<AdderResult>> state;
+
+ @Override
+ public int generate(
+ boolean poisonPills,
+ boolean logicErrors,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
+ counter = 0;
+ state =
+ Arrays
+ .stream(dieWilden13)
+ .collect(Collectors.toMap(
+ seeräuber -> seeräuber,
+ seeräuber -> new LinkedList()));
+
+ int number[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
+ int message[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
+ int next = 0;
+
+ for (int pass = 0; pass < 333; pass++)
+ {
+ for (int i = 0; i < 13; i++)
{
- final StringSerializer stringSerializer = new StringSerializer();
- final LongSerializer longSerializer = new LongSerializer();
+ String seeräuber = dieWilden13[i];
+ Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
+
+ if (message[i] > number[i])
+ {
+ send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+ state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
+ // Pick next number to calculate
+ number[i] = numbers[next++ % numbers.length];
+ message[i] = 1;
+ log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
+ }
+
+ Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
+ send(key, value, fail(logicErrors, pass, counter), messageSender);
+ }
+ }
+
+ return counter;
+ }
+ boolean fail(boolean logicErrors, int pass, int counter)
+ {
+ return logicErrors && pass > 300 && counter % 77 == 0;
+ }
+
+ void send(
+ Bytes key,
+ Bytes value,
+ boolean fail,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
+ counter++;
+
+ if (fail)
+ {
+ value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+ }
+
+ messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
+ }
+
+ @Override
+ public boolean canGeneratePoisonPill()
+ {
+ return false;
+ }
- @Override
- public int generate(
- boolean poisonPills,
- boolean logicErrors,
- Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ @Override
+ public void assertBusinessLogic()
+ {
+ for (int i = 0; i < PARTITIONS; i++)
+ {
+ StateDocument stateDocument =
+ tests.stateRepository.findById(Integer.toString(i)).get();
+
+ stateDocument
+ .results
+ .entrySet()
+ .stream()
+ .forEach(entry ->
{
- int i = 0;
+ String user = entry.getKey();
+ List<AdderResult> resultsForUser = entry.getValue();
- for (int partition = 0; partition < 10; partition++)
+ for (int j = 0; j < resultsForUser.size(); j++)
{
- for (int key = 0; key < 10000; key++)
+ if (!(j < state.get(user).size()))
{
- i++;
-
- Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
- if (i == 99977)
- {
- if (logicErrors)
- {
- value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
- }
- if (poisonPills)
- {
- value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
- }
- }
-
- ProducerRecord<Bytes, Bytes> record =
- new ProducerRecord<>(
- TOPIC,
- partition,
- new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
- value);
-
- messageSender.accept(record);
+ break;
}
- }
-
- return i;
- }
- });
- }
+ assertThat(resultsForUser.get(j))
+ .as("Unexpected results calculation %d of user %s", j, user)
+ .isEqualTo(state.get(user).get(j));
+ }
- @TestConfiguration
- public static class Configuration
- {
- @Bean
- public RecordHandler<String, Long> applicationRecordHandler()
- {
- return (record) ->
- {
- if (record.value() == Long.MIN_VALUE)
- throw new RuntimeException("BOOM (Logic-Error)!");
- };
+ assertThat(state.get(user))
+ .as("More results calculated for user %s as expected", user)
+ .containsAll(resultsForUser);
+ });
+ }
}
}
-}
+}
\ No newline at end of file
package de.juplo.kafka;
+import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoProperties;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestPropertySource(
properties = {
- "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "consumer.topic=" + TOPIC,
- "consumer.commit-interval=500ms" })
+ "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "sumup.adder.topic=" + TOPIC,
+ "sumup.adder.commit-interval=500ms",
+ "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
@Slf4j
abstract class GenericApplicationTests<K, V>
{
@Autowired
ApplicationProperties applicationProperties;
@Autowired
+ MongoClient mongoClient;
+ @Autowired
+ MongoProperties mongoProperties;
+ @Autowired
+ RebalanceListener rebalanceListener;
+ @Autowired
TestRecordHandler<K, V> recordHandler;
@Autowired
EndlessConsumer<K, V> endlessConsumer;
-
KafkaProducer<Bytes, Bytes> testRecordProducer;
KafkaConsumer<Bytes, Bytes> offsetConsumer;
Map<TopicPartition, Long> oldOffsets;
props.put("value.deserializer", BytesDeserializer.class.getName());
offsetConsumer = new KafkaConsumer<>(props);
+ mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
seekToEnd();
oldOffsets = new HashMap<>();