import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
@Configuration
@Bean
public EndlessConsumer<String, Integer> endlessConsumer(
KafkaConsumer<String, Integer> kafkaConsumer,
+ KafkaProducer<String, Object> kafkaProducer,
ExecutorService executor,
ApplicationRecordHandler recordHandler,
- ApplicationProperties properties)
+ ApplicationProperties properties,
+ Clock clock)
{
return
new EndlessConsumer<>(
executor,
properties.getClientId(),
properties.getTopicIn(),
+ properties.getPartitions(),
kafkaConsumer,
- recordHandler);
+ kafkaProducer,
+ recordHandler,
+ clock,
+ properties.getCommitInterval().toMillis());
}
@Bean
props.put("group.id", properties.getGroupId());
props.put("client.id", properties.getClientId());
props.put("auto.offset.reset", properties.getAutoOffsetReset());
- props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
+ props.put("enable.auto.commit", false);
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", IntegerDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
+
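+ // Expose the clock as a bean, so that the commit timing in EndlessConsumer
+ // can be controlled deterministically in tests (e.g. with a fixed clock)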
+ @Bean
+ public Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
}
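
The kafkaProducer bean that is injected into endlessConsumer() above is not shown in this excerpt. As a rough sketch of what it has to look like for the offset handling below to work (the getBootstrapServer() getter and the transactional.id naming are assumptions): sendOffsetsToTransaction() requires a producer with a configured transactional.id that has called initTransactions(), and the offsets it records only become visible once the surrounding transaction is committed.

@Bean(destroyMethod = "close")
public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
{
  Properties props = new Properties();
  props.put("bootstrap.servers", properties.getBootstrapServer()); // assumed getter
  props.put("client.id", properties.getClientId());
  // Required for sendOffsetsToTransaction(); the naming scheme is an assumption
  props.put("transactional.id", properties.getClientId() + "-TX");
  props.put("key.serializer", StringSerializer.class.getName());
  props.put("value.serializer", JsonSerializer.class.getName());

  KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
  // Registers the transactional.id with the broker and fences off stale instances
  producer.initTransactions();
  return producer;
}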
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import javax.annotation.PreDestroy;
+import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
private final ExecutorService executor;
private final String id;
private final String topic;
+ private final int partitions;
private final Consumer<K, V> consumer;
+ private final Producer<?, ?> producer;
private final RecordHandler<K, V> handler;
+ private final Clock clock;
+ private final long autoCommitIntervalMs;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean running = false;
private Exception exception;
private long consumed = 0;
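+ // Timestamp (clock.millis()) of the last explicit offset commit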
+ private long lastCommit = 0;
{
try
{
+ OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+
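+ // The OffsetRecorder is registered as ConsumerRebalanceListener, so that
+ // the offsets of revoked partitions are committed before they are reassigned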
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), offsetRecorder);
while (true)
{
record.value()
);
+ offsetRecorder.recordOffset(record.partition(), record.offset());
handler.accept(record);
+ long now = clock.millis();
+ if (now - lastCommit >= autoCommitIntervalMs)
+ {
+ // Hand the recorded offsets over to the producer: they become part of its
+ // current transaction and are committed together with it
+ producer.sendOffsetsToTransaction(
+ offsetRecorder.getOffsets(),
+ consumer.groupMetadata());
+ lastCommit = now;
+ }
consumed++;
}
}
}
}
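+ /**
+  * Keeps track of the offsets up to which records have been processed for
+  * each currently assigned partition and, as a ConsumerRebalanceListener,
+  * commits the recorded offsets of revoked partitions through the producer
+  * before a rebalance takes them away.
+  */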
+ class OffsetRecorder implements ConsumerRebalanceListener
+ {
+ private final String topic;
+ private final boolean[] activePartitions;
+ private final long[] offsets;
+
+
+ OffsetRecorder(String topic, int partitions)
+ {
+ this.topic = topic;
+ activePartitions = new boolean[partitions];
+ offsets = new long[partitions];
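+ // -1 marks partitions for which no offset has been recorded yet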
+ for (int i=0; i< partitions; i++)
+ {
+ offsets[i] = -1;
+ activePartitions[i] = false;
+ }
+ }
+
+
+ void recordOffset(int partition, long offset)
+ {
+ if (!activePartitions[partition])
+ throw new IllegalStateException("Partition " + partition + " is not active!");
+
+ // Store the position of the next record to be consumed: Kafka expects
+ // committed offsets to point to the next record, not to the last processed one
+ offsets[partition] = offset + 1;
+ }
+
+ Map<TopicPartition, OffsetAndMetadata> getOffsets()
+ {
+ Map<TopicPartition, OffsetAndMetadata> recordedOffsets = new HashMap<>();
+
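+ // Collect only partitions that are still assigned and have a recorded offset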
+ for (int i=0; i<offsets.length; i++)
+ {
+ if (activePartitions[i] && offsets[i] > -1)
+ recordedOffsets.put(
+ new TopicPartition(topic, i),
+ new OffsetAndMetadata(offsets[i]));
+ }
+
+ return recordedOffsets;
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions)
+ {
+ assignedPartitions
+ .stream()
+ .forEach(topicPartition ->
+ {
+ log.info("Activating partition {}", topicPartition);
+ activePartitions[topicPartition.partition()] = true;
+ offsets[topicPartition.partition()] = -1;
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> revokedPartitions)
+ {
+ producer.sendOffsetsToTransaction(
+ revokedPartitions
+ .stream()
+ .collect(
+ () -> new HashMap<TopicPartition, OffsetAndMetadata>(),
+ (map, topicPartition) ->
+ {
+ log.info("Commiting & deactivating revoked partition {}", topicPartition);
+ activePartitions[topicPartition.partition()] = false;
+ map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+ },
+ (mapA, mapB) -> mapA.putAll(mapB)),
+ consumer.groupMetadata());
+ }
+
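+ // Lost partitions are only deactivated, not committed: after a non-graceful
+ // loss this consumer can no longer commit offsets for them reliably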
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> lostPartitions)
+ {
+ lostPartitions
+ .stream()
+ .forEach((topicPartition) ->
+ {
+ log.info("Deactivating lost partition {}", topicPartition);
+ activePartitions[topicPartition.partition()] = false;
+ });
+ }
+ }
+
private void shutdown()
{
shutdown(null);