From 1df7c19a97ff53ba7cf19748c2e394f12e4fc137 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Dec 2024 17:30:32 +0100 Subject: [PATCH] TMP --- .../java/de/juplo/kafka/ExampleConsumer.java | 2 +- .../kafka/NonRetriableErrorException.java | 9 ++++ .../java/de/juplo/kafka/RecordHandler.java | 2 +- .../juplo/kafka/RetriableErrorException.java | 9 ++++ .../de/juplo/kafka/ExampleConsumerTest.java | 24 +++++++++++ .../de/juplo/kafka/MockRecordHandler.java | 42 +++++++++++++++++-- 6 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/NonRetriableErrorException.java create mode 100644 src/main/java/de/juplo/kafka/RetriableErrorException.java diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 65a1b5d1..17a02642 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -114,7 +114,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - Long value) + Long value) throws RetriableErrorException, NonRetriableErrorException { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); diff --git a/src/main/java/de/juplo/kafka/NonRetriableErrorException.java b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java new file mode 100644 index 00000000..0eb0ff28 --- /dev/null +++ b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class NonRetriableErrorException extends Exception +{ + public NonRetriableErrorException(String message) + { + super(message); + } +} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index a7b65af2..5edcf587 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -7,5 +7,5 @@ public interface RecordHandler Integer partition, Long offset, K key, - V value); + V value) throws RetriableErrorException, NonRetriableErrorException; } diff --git a/src/main/java/de/juplo/kafka/RetriableErrorException.java b/src/main/java/de/juplo/kafka/RetriableErrorException.java new file mode 100644 index 00000000..598ddb09 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RetriableErrorException.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +public class RetriableErrorException extends Exception +{ + public RetriableErrorException(String message) + { + super(message); + } +} diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 19a91702..f9bf48ed 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -202,6 +202,26 @@ public class ExampleConsumerTest send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION); } + private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition) throws Exception + { + sendMessageThatTriggersNonRetriableExceptionInDomain(partition, 1); + } + + private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition, int numFailures) + { + send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION * numFailures)); + } + + private void sendMessageThatTriggersRetriableExceptionInDomain(int partition) + { + sendMessageThatTriggersRetriableExceptionInDomain(partition, 0); + } + + private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures) + { + send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures)); + } + private void send(int partition, long message) { send(partition, serializer.serialize(TOPIC, message)); @@ -213,7 +233,11 @@ public class ExampleConsumerTest kafkaTemplate.send(TOPIC, partition, "EGAL", bytes); } + public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2; + public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3; + @TestConfiguration diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java index fe873617..4587648e 100644 --- a/src/test/java/de/juplo/kafka/MockRecordHandler.java +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -6,7 +6,7 @@ import lombok.extern.slf4j.Slf4j; import java.time.Duration; -import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; +import static de.juplo.kafka.ExampleConsumerTest.*; @RequiredArgsConstructor @@ -24,7 +24,7 @@ public class MockRecordHandler implements RecordHandler { Integer partition, Long offset, String key, - Long value) { + Long value) throws RetriableErrorException, NonRetriableErrorException { if (value < 0) { @@ -35,16 +35,50 @@ public class MockRecordHandler implements RecordHandler { log.trace("Handled request for topic {}, num requests handled so far: {}", topic, numRequestsHandled); } - private void generateError(long value) + private void generateError(long value) throws RetriableErrorException, NonRetriableErrorException { if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) { - throw new RuntimeException("Unexpected application error!"); + + } + + if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f) + { + sleep(exceptionDelay); + if (value % VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0) + { + throw new NonRetriableErrorException("Non-Retriable application error!"); + } + return; } + if ((float)value % (float)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION == 0f) + { + sleep(exceptionDelay); + if (value % VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION == 0) + { + throw new NonRetriableErrorException("Non-Retriable application error!"); + } + return; + } + + log.info("Not specifically mapped error: {}", value); } + private void sleep(Duration duration) + { + try + { + Thread.sleep(exceptionDelay); + } + catch(InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + public void clear() { numRequestsHandled = 0; -- 2.20.1