1 package de.trion.kafka.outbox;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.stereotype.Component;
11 import javax.annotation.PreDestroy;
12 import java.util.Properties;
13 import java.util.concurrent.TimeUnit;
16 public class OutboxProducer {
18 private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
20 private final ObjectMapper mapper;
21 private final String topic;
22 private final KafkaProducer<String, String> producer;
25 public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
29 Properties props = new Properties();
30 props.put("bootstrap.servers", bootstrapServers);
31 props.put("key.serializer", StringSerializer.class.getName());
32 props.put("value.serializer", StringSerializer.class.getName());
33 producer = new KafkaProducer<>(props);
37 public void send(UserEvent event) {
39 String json = mapper.writeValueAsString(event);
40 ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
41 producer.send(record, (metadata, e) -> {
43 LOG.error("Could not send event {}!", json, e);
47 "{}: send event {} with offset {} to partition {}",
51 metadata.partition());
54 } catch (Exception e) {
55 throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
62 LOG.info("Closing the KafkaProducer...");
64 producer.close(5, TimeUnit.SECONDS);
65 LOG.debug("Successfully closed the KafkaProducer");
68 LOG.warn("Exception while closing the KafkaProducer!", e);