import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.time.Clock;
import java.util.Properties;
Consumer<String, String> kafkaConsumer,
Producer<String, String> kafkaProducer,
ApplicationProperties properties,
+ Clock clock,
ConfigurableApplicationContext applicationContext)
{
return
kafkaConsumer,
properties.getProducerProperties().getTopic(),
kafkaProducer,
+ clock,
+ properties.getConsumerProperties().getAutoCommitInterval(),
() -> applicationContext.close());
}
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
props.put("group.id", properties.getConsumerProperties().getGroupId());
+ props.put("enable.auto.commit", false);
if (properties.getConsumerProperties().getAutoOffsetReset() != null)
{
props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
+ props.put("transactional.id", "my-tx");
props.put("acks", properties.getProducerProperties().getAcks());
props.put("batch.size", properties.getProducerProperties().getBatchSize());
props.put("metadata.maxage.ms", 5000); // 5 Sekunden
return new KafkaProducer<>(props);
}
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
}
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.Phaser;
+import java.util.stream.Collectors;
@Slf4j
private final String stateTopic;
private final Producer<String, String> producer;
+ private final Clock clock;
+ private final Duration commitInterval;
+
private volatile boolean running = false;
+ private Instant lastCommit;
private final Phaser phaser = new Phaser(1);
private final Set<TopicPartition> assignedPartitions = new HashSet<>();
private volatile State[] partitionStates;
Consumer<String, String> consumer,
String stateTopic,
Producer<String, String> producer,
+ Clock clock,
+ Duration commitInterval,
Runnable closeCallback)
{
this.id = clientId;
this.consumer = consumer;
this.stateTopic = stateTopic;
this.producer = producer;
+ this.clock = clock;
+ this.commitInterval = commitInterval;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
consumer.subscribe(Arrays.asList(topic, stateTopic), this);
running = true;
+ log.info("{} - Initializing the transaction", id);
+ producer.initTransactions();
+
+ lastCommit = clock.instant();
+ producer.beginTransaction();
+
while (running)
{
ConsumerRecords<String, String> records =
int arrivedPhase = phaser.arriveAndAwaitAdvance();
log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
+
+ commitIfNecessary();
}
}
catch(WakeupException e)
catch(Exception e)
{
log.error("{} - Unexpected error, unsubscribing!", id, e.toString());
- consumer.unsubscribe();
+ producer.abortTransaction();
log.info("{} - Triggering exit of application!", id);
new Thread(closeCallback).start();
+ return;
}
finally
{
consumer.close();
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
+
+ producer.commitTransaction();
+ }
+
+ private void commitIfNecessary()
+ {
+ Instant now = clock.instant();
+ if (!now.isBefore(lastCommit.plus(commitInterval)))
+ {
+ producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+ commit();
+ lastCommit = now;
+ }
+ }
+
+ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets()
+ {
+ return assignedPartitions
+ .stream()
+ .collect(Collectors.toMap(
+ partition -> partition,
+ partition -> new OffsetAndMetadata(consumer.position(partition))));
+ }
+
+ private void commit()
+ {
+ producer.commitTransaction();
+ producer.beginTransaction();
}
private void handleRecord(