X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=outbox%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2FOutboxProducer.java;fp=outbox%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2FOutboxProducer.java;h=627ca0578fc3a41d78f90183b2669b388c5bf2c3;hp=0000000000000000000000000000000000000000;hb=996911bbed45e0211e48976e3cb3971631361e5b;hpb=64251b8eafa2534c359e8e2fc243c17b5a97a61a diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java new file mode 100644 index 0000000..627ca05 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java @@ -0,0 +1,107 @@ +package de.juplo.kafka.outbox; + +import org.apache.kafka.common.serialization.StringSerializer; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + + +@Component +public class OutboxProducer +{ + final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); + + + private final OutboxRepository repository; + private final KafkaProducer producer; + private final String topic; + + private long sequenceNumber = 0l; + + public OutboxProducer( + ApplicationProperties properties, + OutboxRepository repository) + { + this.repository = repository; + + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.producer = new KafkaProducer<>(props); + this.topic = properties.topic; + } + + @Scheduled(fixedDelay = 500) + public void poll() + { + List items; + do + { + items = repository.fetch(sequenceNumber); + LOG.debug("Polled {} new items", items.size()); + for (OutboxItem item : items) + send(item); + } + while (items.size() > 0); + } + + void send(OutboxItem item) + { + final ProducerRecord record = + new ProducerRecord<>(topic, item.getKey(), item.getValue()); + + sequenceNumber = item.getSequenceNumber(); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(item.getSequenceNumber()); + record.headers().add("SEQ#", buffer.array()); + + producer.send(record, (metadata, e) -> + { + if (metadata != null) + { + int deleted = repository.delete(item.getSequenceNumber()); + LOG.info( + "{}/{}:{} - {}:{}={} - deleted: {}", + metadata.topic(), + metadata.partition(), + metadata.offset(), + item.getSequenceNumber(), + record.key(), + record.value(), + deleted); + } + else + { + // HANDLE ERROR + LOG.error( + "{}/{} - {}:{}={} -> ", + record.topic(), + record.partition(), + item.getSequenceNumber(), + record.key(), + record.value(), + e); + } + }); + } + + + @PreDestroy + public void close() + { + producer.close(Duration.ofSeconds(5)); + } +}