Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("group.id", properties.getGroupId());
props.put("client.id", properties.getClientId());
+ props.put("enable.auto.commit", false);
props.put("auto.offset.reset", properties.getAutoOffsetReset());
+ props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", LongDeserializer.class.getName());
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Autowired
KafkaConsumer<String, Long> kafkaConsumer;
@Autowired
+ KafkaConsumer<Bytes, Bytes> offsetConsumer;
+ @Autowired
+ PartitionStatisticsRepository partitionStatisticsRepository;
+ @Autowired
ApplicationProperties properties;
@Autowired
ExecutorService executor;
/** Helper methods for setting up and running the tests */
- offsetConsumer.seekToEnd(partitions());
+ void seekToEnd()
+ {
+ offsetConsumer.assign(partitions());
- // seekToEnd() works lazily: it only takes effect on poll()/position()
+ partitions().forEach(tp ->
+ {
- // The new positions must be commited!
- offsetConsumer.commitSync();
+ Long offset = offsetConsumer.position(tp);
+ log.info("New position for {}: {}", tp, offset);
++ Integer partition = tp.partition();
++ StatisticsDocument document =
++ partitionStatisticsRepository
++ .findById(partition.toString())
++ .orElse(new StatisticsDocument(partition));
++ document.offset = offset;
++ partitionStatisticsRepository.save(document);
+ });
+ offsetConsumer.unsubscribe();
+ }
+
void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
{
- offsetConsumer.assign(partitions());
- partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
- offsetConsumer.unsubscribe();
- }
+ partitions().forEach(tp ->
+ {
+ String partition = Integer.toString(tp.partition());
+ Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+ consumer.accept(tp, offset.orElse(0l));
+ });
+ }
List<TopicPartition> partitions()
{
@BeforeEach
public void init()
{
- testHandler = record -> {} ;
-
+ seekToEnd();
+
oldOffsets = new HashMap<>();
newOffsets = new HashMap<>();
receivedRecords = new HashSet<>();
return new KafkaProducer<>(props);
}
- props.put("group.id", properties.getGroupId());
+
+ @Bean
+ KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("client.id", "OFFSET-CONSUMER");
++ props.put("enable.auto.commit", false);
++ props.put("auto.offset.reset", "latest");
+ props.put("key.deserializer", BytesDeserializer.class.getName());
+ props.put("value.deserializer", BytesDeserializer.class.getName());
+
+ return new KafkaConsumer<>(props);
+ }
}
}