package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
ApplicationRecordHandler recordHandler,
AdderResults adderResults,
StateRepository stateRepository,
- Consumer<String, Message> kafkaConsumer,
+ ConsumerFactory<String, Message> consumerFactory,
KafkaProperties kafkaProperties)
{
return new ApplicationRebalanceListener(
recordHandler,
adderResults,
stateRepository,
- kafkaConsumer,
+ consumerFactory.createConsumer(),
kafkaProperties.getConsumer().getGroupId());
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final StateRepository stateRepository;
- private final String id;
- private final String topic;
private final Consumer consumer;
+ private final String id;
private final Set<Integer> partitions = new HashSet<>();