d493aeeee211a08d7a6b6e3331f5caa712ad97db
[demos/spring/data-jdbc] / src / main / java / de / trion / kafka / outbox / OutboxProducer.java
1 package de.trion.kafka.outbox;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.stereotype.Component;
10
11 import javax.annotation.PreDestroy;
12 import java.util.Properties;
13 import java.util.concurrent.TimeUnit;
14
15 @Component
16 public class OutboxProducer {
17
18     private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
19
20     private final ObjectMapper mapper;
21     private final String topic;
22     private final KafkaProducer<String, String> producer;
23
24
25     public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
26         this.mapper = mapper;
27         this.topic = topic;
28
29         Properties props = new Properties();
30         props.put("bootstrap.servers", bootstrapServers);
31         props.put("key.serializer", StringSerializer.class.getName());
32         props.put("value.serializer", StringSerializer.class.getName());
33         producer = new KafkaProducer<>(props);
34     }
35
36
37     public void send(UserEvent event) {
38         try {
39             String json = mapper.writeValueAsString(event);
40             ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
41             producer.send(record, (metadata, e) -> {
42                 if (e != null) {
43                     LOG.error("Could not send event {}!", json, e);
44                 }
45                 else {
46                     LOG.debug(
47                             "{}: send event {} with offset {} to partition {}",
48                             event.user,
49                             event.id,
50                             metadata.offset(),
51                             metadata.partition());
52                 }
53             });
54         } catch (Exception e) {
55             throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
56         }
57     }
58
59
60     @PreDestroy
61     public void stop(){
62         LOG.info("Closing the KafkaProducer...");
63         try {
64             producer.close(5, TimeUnit.SECONDS);
65             LOG.debug("Successfully closed the KafkaProducer");
66         }
67         catch (Exception e) {
68             LOG.warn("Exception while closing the KafkaProducer!", e);
69         }
70     }
71 }