import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.Optional;
+
@RequiredArgsConstructor
@Slf4j
public class ApplicationRecordHandler implements RecordHandler<String, Integer>
{
private final Producer<String, Object> producer;
+ private final Optional<Integer> errorPosition;
private final String id;
private final String topic;
+ private int counter = 0;
+
@Override
public void accept(ConsumerRecord<String, Integer> record)
for (int i = 1; i <= number; i++)
{
+ if (errorPosition.isPresent() && ++counter == errorPosition.get())
+ {
+ log.info("{} - Erzeuge fachlichen Fehler!");
+ send(key, new AddNumberMessage(number, counter * -1));
+ }
send(key, new AddNumberMessage(number, i));
}
send(key, new CalculateSumMessage(number));