import javax.annotation.PreDestroy;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
@Component
public class OutboxProducer
{
final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+ public final static String HEADER = "#";
private final OutboxRepository repository;
private final KafkaProducer<String, String> producer;
this.repository = repository;
Properties props = new Properties();
- props.put("bootstrap.servers", properties.bootstrapServers);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
this.producer = new KafkaProducer<>(props);
this.topic = properties.topic;
}
- @Scheduled(fixedDelay = 500)
+ @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}")
public void poll()
{
List<OutboxItem> items;
new ProducerRecord<>(topic, item.getKey(), item.getValue());
sequenceNumber = item.getSequenceNumber();
- record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
+ record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
producer.send(record, (metadata, e) ->
{