import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
import java.util.List;
import java.util.Optional;
@RequiredArgsConstructor
@Slf4j
-public class EndlessConsumer<K, V>
+@KafkaListener(
+ id = "${spring.kafka.client-id}",
+ idIsGroup = false,
+ topics = "${sumup.adder.topic}",
+ autoStartup = "false")
+public class EndlessConsumer
{
private final String id;
private final KafkaListenerEndpointRegistry registry;
- private final ApplicationErrorHandler errorHandler;
- private final RecordHandler<K, V> recordHandler;
+ private final RecordHandler recordHandler;
private long consumed = 0;
- @KafkaListener(
- id = "${spring.kafka.client-id}",
- idIsGroup = false,
- topics = "${sumup.adder.topic}",
- batch = "true",
- autoStartup = "false")
- public void accept(List<ConsumerRecord<K, V>> records)
+ @KafkaHandler
+ public void addNumber(
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
+ @Payload MessageAddNumber message)
{
- // Do something with the data...
- log.info("{} - Received {} messages", id, records.size());
- for (ConsumerRecord<K, V> record : records)
- {
log.info(
"{} - {}: {}/{} - {}={}",
id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
+ offset,
+ topic,
+ partition,
+ key,
+ message
);
- recordHandler.accept(record);
+ recordHandler.addNumber(topic, partition, offset, key, message);
+
+ consumed++;
+ }
+
+ @KafkaHandler
+ public void calculateSum(
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
+ @Payload MessageCalculateSum message)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ offset,
+ topic,
+ partition,
+ key,
+ message
+ );
+
+ recordHandler.calculateSum(topic, partition, offset, key, message);
consumed++;
- }
}
public void start()
throw new IllegalStateException("Consumer instance " + id + " is already running!");
log.info("{} - Starting - consumed {} messages before", id, consumed);
- errorHandler.clearState();
registry.getListenerContainer(id).start();
}
{
return registry.getListenerContainer(id).isRunning();
}
-
- public Optional<Exception> exitStatus()
- {
- if (running())
- throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
- return errorHandler.getException();
- }
}