package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
@Configuration
@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
public class ApplicationConfiguration
{
@Bean
public ExampleConsumer exampleConsumer(
Consumer<String, Long> kafkaConsumer,
+ RecordHandler<String, Long> recordHandler,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
+ recordHandler,
() -> applicationContext.close());
}
+ @Bean
+ public RecordHandler<String, Long> recordHandler(ApplicationProperties properties)
+ {
+ return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
+ }
+
@Bean(destroyMethod = "")
public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
private final String id;
private final String topic;
private final Consumer<String, Long> consumer;
+ private final RecordHandler<String, Long> recordHandler;
private final Thread workerThread;
private final Runnable closeCallback;
String clientId,
String topic,
Consumer<String, Long> consumer,
+ RecordHandler<String, Long> recordHandler,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
+ this.recordHandler = recordHandler;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
record.value());
}
}
- catch (RecordDeserializationException e)
+ catch(RecordDeserializationException e)
{
- log.error(
- "{} - Ignoring invalid record for offset {} on partition {}: {}",
- id,
- e.offset(),
- e.topicPartition(),
- e.getMessage());
- consumer.seek(e.topicPartition(), e.offset() + 1);
+ log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset());
+ consumer.seek(e.topicPartition(), e.offset() +1);
}
}
}
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+ recordHandler.handleRecord(topic, partition, offset, key, value);
}