X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;fp=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=c08cae7d6756655e5dbbe55dce19c2c3ed92256d;hp=0000000000000000000000000000000000000000;hb=fbd0ca0df5004d13a5e93cdb8373bafc60440c8b;hpb=bdc7b089e14ddc16f7e76f6a736b27b608e35ab4 diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java new file mode 100644 index 0000000..c08cae7 --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -0,0 +1,105 @@ +package de.juplo.kafka.outbox.delivery; + +import com.google.common.primitives.Longs; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + + +@Component +public class OutboxProducer +{ + final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); + + + private final OutboxRepository repository; + private final KafkaProducer producer; + private final String topic; + + private long sequenceNumber = 0l; + + public OutboxProducer( + ApplicationProperties properties, + OutboxRepository repository) + { + this.repository = repository; + + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.producer = new KafkaProducer<>(props); + this.topic = properties.topic; + } + + @Scheduled(fixedDelay = 500) + public void poll() + { + List items; + do + { + items = repository.fetch(sequenceNumber); + LOG.debug("Polled {} new items", items.size()); + for (OutboxItem item : items) + send(item); + } + while (items.size() > 0); + } + + void send(OutboxItem item) + { + final ProducerRecord record = + new ProducerRecord<>(topic, item.getKey(), item.getValue()); + + sequenceNumber = item.getSequenceNumber(); + record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber)); + + producer.send(record, (metadata, e) -> + { + if (metadata != null) + { + int deleted = repository.delete(item.getSequenceNumber()); + LOG.info( + "{}/{}:{} - {}:{}={} - deleted: {}", + metadata.topic(), + metadata.partition(), + metadata.offset(), + item.getSequenceNumber(), + record.key(), + record.value(), + deleted); + } + else + { + // HANDLE ERROR + LOG.error( + "{}/{} - {}:{}={} -> ", + record.topic(), + record.partition(), + item.getSequenceNumber(), + record.key(), + record.value(), + e); + } + }); + } + + + @PreDestroy + public void close() + { + producer.close(Duration.ofSeconds(5)); + } +}