}
@Bean
- public EndlessConsumer<String, Message> endlessConsumer(
+ public EndlessConsumer endlessConsumer(
RecordHandler recordHandler,
ApplicationErrorHandler errorHandler,
KafkaProperties kafkaProperties,
KafkaListenerEndpointRegistry endpointRegistry)
{
return
- new EndlessConsumer<>(
+ new EndlessConsumer(
kafkaProperties.getClientId(),
endpointRegistry,
errorHandler,
@RequiredArgsConstructor
public class ApplicationHealthIndicator implements HealthIndicator
{
- private final EndlessConsumer<String, Message> consumer;
+ private final EndlessConsumer consumer;
@Override
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
@RequiredArgsConstructor
@Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, Message>
+public class ApplicationRecordHandler implements RecordHandler
{
private final AdderResults results;
private final Optional<Duration> throttle;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+ @Override
public void addNumber(
+ String topic,
Integer partition,
+ Long offset,
String user,
MessageAddNumber message)
{
state.get(partition).addToSum(user, message.getNext());
+ throttle();
}
+ @Override
public void calculateSum(
+ String topic,
Integer partition,
+ Long offset,
String user,
MessageCalculateSum message)
{
AdderResult result = state.get(partition).calculate(user);
log.info("{} - New result for {}: {}", id, user, result);
results.addResults(partition, user, result);
+ throttle();
}
- @Override
- public void accept(ConsumerRecord<String, Message> record)
+ private void throttle()
{
- Integer partition = record.partition();
- String user = record.key();
- Message message = record.value();
-
- switch(message.getType())
- {
- case ADD:
- addNumber(partition, user, (MessageAddNumber) message);
- break;
-
- case CALC:
- calculateSum(partition, user, (MessageCalculateSum) message);
- break;
- }
-
if (throttle.isPresent())
{
try
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
import java.util.List;
import java.util.Optional;
@RequiredArgsConstructor
@Slf4j
-public class EndlessConsumer<K, V>
+@KafkaListener(
+ id = "${spring.kafka.client-id}",
+ idIsGroup = false,
+ topics = "${sumup.adder.topic}",
+ autoStartup = "false")
+public class EndlessConsumer
{
private final String id;
private final KafkaListenerEndpointRegistry registry;
private final ApplicationErrorHandler errorHandler;
- private final RecordHandler<K, V> recordHandler;
+ private final RecordHandler recordHandler;
private long consumed = 0;
- @KafkaListener(
- id = "${spring.kafka.client-id}",
- idIsGroup = false,
- topics = "${sumup.adder.topic}",
- autoStartup = "false")
- public void accept(ConsumerRecord<K, V> record)
+ @KafkaHandler
+ public void addNumber(
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
+ @Payload MessageAddNumber message)
{
log.info(
"{} - {}: {}/{} - {}={}",
id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
+ offset,
+ topic,
+ partition,
+ key,
+ message
);
- recordHandler.accept(record);
+ recordHandler.addNumber(topic, partition, offset, key, message);
+
+ consumed++;
+ }
+
+ @KafkaHandler
+ public void calculateSum(
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
+ @Payload MessageCalculateSum message)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ offset,
+ topic,
+ partition,
+ key,
+ message
+ );
+
+ recordHandler.calculateSum(topic, partition, offset, key, message);
consumed++;
}
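
Since the class-level @KafkaListener above is declared with autoStartup = "false", the listener container has to be started explicitly through the injected KafkaListenerEndpointRegistry. A minimal sketch of what that could look like inside EndlessConsumer; the method names and the error handling here are assumptions, not part of the diff:

// requires: import org.springframework.kafka.listener.MessageListenerContainer;
public void start()
{
  // Look up the container that was registered under the listener id ...
  MessageListenerContainer container = registry.getListenerContainer(id);
  if (container.isRunning())
    throw new IllegalStateException("Consumer instance " + id + " is already running!");
  // ... and start it explicitly, because autoStartup is disabled.
  container.start();
}

public void stop()
{
  MessageListenerContainer container = registry.getListenerContainer(id);
  if (!container.isRunning())
    throw new IllegalStateException("Consumer instance " + id + " is not running!");
  container.stop();
}
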
-import java.util.function.Consumer;
-public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+public interface RecordHandler
{
+ void addNumber(
+ String topic,
+ Integer partition,
+ Long offset,
+ String user,
+ MessageAddNumber message);
+ void calculateSum(
+ String topic,
+ Integer partition,
+ Long offset,
+ String user,
+ MessageCalculateSum message);
}
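
The @KafkaHandler methods in EndlessConsumer, and hence the typed callbacks of this interface, are selected by the class of the deserialized payload, so the consumer has to turn the records into MessageAddNumber and MessageCalculateSum instances before dispatch. A hedged sketch of matching consumer properties using Spring Kafka's JsonDeserializer; the ADD/CALC labels are taken from the removed switch statement and the package from the code above, the project's actual configuration may differ:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

static Map<String, Object> typedConsumerProperties()
{
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  // Restrict deserialization to the application's package ...
  props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
  // ... and map the type labels to the concrete payload classes.
  props.put(
      JsonDeserializer.TYPE_MAPPINGS,
      "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum");
  return props;
}
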
import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
- TestRecordHandler<K, V> recordHandler;
+ TestRecordHandler recordHandler;
@Autowired
- EndlessConsumer<K, V> endlessConsumer;
+ EndlessConsumer endlessConsumer;
KafkaProducer<Bytes, Bytes> testRecordProducer;
KafkaConsumer<Bytes, Bytes> offsetConsumer;
await(numberOfGeneratedMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
+ .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
checkSeenOffsetsForProgress();
assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
- assertThat(recordHandler.receivedRecords.size())
+ assertThat(recordHandler.receivedMessages)
.describedAs("Received not all sent events")
.isLessThan(numberOfGeneratedMessages);
oldOffsets = new HashMap<>();
recordHandler.seenOffsets = new HashMap<>();
- recordHandler.receivedRecords = new HashSet<>();
+ recordHandler.receivedMessages = 0;
doForCurrentOffsets((tp, offset) ->
{
package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
-import java.util.Set;
@RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
{
- private final RecordHandler<K, V> handler;
+ private final RecordHandler handler;
Map<TopicPartition, Long> seenOffsets;
- Set<ConsumerRecord<K, V>> receivedRecords;
+ int receivedMessages;
- public void onNewRecord(ConsumerRecord<K, V> record)
+ public void onNewRecord(
+ String topic,
+ Integer partition,
+ Long offset,
+ Message message)
{
- seenOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
+ seenOffsets.put(new TopicPartition(topic, partition), offset);
+ receivedMessages++;
}
@Override
- public void accept(ConsumerRecord<K, V> record)
+ public void addNumber(
+ String topic,
+ Integer partition,
+ Long offset,
+ String user,
+ MessageAddNumber message)
{
- this.onNewRecord(record);
- handler.accept(record);
+ this.onNewRecord(topic, partition, offset, message);
+ handler.addNumber(topic, partition, offset, user, message);
+ }
+
+ @Override
+ public void calculateSum(
+ String topic,
+ Integer partition,
+ Long offset,
+ String user,
+ MessageCalculateSum message)
+ {
+ this.onNewRecord(topic, partition, offset, message);
+ handler.calculateSum(topic, partition, offset, user, message);
}
}
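
The test fragment above autowires a TestRecordHandler, so a test configuration has to put this decorator in front of the real handler. A possible wiring, sketched under the assumption that ApplicationRecordHandler's @RequiredArgsConstructor takes exactly the results and throttle fields shown earlier; the real constructor may expect additional arguments (it also logs an id):

// requires: @TestConfiguration, @Bean and @Primary from Spring, java.util.Optional
@TestConfiguration
static class Configuration
{
  // Make the decorating TestRecordHandler the primary RecordHandler bean, so
  // EndlessConsumer records offsets and message counts through it while the
  // calls are still delegated to the real business logic.
  @Primary
  @Bean
  public TestRecordHandler recordHandler(AdderResults results)
  {
    return new TestRecordHandler(
        new ApplicationRecordHandler(results, Optional.empty()));
  }
}
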