import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;
import java.util.Map;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
@Configuration
public class ApplicationConfiguration
{
@Bean
- public Consumer<ClientMessage> messageHandler()
+ public BiConsumer<ClientMessage, ConsumerRecordMetadata> messageHandler()
{
- return (message) ->
+ return (message, metadata) ->
{
- log.info("Received ClientMessage: {}", message);
+ log.info("Received ClientMessage: {}, Metadata: {}", message, metadata);
};
}
@Bean
- public Consumer<Greeting> greetingsHandler()
+ public BiConsumer<Greeting, ConsumerRecordMetadata> greetingsHandler()
{
- return (greeting) ->
+ return (greeting, metadata) ->
{
- log.info("Received Greeting: {}", greeting);
+ log.info("Received Greeting: {}, Metadata: {}", greeting, metadata);
};
}
new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
- Object.class, new JsonSerializer<>())));
+ ClientMessage.class, new JsonSerializer<>(),
+ Greeting.class, new JsonSerializer<>())));
}
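// The DeadLetterPublishingRecoverer, DefaultErrorHandler and FixedBackOff
// imports above imply error-handling beans that this excerpt elides, and the
// fragment above is the tail of a producer-factory definition. A minimal
// sketch of both, assuming injected producer properties and the usual
// org.apache.kafka.common.serialization imports (bean shapes and parameter
// names are illustrative, not taken from the original source):
@Bean
public ProducerFactory<String, Object> producerFactory(Map<String, Object> config)
{
  // Keys stay plain strings; values are serialized according to their
  // concrete type, so JsonSerializer adds the type headers that the
  // consumer-side @KafkaHandler routing relies on
  return new DefaultKafkaProducerFactory<>(
      config,
      new StringSerializer(),
      new DelegatingByTypeSerializer(Map.of(
          byte[].class, new ByteArraySerializer(),
          ClientMessage.class, new JsonSerializer<>(),
          Greeting.class, new JsonSerializer<>())));
}

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Object> template)
{
  // FixedBackOff(0L, 0L): no redelivery attempts; a failing record is handed
  // straight to the recoverer, which publishes it to <topic>.DLT, and the
  // consumer commits its offset and moves on
  return new DefaultErrorHandler(
      new DeadLetterPublishingRecoverer(template),
      new FixedBackOff(0L, 0L));
}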
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
@Component
@Value("${spring.kafka.consumer.client-id}")
String id;
@Autowired
- Consumer<ClientMessage> messageHandler;
+ BiConsumer<ClientMessage, ConsumerRecordMetadata> messageHandler;
@Autowired
- Consumer<Greeting> greetingsHandler;
+ BiConsumer<Greeting, ConsumerRecordMetadata> greetingsHandler;
private long consumed = 0;
@KafkaHandler
- public void receiveGreeting(Greeting greeting)
+ public void receiveMessage(
+ @Payload ClientMessage message,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
{
- greetingsHandler.accept(greeting);
+ messageHandler.accept(message, metadata);
consumed++;
}
@KafkaHandler
- public void receiveMessage(ClientMessage message)
+ public void receiveGreeting(
+ @Payload Greeting greeting,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
+ {
+ greetingsHandler.accept(greeting, metadata);
+ consumed++;
+ }
+
+ @KafkaHandler(isDefault = true)
+ public void unknown(
+ @Payload Object unknown,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
{
- messageHandler.accept(message);
consumed++;
}
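// ConsumerRecordMetadata exposes the record's position and timing without
// handing the full ConsumerRecord to the handler. A hypothetical helper (not
// part of the original listener) showing the available accessors:
private static String describe(ConsumerRecordMetadata metadata)
{
  // topic/partition/offset pinpoint the record; timestamp() is interpreted
  // according to timestampType() (CREATE_TIME or LOG_APPEND_TIME)
  return String.format(
      "%s-%d@%d (%s=%d)",
      metadata.topic(),
      metadata.partition(),
      metadata.offset(),
      metadata.timestampType(),
      metadata.timestamp());
}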
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Autowired
EndlessConsumer endlessConsumer;
@Autowired
- RecordHandler recordHandler;
+ ClientMessageHandler clientMessageHandler;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
- Set<ConsumerRecord<String, ClientMessage>> receivedRecords;
+ Set<ClientMessage> received;
/** Test methods */
await("100 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() == 100);
+ .until(() -> received.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
await("99 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() == 99);
+ .until(() -> received.size() == 99);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@Test
void commitsOffsetOnProgramLogicErrorFoo()
{
- recordHandler.testHandler = (record) ->
+ clientMessageHandler.testHandler = (clientMessage, metadata) ->
{
- if (Integer.parseInt(record.value().message)%10 ==0)
- throw new RuntimeException("BOOM: " + record.value().message + "%10 == 0");
+ if (Integer.parseInt(clientMessage.message) % 10 == 0)
+ throw new RuntimeException("BOOM: " + clientMessage.message + "%10 == 0");
};
send100Messages((key, counter) -> serialize(key, counter));
await("80 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() == 100);
+ .until(() -> received.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@BeforeEach
public void init()
{
- recordHandler.testHandler = (record) -> {};
+ clientMessageHandler.testHandler = (clientMessage, metadata) -> {};
oldOffsets = new HashMap<>();
newOffsets = new HashMap<>();
- receivedRecords = new HashSet<>();
+ received = new HashSet<>();
doForCurrentOffsets((tp, offset) ->
{
oldOffsets.put(tp, offset - 1);
newOffsets.put(tp, offset - 1);
});
- recordHandler.captureOffsets =
- record ->
+ clientMessageHandler.captureOffsets =
+ (clientMessage, metadata) ->
{
- receivedRecords.add(record);
- log.debug("TEST: Processing record #{}: {}", receivedRecords.size(), record.value());
+ received.add(clientMessage);
+ log.debug("TEST: Processing record #{}: {}", received.size(), clientMessage);
newOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
+ new TopicPartition(metadata.topic(), metadata.partition()), metadata.offset());
};
endlessConsumer.start();
}
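// doForCurrentOffsets() is not part of this excerpt. One possible shape for
// it, assuming a KafkaConsumer is available for offset lookups; the
// offsetConsumer field, the TOPIC constant and the OffsetAndMetadata import
// are assumptions of this sketch:
void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
{
  offsetConsumer.partitionsFor(TOPIC).forEach(info ->
  {
    TopicPartition tp = new TopicPartition(TOPIC, info.partition());
    // committed() returns no entry for partitions without a committed offset
    OffsetAndMetadata offset = offsetConsumer.committed(Set.of(tp)).get(tp);
    consumer.accept(tp, offset == null ? 0L : offset.offset());
  });
}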
}
- public static class RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>>
+ public static class ClientMessageHandler implements BiConsumer<ClientMessage, ConsumerRecordMetadata>
{
- Consumer<ConsumerRecord<String, ClientMessage>> captureOffsets;
- Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
+ BiConsumer<ClientMessage, ConsumerRecordMetadata> captureOffsets;
+ BiConsumer<ClientMessage, ConsumerRecordMetadata> testHandler;
@Override
- public void accept(ConsumerRecord<String, ClientMessage> record)
+ public void accept(ClientMessage clientMessage, ConsumerRecordMetadata metadata)
{
captureOffsets
.andThen(testHandler)
- .accept(record);
+ .accept(clientMessage, metadata);
}
}
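// Note on the composition above: BiConsumer.andThen() returns a composed
// BiConsumer that invokes captureOffsets first and testHandler second, with
// the same arguments. The order matters for the error test: even when
// testHandler throws, the record and its offset have already been captured.
// The composed call is equivalent to:
//
//   captureOffsets.accept(clientMessage, metadata);
//   testHandler.accept(clientMessage, metadata);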
{
@Primary
@Bean
- public Consumer<ConsumerRecord<String, ClientMessage>> testHandler()
+ public BiConsumer<ClientMessage, ConsumerRecordMetadata> testHandler()
{
- return new RecordHandler();
+ return new ClientMessageHandler();
}
@Bean