#!/bin/bash
-IMAGE=juplo/sumup-adder:1.0-SNAPSHOT
+IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
exit
fi
-docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express
+docker-compose up -d zookeeper kafka cli mongo express
if [[
$(docker image ls -q $IMAGE) == "" ||
"$1" = "build"
]]
then
- docker-compose rm -svf adder
+ docker-compose rm -svf peter beate
mvn clean install || exit
else
echo "Using image existing images:"
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d gateway requests adder
+docker-compose up -d producer peter beate
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests..."; sleep 1; done
-while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder..."; sleep 1; done
+sleep 15
-echo 66 | http -v :8080/foo
-echo 666 | http -v :8080/bar
+http -v post :8082/stop
+sleep 10
+docker-compose kill -s 9 peter
+http -v post :8082/start
+sleep 60
-sleep 5
-
-http -v :8082/state
-http -v :8082/state/foo
-http -v :8082/state/bar
-
-docker-compose logs adder
+docker-compose stop producer peter beate
+docker-compose logs beate
+docker-compose logs --tail=10 peter
ports:
- 2181:2181
- kafka-1:
+ kafka:
image: confluentinc/cp-kafka:7.1.3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
- KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
- KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
- ports:
- - 9081:9081
- depends_on:
- - zookeeper
-
- kafka-2:
- image: confluentinc/cp-kafka:7.1.3
- environment:
- KAFKA_BROKER_ID: 2
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
- KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
ports:
- 9092:9082
- 9082:9082
- networks:
- default:
- aliases:
- - kafka
- depends_on:
- - zookeeper
-
- kafka-3:
- image: confluentinc/cp-kafka:7.1.3
- environment:
- KAFKA_BROKER_ID: 3
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
- KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
- KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
- ports:
- - 9083:9083
depends_on:
- zookeeper
image: juplo/toolbox
command: >
bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
- kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
- kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 1 --replication-factor 1
- kafka-topics --bootstrap-server kafka:9092 --describe --topic in
- kafka-topics --bootstrap-server kafka:9092 --describe --topic out
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
"
cli:
image: juplo/toolbox
command: sleep infinity
- gateway:
- image: juplo/sumup-gateway:1.0-SNAPSHOT
+ producer:
+ image: juplo/endless-long-producer:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
server.port: 8080
- sumup.gateway.bootstrap-server: kafka:9092
- sumup.gateway.client-id: gateway
- sumup.gateway.topic: in
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer
+ producer.topic: test
+ producer.throttle-ms: 500
+
- requests:
- image: juplo/sumup-requests:1.0-SNAPSHOT
+ peter:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
server.port: 8080
- sumup.requests.bootstrap-server: kafka:9092
- sumup.requests.client-id: requests
+ consumer.bootstrap-server: kafka:9092
+ consumer.client-id: peter
+ consumer.topic: test
+ spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+ spring.data.mongodb.database: juplo
- adder:
- image: juplo/sumup-adder:1.0-SNAPSHOT
+ beate:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
ports:
- 8082:8080
environment:
server.port: 8080
- sumup.adder.bootstrap-server: kafka:9092
- sumup.adder.client-id: adder
+ consumer.bootstrap-server: kafka:9092
+ consumer.client-id: beate
+ consumer.topic: test
spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
spring.data.mongodb.database: juplo
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>sumup-adder</artifactId>
+ <artifactId>endless-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
- <name>SumUp Adder</name>
- <description>Calculates the sum for the send messages</description>
+ <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
<properties>
<java.version>11</java.version>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
+++ /dev/null
-package de.juplo.kafka;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-public class AdderBusinessLogic
-{
- private final Map<String, Long> state;
-
-
- public AdderBusinessLogic()
- {
- this(new HashMap<>());
- }
-
- public AdderBusinessLogic(Map<String, Long> state)
- {
- this.state = state;
- }
-
-
- public synchronized void startSum(String user)
- {
- if (state.containsKey(user))
- throw new IllegalStateException("Sumation for " + user + " already in progress, state: " + state.get(user));
-
- state.put(user, 0l);
- }
-
- public synchronized Optional<Long> getSum(String user)
- {
- return Optional.ofNullable(state.get(user));
- }
-
- public synchronized void addToSum(String user, Integer value)
- {
- if (!state.containsKey(user))
- throw new IllegalStateException("No sumation for " + user + " in progress");
- if (value == null || value < 1)
- throw new IllegalArgumentException("Not a positive number: " + value);
-
- long result = state.get(user) + value;
- state.put(user, result);
- }
-
- public synchronized Long endSum(String user)
- {
- if (!state.containsKey(user))
- throw new IllegalStateException("No sumation for " + user + " in progress");
-
- return state.remove(user);
- }
-
- protected Map<String, Long> getState()
- {
- return state;
- }
-}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
public ApplicationRebalanceListener rebalanceListener(
ApplicationRecordHandler recordHandler,
StateRepository stateRepository,
- Consumer<String, String> consumer,
+ Consumer<String, Long> consumer,
ApplicationProperties properties)
{
return new ApplicationRebalanceListener(
}
@Bean
- public EndlessConsumer<String, String> endlessConsumer(
- KafkaConsumer<String, String> kafkaConsumer,
+ public EndlessConsumer<String, Long> endlessConsumer(
+ KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
ApplicationRebalanceListener rebalanceListener,
ApplicationRecordHandler recordHandler,
}
@Bean(destroyMethod = "close")
- public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+ public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("client.id", properties.getClientId());
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", properties.getAutoOffsetReset());
+ props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", LongDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
@RequiredArgsConstructor
public class ApplicationHealthIndicator implements HealthIndicator
{
- private final EndlessConsumer<String, String> consumer;
+ private final EndlessConsumer<String, Long> consumer;
@Override
import java.time.Duration;
-@ConfigurationProperties(prefix = "sumup.adder")
+@ConfigurationProperties(prefix = "consumer")
@Validated
@Getter
@Setter
private final String topic;
private final Clock clock;
private final Duration commitInterval;
- private final Consumer<String, String> consumer;
+ private final Consumer<String, Long> consumer;
private Instant lastCommit = Instant.EPOCH;
private boolean commitsEnabled = true;
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
log.debug("Storing data and offsets, last commit: {}", lastCommit);
- recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+ recordHandler.getState().forEach((partiton, state) -> stateRepository.save(
new StateDocument(
partiton,
- adder.getState(),
+ state,
consumer.position(new TopicPartition(topic, partiton)))));
lastCommit = clock.instant();
}
@Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
{
- private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+ private final Map<Integer, Map<String, Long>> state = new HashMap<>();
@Override
- public void accept(ConsumerRecord<String, String> record)
+ public void accept(ConsumerRecord<String, Long> record)
{
Integer partition = record.partition();
- String user = record.key();
- String message = record.value();
- switch (message)
- {
- case "START":
- state.get(partition).startSum(user);
- break;
-
- case "END":
- Long result = state.get(partition).endSum(user);
- log.info("New result for {}: {}", user, result);
- break;
-
- default:
- state.get(partition).addToSum(user, Integer.parseInt(message));
- break;
- }
+ String key = record.key() == null ? "NULL" : record.key().toString();
+ Map<String, Long> byKey = state.get(partition);
+
+ if (!byKey.containsKey(key))
+ byKey.put(key, 0l);
+
+ long seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
}
protected void addPartition(Integer partition, Map<String, Long> state)
{
- this.state.put(partition, new AdderBusinessLogic(state));
+ this.state.put(partition, state);
}
protected Map<String, Long> removePartition(Integer partition)
{
- return this.state.remove(partition).getState();
+ return this.state.remove(partition);
}
- public Map<Integer, AdderBusinessLogic> getState()
+ public Map<Integer, Map<String, Long>> getState()
{
return state;
}
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
@RestController
@GetMapping("state")
public Map<Integer, Map<String, Long>> state()
{
- return
- recordHandler
- .getState()
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- entry -> entry.getKey(),
- entry -> entry.getValue().getState()));
- }
-
- @GetMapping("state/{user}")
- public ResponseEntity<Long> seen(@PathVariable String user)
- {
- for (AdderBusinessLogic adder : recordHandler.getState().values())
- {
- Optional<Long> sum = adder.getSum(user);
- if (sum.isPresent())
- return ResponseEntity.ok(sum.get());
- }
-
- return ResponseEntity.notFound().build();
+ return recordHandler.getState();
}
this.state = new HashMap<>();
}
- public StateDocument(
- Integer partition,
- Map<String, Long> state,
- long offset)
+ public StateDocument(Integer partition, Map<String, Long> state, long offset)
{
this.id = Integer.toString(partition);
this.state = state;
-sumup:
- adder:
- bootstrap-server: :9092
- group-id: my-group
- client-id: DEV
- topic: out
- auto-offset-reset: earliest
- commit-interval: 5s
+consumer:
+ bootstrap-server: :9092
+ group-id: my-group
+ client-id: DEV
+ topic: test
+ auto-offset-reset: earliest
+ commit-interval: 5s
management:
endpoint:
shutdown:
+++ /dev/null
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class AdderBusinessLogicTest
-{
- @Test
- @DisplayName("A new sum can be started, if it does not exist")
- public void testStartSumPossibleIfNotExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatNoException().isThrownBy(() -> adder.startSum("foo"));
- }
-
- @Test
- @DisplayName("Starting an already existing sum again, causes an IllegalStateException")
- public void testStartSumCausesExceptionIfExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatIllegalStateException().isThrownBy(() -> adder.startSum("foo"));
- }
-
- @Test
- @DisplayName("An empty Optional should be returned, for a non-existing sum")
- public void testGetSumReturnsEmptyOptionalForNonExistingSum()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThat(adder.getSum("foo")).isEmpty();
- }
-
- @Test
- @DisplayName("A non-empty Optional should be returned, for an existing sum")
- public void testGetSumReturnsNonEmptyOptionalForExistingSum()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThat(adder.getSum("foo")).isNotEmpty();
- }
-
- @Test
- @DisplayName("A sum can be ended, if it does exist")
- public void testEndSumPossibleIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatNoException().isThrownBy(() -> adder.endSum("foo"));
- }
-
- @Test
- @DisplayName("An existing sum is removed, if ended")
- public void testEndSumRemovesSumIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- adder.endSum("foo");
- assertThat(adder.getSum("foo")).isEmpty();
- }
-
- @Test
- @DisplayName("An existing Sum returns a non-null value, if ended")
- public void testEndSumReturnsNonNullValueIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThat(adder.endSum("foo")).isNotNull();
- }
-
- @Test
- @DisplayName("An existing Sum returns a non-negative value, if ended")
- public void testEndSumReturnsNonNegativeValueIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThat(adder.endSum("foo")).isNotNegative();
- }
-
- @Test
- @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
- public void testEndSumCausesExceptionIfNotExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalStateException().isThrownBy(() -> adder.endSum("foo"));
- }
-
- @Test
- @DisplayName("Adding to a non-existent sum causes an IllegalStateException")
- public void testAddToSumCausesExceptionIfNotExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalStateException().isThrownBy(() -> adder.addToSum("foo", 1));
- }
-
- @Test
- @DisplayName("Adding a null-value to an existing sum causes an IllegalArgumentException")
- public void testAddSumWithNullValueToExistingSumCausesException()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
- }
-
- @ParameterizedTest(name = "{index}: Adding {0}")
- @DisplayName("Adding a non-positive value to an existing sum causes an IllegalArgumentException")
- @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
- public void testAddSumWithNonPositiveValueToExistingSumCausesException(int value)
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
- }
-
- @Test
- @DisplayName("Can add a positive value to an existing sum")
- public void testAddSumWithPositiveValuePossibleIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", -1));
- }
-
- @ParameterizedTest(name = "{index}: Summing up {0}")
- @DisplayName("Adds up numbers correctly")
- @MethodSource("numbersProvider")
- public void testAddSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
- {
- long expectedResult = Arrays.stream(numbers).sum();
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
- assertThat(adder.endSum("foo")).isEqualTo(expectedResult);
- }
-
- static Stream<Arguments> numbersProvider() {
- return Stream.of(
- Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
- Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
- Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
- }
-}
package de.juplo.kafka;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.test.context.ContextConfiguration;
import java.util.function.Consumer;
-import java.util.stream.IntStream;
-public class ApplicationTests extends GenericApplicationTests<String, String>
+@ContextConfiguration(classes = ApplicationTests.Configuration.class)
+public class ApplicationTests extends GenericApplicationTests<String, Long>
{
public ApplicationTests()
{
super(
new RecordGenerator()
{
- final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
- final String[] dieWilden13 =
- IntStream
- .range(1,14)
- .mapToObj(i -> "seeräuber-" + i)
- .toArray(i -> new String[i]);
final StringSerializer stringSerializer = new StringSerializer();
- final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
- final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
-
- int counter = 0;
+ final LongSerializer longSerializer = new LongSerializer();
@Override
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter = 0;
+ int i = 0;
- for (int i = 0; i < 33; i++)
+ for (int partition = 0; partition < 10; partition++)
{
- String seeräuber = dieWilden13[i%13];
- int number = numbers[i%7];
-
- Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
- send(key, startMessage, logicErrors, messageSender);
- for (int message = 1; message <= number; message++)
+ for (int key = 0; key < 10; key++)
{
- Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
- send(key, value, logicErrors, messageSender);
- }
- send(key, endMessage, logicErrors, messageSender);
- }
+ i++;
- return counter;
- }
+ Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
+ if (i == 77)
+ {
+ if (logicErrors)
+ {
+ value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+ }
+ if (poisonPills)
+ {
+ value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+ }
+ }
- void send(
- Bytes key,
- Bytes value,
- boolean logicErrors,
- Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
- {
- counter++;
+ ProducerRecord<Bytes, Bytes> record =
+ new ProducerRecord<>(
+ TOPIC,
+ partition,
+ new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
+ value);
- if (counter == 77)
- {
- if (logicErrors)
- {
- value = value.equals(startMessage) ? endMessage : startMessage;
+ messageSender.accept(record);
}
}
- messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
- }
-
- @Override
- public boolean canGeneratePoisonPill()
- {
- return false;
+ return i;
}
});
}
+
+
+ @TestConfiguration
+ public static class Configuration
+ {
+ @Primary
+ @Bean
+ public ApplicationRecordHandler recordHandler()
+ {
+ ApplicationRecordHandler recordHandler = new ApplicationRecordHandler();
+ return new ApplicationRecordHandler()
+ {
+ @Override
+ public void accept(ConsumerRecord<String, Long> record)
+ {
+ if (record.value() == Long.MIN_VALUE)
+ throw new RuntimeException("BOOM (Logic-Error)!");
+ super.accept(record);
+ }
+ };
+ }
+ }
}
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestPropertySource(
properties = {
- "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
- "sumup.adder.topic=" + TOPIC,
- "sumup.adder.commit-interval=1s",
+ "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "consumer.topic=" + TOPIC,
+ "consumer.commit-interval=1s",
"spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration