99adae9a43e3565ab8aca893b2fb85ed4424b6dd
[demos/spring/data-jdbc] / src / main / java / de / trion / kafka / outbox / OutboxProducer.java
1 package de.trion.kafka.outbox;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.LongSerializer;
7 import org.apache.kafka.common.serialization.StringSerializer;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.stereotype.Component;
11
12 import javax.annotation.PreDestroy;
13 import java.util.Properties;
14 import java.util.concurrent.TimeUnit;
15
16 @Component
17 public class OutboxProducer {
18
19     private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
20
21     private final ObjectMapper mapper;
22     private final String topic;
23     private final KafkaProducer<String, String> producer;
24
25
26     public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
27         this.mapper = mapper;
28         this.topic = topic;
29
30         Properties props = new Properties();
31         props.put("bootstrap.servers", bootstrapServers);
32         props.put("key.serializer", StringSerializer.class.getName());
33         props.put("value.serializer", StringSerializer.class.getName());
34         producer = new KafkaProducer<>(props);
35     }
36
37
38     public void send(Event event) {
39         try {
40             String json = mapper.writeValueAsString(event);
41             ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
42             producer.send(record, (metadata, e) -> {
43                 if (e != null) {
44                     LOG.error("Could not send event {}!", json, e);
45                 }
46                 else {
47                     LOG.debug(
48                             "{}: send event {} with offset {} to partition {}",
49                             event.user,
50                             event.id,
51                             metadata.offset(),
52                             metadata.partition());
53                 }
54             });
55         } catch (Exception e) {
56             throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
57         }
58     }
59
60
61     @PreDestroy
62     public void stop(){
63         LOG.info("Closing the KafkaProducer...");
64         try {
65             producer.close(5, TimeUnit.SECONDS);
66             LOG.debug("Successfully closed the KafkaProducer");
67         }
68         catch (Exception e) {
69             LOG.warn("Exception while closing the KafkaProducer!", e);
70         }
71     }
72 }