From 6f3bc4b9531d967df59cc6a6f2d76d7ba9d2ec7a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 3 Jun 2023 12:08:56 +0200 Subject: [PATCH] WIP --- docker-compose.yml | 4 ++ .../de/juplo/kafka/ApplicationProperties.java | 1 - .../java/de/juplo/kafka/EndlessConsumer.java | 39 ++++++++++++++++--- src/main/resources/application.yml | 1 + 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c4492a4..98cb40a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -75,6 +75,10 @@ services: cli: image: juplo/toolbox command: sleep infinity + depends_on: + - kafka-1 + - kafka-2 + - kafka-3 gateway: image: juplo/sumup-gateway:1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c7b0b8e..481fce0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -29,7 +29,6 @@ public class ApplicationProperties @NotEmpty private String topicIn; @NotNull - @NotEmpty private Integer partitions; @NotNull @NotEmpty diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 3e98842..771b3a2 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -45,13 +45,17 @@ public class EndlessConsumer implements Runnable @Override public void run() { + OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions); + try { - OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions); + producer.initTransactions(); log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic), offsetRecorder); + producer.beginTransaction(); + while (true) { ConsumerRecords records = @@ -80,6 +84,8 @@ public class EndlessConsumer implements Runnable producer.sendOffsetsToTransaction( offsetRecorder.getOffsets(), consumer.groupMetadata()); + producer.commitTransaction(); + producer.beginTransaction(); } consumed++; } @@ -88,7 +94,10 @@ public class EndlessConsumer implements Runnable catch(WakeupException e) { log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); + producer.sendOffsetsToTransaction( + offsetRecorder.getOffsets(), + consumer.groupMetadata()); + producer.commitTransaction(); shutdown(); } catch(RecordDeserializationException e) @@ -102,12 +111,16 @@ public class EndlessConsumer implements Runnable offset, e.getCause().toString()); - consumer.commitSync(); + producer.sendOffsetsToTransaction( + offsetRecorder.getOffsets(), + consumer.groupMetadata()); + producer.commitTransaction(); shutdown(e); } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); + producer.abortTransaction(); shutdown(e); } finally @@ -141,7 +154,7 @@ public class EndlessConsumer implements Runnable if (!activePartitions[partition]) throw new IllegalStateException("Partition " + partition + " is not active!"); - offsets[partition] = offset; + offsets[partition] = offset + 1; } Map getOffsets() @@ -151,9 +164,12 @@ public class EndlessConsumer implements Runnable for (int i=0; i -1) + { + log.info("Offset for partition {} is {}", i, offsets[i]); recordedOffsets.put( new TopicPartition(topic, i), new OffsetAndMetadata(offsets[i])); + } } return recordedOffsets; @@ -182,9 +198,20 @@ public class EndlessConsumer implements Runnable () -> new HashMap(), (map, topicPartition) -> { - log.info("Commiting & deactivating revoked partition {}", topicPartition); + log.info("Deactivating revoked partition {}", topicPartition); activePartitions[topicPartition.partition()] = false; - map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()])); + if (offsets[topicPartition.partition()] < 0) + { + log.info("No offset to commit for partition {}", topicPartition); + } + else + { + log.info( + "Commiting offset {} for partition {}", + offsets[topicPartition.partition()], + topicPartition); + map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()])); + } }, (mapA, mapB) -> mapA.putAll(mapB)), consumer.groupMetadata()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index bf339b0..1c5a517 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ sumup: group-id: sumup-requests client-id: DEV topic-in: in + partitions: 2 topic-out: out auto-offset-reset: earliest commit-interval: 5s -- 2.20.1