package de.juplo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
++import org.springframework.kafka.core.ConsumerFactory;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.function.Consumer;
}
@Bean
- public EndlessConsumer<String, ClientMessage> endlessConsumer(
- KafkaConsumer<String, ClientMessage> kafkaConsumer,
- ExecutorService executor,
- Consumer<ConsumerRecord<String, ClientMessage>> handler,
- ApplicationProperties properties)
+ public ApplicationErrorHandler errorHandler()
{
- return
- new EndlessConsumer<>(
- executor,
- properties.getClientId(),
- properties.getTopic(),
- kafkaConsumer,
- handler);
- }
-
- @Bean
- public ExecutorService executor()
- {
- return Executors.newSingleThreadExecutor();
+ return new ApplicationErrorHandler();
}
- public KafkaConsumer<String, ClientMessage> kafkaConsumer(ApplicationProperties properties)
+
+ @Bean(destroyMethod = "close")
- Properties props = new Properties();
-
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("group.id", properties.getGroupId());
- props.put("client.id", properties.getClientId());
- props.put("auto.offset.reset", properties.getAutoOffsetReset());
- props.put("metadata.max.age.ms", "1000");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", JsonDeserializer.class.getName());
- props.put(JsonDeserializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName());
- props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
-
- return new KafkaConsumer<>(props);
++ public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
+ {
++ return factory.createConsumer();
+ }
}
group-id: ${consumer.group-id}
topic: ${consumer.topic}
auto-offset-reset: ${consumer.auto-offset-reset}
- value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
+spring:
+ kafka:
+ consumer:
+ bootstrap-servers: ${consumer.bootstrap-server}
+ client-id: ${consumer.client-id}
+ auto-offset-reset: ${consumer.auto-offset-reset}
+ group-id: ${consumer.group-id}
++ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
++ properties:
++ spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
++ spring.json.trusted.packages: "de.juplo.kafka"
logging:
level:
root: INFO
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
+ import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
+ import java.util.function.BiFunction;
import java.util.function.Consumer;
- import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Autowired
ApplicationProperties properties;
@Autowired
- ExecutorService executor;
+ EndlessConsumer endlessConsumer;
+ @Autowired
+ RecordHandler recordHandler;
- Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
- EndlessConsumer<String, ClientMessage> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
- Set<ConsumerRecord<String, Long>> receivedRecords;
+ Set<ConsumerRecord<String, ClientMessage>> receivedRecords;
/** Tests methods */
}
}
- public static class RecordHandler implements Consumer<ConsumerRecord<String, Long>>
++ public static class RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>>
+ {
- Consumer<ConsumerRecord<String, Long>> captureOffsets;
- Consumer<ConsumerRecord<String, Long>> testHandler;
++ Consumer<ConsumerRecord<String, ClientMessage>> captureOffsets;
++ Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
+
+
+ @Override
- public void accept(ConsumerRecord<String, Long> record)
++ public void accept(ConsumerRecord<String, ClientMessage> record)
+ {
+ captureOffsets
+ .andThen(testHandler)
+ .accept(record);
+ }
+ }
@TestConfiguration
@Import(ApplicationConfiguration.class)
public static class Configuration
{
- public Consumer<ConsumerRecord<String, Long>> testHandler()
+ @Primary
+ @Bean
++ public Consumer<ConsumerRecord<String, ClientMessage>> testHandler()
+ {
+ return new RecordHandler();
+ }
+
@Bean
- Serializer<Long> serializer()
+ Serializer<ClientMessage> serializer()
{
- return new LongSerializer();
+ return new JsonSerializer<>();
}
@Bean