1 package de.trion.kafka.outbox;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.LongSerializer;
7 import org.apache.kafka.common.serialization.StringSerializer;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.stereotype.Component;
12 import javax.annotation.PreDestroy;
13 import java.util.Properties;
14 import java.util.concurrent.TimeUnit;
17 public class OutboxProducer {
19 private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
21 private final ObjectMapper mapper;
22 private final String topic;
23 private final KafkaProducer<String, String> producer;
26 public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
30 Properties props = new Properties();
31 props.put("bootstrap.servers", bootstrapServers);
32 props.put("key.serializer", StringSerializer.class.getName());
33 props.put("value.serializer", StringSerializer.class.getName());
34 producer = new KafkaProducer<>(props);
38 public void send(Event event) {
40 String json = mapper.writeValueAsString(event);
41 ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
42 producer.send(record, (metadata, e) -> {
44 LOG.error("Could not send event {}!", json, e);
48 "{}: send event {} with offset {} to partition {}",
52 metadata.partition());
55 } catch (Exception e) {
56 throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
63 LOG.info("Closing the KafkaProducer...");
65 producer.close(5, TimeUnit.SECONDS);
66 LOG.debug("Successfully closed the KafkaProducer");
69 LOG.warn("Exception while closing the KafkaProducer!", e);