From 27e0b6fd938cefd97f99251d8ae714e657e85d97 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 13:21:58 +0200 Subject: [PATCH] WIP:kafkahandler --- .../juplo/kafka/ApplicationRecordHandler.java | 29 ++++--------- .../java/de/juplo/kafka/RecordHandler.java | 14 ++++++- .../juplo/kafka/GenericApplicationTests.java | 8 ++-- .../de/juplo/kafka/TestRecordHandler.java | 41 ++++++++++++++----- 4 files changed, 56 insertions(+), 36 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 61b265b..1804951 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -34,46 +34,35 @@ public class ApplicationRecordHandler implements RecordHandler @KafkaHandler + @Override public void addNumber( @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, @Payload MessageAddNumber message) { state.get(partition).addToSum(user, message.getNext()); + throttle(); } @KafkaHandler + @Override public void calcSum( @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, @Payload MessageCalculateSum message) { AdderResult result = state.get(partition).calculate(user); log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); + throttle(); } - @Override - public void accept(ConsumerRecord record) + private void throttle() { - Integer partition = record.partition(); - String user = record.key(); - Message message = record.value(); - - switch(message.getType()) - { - case ADD: - MessageAddNumber addNumber = (MessageAddNumber)message; - state.get(partition).addToSum(user, addNumber.getNext()); - break; - - case CALC: - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - break; - } - if (throttle.isPresent()) { try diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index 327ac9f..d71bfed 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -5,6 +5,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.function.Consumer; -public interface RecordHandler extends Consumer> +public interface RecordHandler { + void addNumber( + String user, + String topic, + Integer partition, + Long offset, + MessageAddNumber message); + void calcSum( + String user, + String topic, + Integer partition, + Long offset, + MessageCalculateSum message); } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 1e28b06..8e81e26 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -68,7 +68,7 @@ abstract class GenericApplicationTests @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - TestRecordHandler recordHandler; + TestRecordHandler recordHandler; @Autowired DriverController driverController; @@ -98,7 +98,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedMessages.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -140,7 +140,7 @@ abstract class GenericApplicationTests checkSeenOffsetsForProgress(); assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) + assertThat(recordHandler.receivedMessages.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -347,7 +347,7 @@ abstract class GenericApplicationTests oldOffsets = new HashMap<>(); recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); + recordHandler.receivedMessages = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index 37d3f65..cc5908a 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -9,26 +9,45 @@ import java.util.Set; @RequiredArgsConstructor -public class TestRecordHandler implements RecordHandler +public class TestRecordHandler implements RecordHandler { - private final RecordHandler handler; + private final RecordHandler handler; Map seenOffsets; - Set> receivedRecords; + Set receivedMessages; - public void onNewRecord(ConsumerRecord record) + public void onNewRecord( + String topic, + Integer partition, + Long offset, + Message messgage) { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); + seenOffsets.put(new TopicPartition(topic, partition), offset); + receivedMessages.add(messgage); } @Override - public void accept(ConsumerRecord record) + public void addNumber( + String user, + String topic, + Integer partition, + Long offset, + MessageAddNumber message) { - this.onNewRecord(record); - handler.accept(record); + this.onNewRecord(topic, partition, offset, message); + handler.addNumber(user, topic, partition, offset, message); + } + + @Override + public void calcSum( + String user, + String topic, + Integer partition, + Long offset, + MessageCalculateSum message) + { + this.onNewRecord(topic, partition, offset, message); + handler.calcSum(user, topic, partition, offset, message); } } -- 2.20.1