juplo.producer.topic: test
consumer-1:
- image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer-1
juplo.consumer.topic: test
consumer-2:
- image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer-2
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-long-SNAPSHOT</version>
+ <version>1.1-deserialization-error-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
while (running)
{
- ConsumerRecords<String, Long> records =
+ try
+ {
+ ConsumerRecords<String, Long> records =
consumer.poll(Duration.ofSeconds(1));
- log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, Long> record : records)
+ log.info("{} - Received {} messages", id, records.count());
+ for (ConsumerRecord<String, Long> record : records)
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ }
+ catch (RecordDeserializationException e)
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ log.error(
+ "{} - Ignoring invalid record for offset {} on partition {}: {}",
+ id,
+ e.offset(),
+ e.topicPartition(),
+ e.getMessage());
+ consumer.seek(e.topicPartition(), e.offset() + 1);
}
}
}