Integer partition,
Long offset,
String key,
- Long value)
+ Long value) throws RetriableErrorException, NonRetriableErrorException
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
--- /dev/null
+package de.juplo.kafka;
+
+public class NonRetriableErrorException extends Exception
+{
+ public NonRetriableErrorException(String message)
+ {
+ super(message);
+ }
+}
Integer partition,
Long offset,
K key,
- V value);
+ V value) throws RetriableErrorException, NonRetriableErrorException;
}
--- /dev/null
+package de.juplo.kafka;
+
+public class RetriableErrorException extends Exception
+{
+ public RetriableErrorException(String message)
+ {
+ super(message);
+ }
+}
send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
}
+ private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition) throws Exception
+ {
+ sendMessageThatTriggersNonRetriableExceptionInDomain(partition, 1);
+ }
+
+ private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition, int numFailures)
+ {
+ send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION * numFailures));
+ }
+
+ private void sendMessageThatTriggersRetriableExceptionInDomain(int partition)
+ {
+ sendMessageThatTriggersRetriableExceptionInDomain(partition, 0);
+ }
+
+ private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures)
+ {
+ send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures));
+ }
+
private void send(int partition, long message)
{
send(partition, serializer.serialize(TOPIC, message));
kafkaTemplate.send(TOPIC, partition, "EGAL", bytes);
}
+
public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+ public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
+ public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3;
+
@TestConfiguration
import java.time.Duration;
-import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import static de.juplo.kafka.ExampleConsumerTest.*;
@RequiredArgsConstructor
Integer partition,
Long offset,
String key,
- Long value) {
+ Long value) throws RetriableErrorException, NonRetriableErrorException {
if (value < 0)
{
log.trace("Handled request for topic {}, num requests handled so far: {}", topic, numRequestsHandled);
}
- private void generateError(long value)
+ private void generateError(long value) throws RetriableErrorException, NonRetriableErrorException
{
if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
{
- throw new RuntimeException("Unexpected application error!");
+
+ }
+
+ if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f)
+ {
+ sleep(exceptionDelay);
+ if (value % VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0)
+ {
+ throw new NonRetriableErrorException("Non-Retriable application error!");
+ }
+ return;
}
+ if ((float)value % (float)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION == 0f)
+ {
+ sleep(exceptionDelay);
+ if (value % VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION == 0)
+ {
+ throw new NonRetriableErrorException("Non-Retriable application error!");
+ }
+ return;
+ }
+
+
log.info("Not specifically mapped error: {}", value);
}
+ private void sleep(Duration duration)
+ {
+ try
+ {
+ Thread.sleep(exceptionDelay);
+ }
+ catch(InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
public void clear()
{
numRequestsHandled = 0;