@KafkaHandler
+ @Override
public void addNumber(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
@Payload MessageAddNumber message)
{
state.get(partition).addToSum(user, message.getNext());
+ throttle();
}
@KafkaHandler
+ @Override
public void calcSum(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
@Payload MessageCalculateSum message)
{
AdderResult result = state.get(partition).calculate(user);
log.info("{} - New result for {}: {}", id, user, result);
results.addResults(partition, user, result);
+ throttle();
}
- @Override
- public void accept(ConsumerRecord<String, Message> record)
+ private void throttle()
{
- Integer partition = record.partition();
- String user = record.key();
- Message message = record.value();
-
- switch(message.getType())
- {
- case ADD:
- MessageAddNumber addNumber = (MessageAddNumber)message;
- state.get(partition).addToSum(user, addNumber.getNext());
- break;
-
- case CALC:
- AdderResult result = state.get(partition).calculate(user);
- log.info("{} - New result for {}: {}", id, user, result);
- results.addResults(partition, user, result);
- break;
- }
-
if (throttle.isPresent())
{
try
import java.util.function.Consumer;
-public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+public interface RecordHandler<K, V>
{
+ void addNumber(
+ String user,
+ String topic,
+ Integer partition,
+ Long offset,
+ MessageAddNumber message);
+ void calcSum(
+ String user,
+ String topic,
+ Integer partition,
+ Long offset,
+ MessageCalculateSum message);
}
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
- TestRecordHandler<K, V> recordHandler;
+ TestRecordHandler recordHandler;
@Autowired
DriverController driverController;
await(numberOfGeneratedMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
+ .until(() -> recordHandler.receivedMessages.size() >= numberOfGeneratedMessages);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
checkSeenOffsetsForProgress();
assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
- assertThat(recordHandler.receivedRecords.size())
+ assertThat(recordHandler.receivedMessages.size())
.describedAs("Received not all sent events")
.isLessThan(numberOfGeneratedMessages);
oldOffsets = new HashMap<>();
recordHandler.seenOffsets = new HashMap<>();
- recordHandler.receivedRecords = new HashSet<>();
+ recordHandler.receivedMessages = new HashSet<>();
doForCurrentOffsets((tp, offset) ->
{
@RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
{
- private final RecordHandler<K, V> handler;
+ private final RecordHandler handler;
Map<TopicPartition, Long> seenOffsets;
- Set<ConsumerRecord<K, V>> receivedRecords;
+ Set<Message> receivedMessages;
- public void onNewRecord(ConsumerRecord<K, V> record)
+ public void onNewRecord(
+ String topic,
+ Integer partition,
+ Long offset,
+ Message messgage)
{
- seenOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
+ seenOffsets.put(new TopicPartition(topic, partition), offset);
+ receivedMessages.add(messgage);
}
@Override
- public void accept(ConsumerRecord<K, V> record)
+ public void addNumber(
+ String user,
+ String topic,
+ Integer partition,
+ Long offset,
+ MessageAddNumber message)
{
- this.onNewRecord(record);
- handler.accept(record);
+ this.onNewRecord(topic, partition, offset, message);
+ handler.addNumber(user, topic, partition, offset, message);
+ }
+
+ @Override
+ public void calcSum(
+ String user,
+ String topic,
+ Integer partition,
+ Long offset,
+ MessageCalculateSum message)
+ {
+ this.onNewRecord(topic, partition, offset, message);
+ handler.calcSum(user, topic, partition, offset, message);
}
}