#!/bin/bash
-IMAGE=juplo/sumup-requests-json:1.0-SNAPSHOT
+IMAGE=juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
sumup.gateway.topic: in
requests:
- image: juplo/sumup-requests-json:1.0-SNAPSHOT
+ image: juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
server.port: 8080
sumup.requests.bootstrap-server: kafka:9092
sumup.requests.client-id: requests
+ sumup.requests.error-position: 6
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>sumup-requests-json</artifactId>
- <name>SumUp-Requests</name>
+ <artifactId>sumup-requests-fehlerteufel</artifactId>
+ <name>SumUp-Requests (Fehlerteufel!)</name>
<description>A service that reads computation requests from an incomming topic and generates according messages for the SumUp-Consumer on an outgoing topic. This version generates two types of JSON-messages.</description>
<version>1.0-SNAPSHOT</version>
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
{
return new ApplicationRecordHandler(
kafkaProducer,
+ Optional.ofNullable(properties.getErrorPosition()),
properties.getClientId(),
properties.getTopicOut());
}
@Setter
public class ApplicationProperties
{
+ private Integer errorPosition;
@NotNull
@NotEmpty
private String bootstrapServer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.Optional;
+
@RequiredArgsConstructor
@Slf4j
public class ApplicationRecordHandler implements RecordHandler<String, Integer>
{
private final Producer<String, Object> producer;
+ private final Optional<Integer> errorPosition;
private final String id;
private final String topic;
+ private int counter = 0;
+
@Override
public void accept(ConsumerRecord<String, Integer> record)
for (int i = 1; i <= number; i++)
{
+ if (errorPosition.isPresent() && ++counter == errorPosition.get())
+ {
+ log.info("{} - Erzeuge fachlichen Fehler!");
+ send(key, new AddNumberMessage(number, counter * -1));
+ }
send(key, new AddNumberMessage(number, i));
}
send(key, new CalculateSumMessage(number));