* Die `id` kann nur weggelassen werden, wenn `assign()` verwendet wird.
#!/bin/bash
-IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT
+IMAGE=juplo/supersimple:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
exit
fi
-docker-compose rm -svf adder-1 adder-2
-docker-compose rm -svf mongo
-docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express
+docker-compose rm -svf supersimple
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
if [[
$(docker image ls -q $IMAGE) == "" ||
"$1" = "build"
]]
then
- docker-compose rm -svf adder-1 adder-2
- mvn -D skipTests clean install || exit
+ docker-compose rm -svf supersimple
+ mvn clean install || exit
else
echo "Using image existing images:"
docker image ls $IMAGE
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d gateway requests-1 requests-2
+docker-compose up -d gateway requests-1 requests-2 supersimple
while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done
while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done
-docker-compose up -d peter klaus
+echo 6 | http -v :8080/peter
-docker-compose up -d adder-1
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
-http -v --pretty none -S :8091/results
-echo
+sleep 10
-sleep 3
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-
-docker-compose up -d adder-2
-while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
-while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
-http -v --pretty none -S :8092/results
-echo
-
-sleep 3
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
-docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
+docker-compose logs supersimple
depends_on:
- zookeeper
- mongo:
- image: mongo:4.4.13
- ports:
- - 27017:27017
- environment:
- MONGO_INITDB_ROOT_USERNAME: juplo
- MONGO_INITDB_ROOT_PASSWORD: training
-
- express:
- image: mongo-express
- ports:
- - 8090:8081
- environment:
- ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
- ME_CONFIG_MONGODB_ADMINPASSWORD: training
- ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
- depends_on:
- - mongo
setup:
image: juplo/toolbox
command: >
bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT
- kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
- kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2
- kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2
kafka-topics --bootstrap-server kafka:9092 --describe --topic in
kafka-topics --bootstrap-server kafka:9092 --describe --topic out
- kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT
"
cli:
sumup.requests.bootstrap-server: kafka:9092
sumup.requests.client-id: requests-2
- adder-1:
- image: juplo/sumup-adder-springified:1.0-SNAPSHOT
- ports:
- - 8091:8080
- environment:
- server.port: 8080
- spring.kafka.bootstrap-servers: kafka:9092
- spring.kafka.producer.bootstrap-servers: kafka:9092
- spring.kafak.client-id: adder-1
- spring.kafka.auto-commit-interval: 1s
- sumup.adder.throttle: 3ms
- spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
- spring.data.mongodb.database: juplo
- logging.level.org.apache.kafka.clients.consumer: INFO
-
- adder-2:
- image: juplo/sumup-adder-springified:1.0-SNAPSHOT
+ supersimple:
+ image: juplo/supersimple:1.0-SNAPSHOT
ports:
- - 8092:8080
+ - 8090:8080
environment:
server.port: 8080
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.producer.bootstrap-servers: kafka:9092
- spring.kafak.client-id: adder-2
- spring.kafka.auto-commit-interval: 1s
- sumup.adder.throttle: 3ms
- spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
- spring.data.mongodb.database: juplo
+      spring.kafka.client-id: supersimple
logging.level.org.apache.kafka.clients.consumer: INFO
-
- peter:
- image: juplo/toolbox
- command: >
- bash -c "
- while [[ true ]];
- do
- echo 666 | http -v gateway:8080/peter;
- sleep 1;
- done
- "
- klaus:
- image: juplo/toolbox
- command: >
- bash -c "
- while [[ true ]];
- do
- echo 666 | http -v gateway:8080/klaus;
- sleep 1;
- done
- "
-
- dlt:
- image: juplo/toolbox
- tty: true
- command: kafkacat -C -b kafka:9092 -t out.DLT -f'p=%p|o=%o|%k=%s\n' -o 0 -q
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>sumup-adder-springified</artifactId>
+ <artifactId>supersimple</artifactId>
<version>1.0-SNAPSHOT</version>
- <name>SumUp Adder</name>
- <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
+ <name>Supersimple Consumer-Group</name>
+ <description>Most minimal Consumer-Group ever!</description>
<properties>
<java.version>11</java.version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-mongodb</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>de.flapdoodle.embed</groupId>
- <artifactId>de.flapdoodle.embed.mongo</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
+++ /dev/null
-package de.juplo.kafka;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-public class AdderBusinessLogic
-{
- private final Map<String, AdderResult> state;
-
-
- public AdderBusinessLogic()
- {
- this(new HashMap<>());
- }
-
- public AdderBusinessLogic(Map<String, AdderResult> state)
- {
- this.state = state;
- }
-
-
- public synchronized Optional<Long> getSum(String user)
- {
- return Optional.ofNullable(state.get(user)).map(result -> result.sum);
- }
-
- public synchronized void addToSum(String user, Integer value)
- {
- if (value == null || value < 1)
- throw new IllegalArgumentException("Not a positive number: " + value);
-
- long sum =
- Optional
- .ofNullable(state.get(user))
- .map(result -> result.sum)
- .orElse(0l);
- state.put(user, new AdderResult(value, sum + value));
- }
-
- public synchronized AdderResult calculate(String user)
- {
- if (!state.containsKey(user))
- throw new IllegalStateException("No sumation for " + user + " in progress");
-
- return state.remove(user);
- }
-
- protected Map<String, AdderResult> getState()
- {
- return state;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode
-public class AdderResult
-{
- final int number;
- final long sum;
-
- @Override
- public String toString()
- {
- return "sum(" + number + ") = " + sum;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-
-public class AdderResults
-{
- private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
-
-
- public void addResults(Integer partition, String user, AdderResult result)
- {
- Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
-
- List<AdderResult> results = resultsByUser.get(user);
- if (results == null)
- {
- results = new LinkedList<>();
- resultsByUser.put(user, results);
- }
-
- results.add(result);
- }
-
- protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
- {
- this.results.put(partition, results);
- }
-
- protected Map<String, List<AdderResult>> removePartition(Integer partition)
- {
- return this.results.remove(partition);
- }
-
- public Map<Integer, Map<String, List<AdderResult>>> getState()
- {
- return results;
- }
-
- public Map<String, List<AdderResult>> getState(Integer partition)
- {
- return results.get(partition);
- }
-}
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.backoff.FixedBackOff;
-
-import java.util.Map;
-import java.util.Optional;
-
+import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
-@Slf4j
-@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
@EnableKafka
+@Slf4j
public class Application
{
- @Bean
- public ApplicationRecordHandler applicationRecordHandler(
- AdderResults adderResults,
- KafkaProperties kafkaProperties,
- ApplicationProperties applicationProperties)
- {
- return new ApplicationRecordHandler(
- adderResults,
- Optional.ofNullable(applicationProperties.getThrottle()),
- kafkaProperties.getConsumer().getGroupId());
- }
-
- @Bean
- public AdderResults adderResults()
- {
- return new AdderResults();
- }
-
- @Bean
- public ApplicationRebalanceListener rebalanceListener(
- ApplicationRecordHandler recordHandler,
- AdderResults adderResults,
- StateRepository stateRepository,
- KafkaProperties kafkaProperties)
- {
- return new ApplicationRebalanceListener(
- recordHandler,
- adderResults,
- stateRepository,
- kafkaProperties.getConsumer().getGroupId());
- }
-
- @Bean
- ApplicationHealthIndicator applicationHealthIndicator(
- KafkaListenerEndpointRegistry registry,
- KafkaProperties properties)
+ @KafkaListener(id = "supersimple", topics = "out")
+  public void receive(String message)
{
- return new ApplicationHealthIndicator(
- properties.getConsumer().getGroupId(),
- registry);
+    log.info("Received message: {}", message);
}
- @Bean
- public ProducerFactory<String, Object> producerFactory(
- KafkaProperties properties)
- {
- return new DefaultKafkaProducerFactory<>(
- properties.getProducer().buildProperties(),
- new StringSerializer(),
- new DelegatingByTypeSerializer(
- Map.of(
- byte[].class, new ByteArraySerializer(),
- MessageAddNumber.class, new JsonSerializer<>(),
- MessageCalculateSum.class, new JsonSerializer<>())));
- }
-
- @Bean
- public KafkaTemplate<String, Object> kafkaTemplate(
- ProducerFactory<String, Object> producerFactory)
- {
- return new KafkaTemplate<>(producerFactory);
- }
-
- @Bean
- public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
- KafkaOperations<?, ?> kafkaTemplate)
- {
- return new DeadLetterPublishingRecoverer(kafkaTemplate);
- }
-
- @Bean
- public DefaultErrorHandler errorHandler(
- DeadLetterPublishingRecoverer recoverer)
- {
- return new DefaultErrorHandler(
- recoverer,
- new FixedBackOff(0l, 0l));
- }
-
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.data.mongodb.core.aggregation.ArithmeticOperators;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-@RestController
-@RequiredArgsConstructor
-public class ApplicationController
-{
- private final AdderResults results;
-
-
- @GetMapping("results")
- public Map<Integer, Map<String, List<AdderResult>>> results()
- {
- return results.getState();
- }
-
- @GetMapping("results/{user}")
- public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
- {
- for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
- {
- List<AdderResult> results = resultsByUser.get(user);
- if (results != null)
- return ResponseEntity.ok(results);
- }
-
- return ResponseEntity.notFound().build();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.boot.actuate.health.Health;
-import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-
-
-@RequiredArgsConstructor
-public class ApplicationHealthIndicator implements HealthIndicator
-{
- private final String id;
- private final KafkaListenerEndpointRegistry registry;
-
-
- @Override
- public Health health()
- {
- return registry.getListenerContainer(id).isRunning()
- ? Health.up().build()
- : Health.down().build();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.validation.annotation.Validated;
-
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-import java.time.Duration;
-
-
-@ConfigurationProperties(prefix = "sumup.adder")
-@Validated
-@Getter
-@Setter
-public class ApplicationProperties
-{
- @NotNull
- @NotEmpty
- private String topic;
- private Duration throttle;
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener
-{
- private final ApplicationRecordHandler recordHandler;
- private final AdderResults adderResults;
- private final StateRepository stateRepository;
- private final String id;
-
- private final Set<Integer> partitions = new HashSet<>();
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- log.info("{} - adding partition: {}", id, partition);
- this.partitions.add(partition);
- StateDocument document =
- stateRepository
- .findById(Integer.toString(partition))
- .orElse(new StateDocument(partition));
- recordHandler.addPartition(partition, document.state);
- for (String user : document.state.keySet())
- {
- log.info(
- "{} - Restored state for partition={}|user={}: {}",
- id,
- partition,
- user,
- document.state.get(user));
- }
- adderResults.addPartition(partition, document.results);
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- log.info("{} - removing partition: {}", id, partition);
- this.partitions.remove(partition);
- Map<String, AdderResult> state = recordHandler.removePartition(partition);
- for (String user : state.keySet())
- {
- log.info(
- "{} - Saved state for partition={}|user={}: {}",
- id,
- partition,
- user,
- state.get(user));
- }
- Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
- stateRepository.save(new StateDocument(partition, state, results));
- });
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaHandler;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-@KafkaListener(
- id = "${spring.kafka.consumer.group-id}",
- topics = "${sumup.adder.topic}")
-public class ApplicationRecordHandler
-{
- private final AdderResults results;
- private final Optional<Duration> throttle;
- private final String id;
-
- private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
-
-
- @KafkaHandler
- public void addNumber(
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
- Integer partition,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
- String user,
- @Payload
- MessageAddNumber message)
- {
- log.debug("{} - Received {} for {} on {}", id, message, user, partition);
- state.get(partition).addToSum(user, message.getNext());
- throttle();
- }
-
- @KafkaHandler
- public void calculateSum(
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
- Integer partition,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
- String user,
- @Payload
- MessageCalculateSum message)
- {
- AdderResult result = state.get(partition).calculate(user);
- log.info("{} - New result for {}: {}", id, user, result);
- results.addResults(partition, user, result);
- throttle();
- }
-
- private void throttle()
- {
- if (throttle.isPresent())
- {
- try
- {
- Thread.sleep(throttle.get().toMillis());
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Intrerrupted while throttling: {}", id, e);
- }
- }
- }
-
- protected void addPartition(Integer partition, Map<String, AdderResult> state)
- {
- this.state.put(partition, new AdderBusinessLogic(state));
- }
-
- protected Map<String, AdderResult> removePartition(Integer partition)
- {
- return this.state.remove(partition).getState();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-
-public abstract class Message
-{
- public enum Type {ADD, CALC}
-
- public abstract Type getType();
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MessageAddNumber extends Message
-{
- private Integer next;
-
-
- @Override
- public Type getType()
- {
- return Type.ADD;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MessageCalculateSum extends Message
-{
- @Override
- public Type getType()
- {
- return Type.CALC;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-@Document(collection = "state")
-@ToString
-public class StateDocument
-{
- @Id
- public String id;
- public Map<String, AdderResult> state;
- public Map<String, List<AdderResult>> results;
-
- public StateDocument()
- {
- }
-
- public StateDocument(Integer partition)
- {
- this.id = Integer.toString(partition);
- this.state = new HashMap<>();
- this.results = new HashMap<>();
- }
-
- public StateDocument(
- Integer partition,
- Map<String, AdderResult> state,
- Map<String, List<AdderResult>> results)
- {
- this.id = Integer.toString(partition);
- this.state = state;
- this.results = results;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface StateRepository extends MongoRepository<StateDocument, String>
-{
- public Optional<StateDocument> findById(String partition);
-}
-sumup:
- adder:
- topic: out
management:
endpoint:
shutdown:
java:
enabled: true
spring:
- data:
- mongodb:
- uri: mongodb://juplo:training@localhost:27017
- database: juplo
kafka:
bootstrap-servers: :9092
- consumer:
- group-id: my-group
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
- properties:
- partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
- metadata.max.age.ms: 1000
- spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
- spring.json.type.mapping: >
- ADD:de.juplo.kafka.MessageAddNumber,
- CALC:de.juplo.kafka.MessageCalculateSum
- producer:
- bootstrap-servers: :9092
logging:
level:
root: INFO
+++ /dev/null
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class AdderBusinessLogicTest
-{
- @Test
- @DisplayName("An empty Optional should be returned, for a non-existing sum")
- public void testGetSumReturnsEmptyOptionalForNonExistingSum()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThat(adder.getSum("foo")).isEmpty();
- }
-
- @Test
- @DisplayName("A non-empty Optional should be returned, for an existing sum")
- public void testGetSumReturnsNonEmptyOptionalForExistingSum()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.addToSum("foo", 6);
- assertThat(adder.getSum("foo")).isNotEmpty();
- }
-
- @Test
- @DisplayName("A sum can be calculated, if it does exist")
- public void testCalculatePossibleIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.addToSum("foo", 6);
- assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
- }
-
- @Test
- @DisplayName("An existing sum is removed, if ended")
- public void testCalculateRemovesSumIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.addToSum("foo", 6);
- adder.calculate("foo");
- assertThat(adder.getSum("foo")).isEmpty();
- }
-
- @Test
- @DisplayName("An existing sum returns a non-null value, if calculated")
- public void testCalculateReturnsNonNullValueIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.addToSum("foo", 6);
- assertThat(adder.calculate("foo")).isNotNull();
- }
-
- @Test
- @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
- public void testCalculateCausesExceptionIfNotExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
- }
-
- @Test
- @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
- public void testAddToSumWithNullValueCausesException()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
- }
-
- @ParameterizedTest(name = "{index}: Adding {0}")
- @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
- @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
- public void testAddToSumWithNonPositiveValueCausesException(int value)
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
- }
-
- @ParameterizedTest(name = "{index}: Adding {0}")
- @DisplayName("Can add a positive value to a sum")
- @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
- public void testAddToSumWithPositiveValuePossible(int value)
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
- }
-
- @ParameterizedTest(name = "{index}: Summing up {0}")
- @DisplayName("Adds up numbers correctly")
- @MethodSource("numbersProvider")
- public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
- {
- long expectedResult = Arrays.stream(numbers).sum();
- AdderBusinessLogic adder = new AdderBusinessLogic();
- Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
- AdderResult result = adder.calculate("foo");
- assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
- assertThat(result.sum).isEqualTo(expectedResult);
- }
-
- static Stream<Arguments> numbersProvider() {
- return Stream.of(
- Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
- Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
- Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.boot.test.web.server.LocalServerPort;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-
-import static de.juplo.kafka.ApplicationIT.TOPIC;
-
-
-@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = {
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "sumup.adder.topic=" + TOPIC,
- "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC)
-@AutoConfigureDataMongo
-public class ApplicationIT
-{
- public static final String TOPIC = "FOO";
-
- @LocalServerPort
- private int port;
-
- @Autowired
- private TestRestTemplate restTemplate;
-
-
-
- @Test
- public void testApplicationStartup()
- {
- restTemplate.getForObject(
- "http://localhost:" + port + "/actuator/health",
- String.class
- )
- .contains("UP");
- }
-}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
+@EmbeddedKafka(topics = "out")
+public class ApplicationTests
+{
+ @LocalServerPort
+ private int port;
+
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+
+
+ @Test
+ public void testApplicationStartup()
+ {
+ restTemplate.getForObject(
+ "http://localhost:" + port + "/actuator/health",
+ String.class
+ )
+ .contains("UP");
+ }
+}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class MessageTest
-{
- ObjectMapper mapper = new ObjectMapper();
-
- @Test
- @DisplayName("Deserialize a MessageAddNumber message")
- public void testDeserializeMessageAddNumber()
- {
- Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
- Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
- }
-
- @Test
- @DisplayName("Deserialize a MessageCalculateSum message")
- public void testDeserializeMessageCalculateSum() throws JsonProcessingException
- {
- Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
- Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
- }
-}