5b731a254eb9e348f0d699b7c234c21032a2f273
[demos/kafka/outbox] / src / main / java / de / juplo / kafka / outbox / polling / OutboxListener.java
1 package de.juplo.kafka.outbox.polling;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.dao.DataAccessException;
10 import org.springframework.jdbc.core.JdbcTemplate;
11 import org.springframework.stereotype.Component;
12 import org.springframework.transaction.annotation.Propagation;
13 import org.springframework.transaction.annotation.Transactional;
14 import org.springframework.transaction.event.TransactionalEventListener;
15
16 import javax.annotation.PreDestroy;
17 import java.util.*;
18 import java.util.concurrent.TimeUnit;
19
20 @Component
21 public class OutboxListener {
22
23     private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
24
25
26     private final JdbcTemplate jdbcTemplate;
27     private final String topic;
28     private final KafkaProducer<Long, String> producer;
29
30     private long last = 0;
31
32     public OutboxListener(
33             JdbcTemplate jdbcTemplate,
34             String bootstrapServers,
35             String topic)
36     {
37         this.jdbcTemplate = jdbcTemplate;
38         this.topic = topic;
39
40         Properties props = new Properties();
41         props.put("bootstrap.servers", bootstrapServers);
42         props.put("key.serializer", LongSerializer.class.getName());
43         props.put("value.serializer", StringSerializer.class.getName());
44         producer = new KafkaProducer<>(props);
45     }
46
47
48     @TransactionalEventListener
49     @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
50     public synchronized void onOutboxEvent(UserEvent userEvent)
51     {
52         List<Map<String, Object>> result =
53             jdbcTemplate.queryForList(
54                     "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
55                     last);
56
57         try
58         {
59             for (Map<String, Object> entry : result)
60             {
61                 Long id = (Long)entry.get("id");
62                 UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
63                 String username = (String)entry.get("username");
64                 String event = username + ":" + type.name();
65
66                 ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
67                 producer.send(record, (metadata, e) -> {
68                     if (e != null) {
69                         LOG.error("Could not send event {} ({}): ", id, event, e);
70                     }
71                     else {
72                         LOG.debug(
73                                 "Send event {} ({}) with offset {} to partition {}",
74                                 id,
75                                 event,
76                                 metadata.offset(),
77                                 metadata.partition());
78                         deleteOutboxEntry(id);
79                     }
80                 });
81
82                 last = id;
83             }
84         }
85         catch (Exception e) {
86             throw new RuntimeException("Fehler beim Senden des Events", e);
87         }
88     }
89
90     @Transactional
91     void deleteOutboxEntry(Long id)
92     {
93         try
94         {
95             int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
96             LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
97         }
98         catch (DataAccessException e)
99         {
100             LOG.error("Execption while deleting row from outbox: {}!", id, e);
101         }
102     }
103
104     @PreDestroy
105     public void stop(){
106         LOG.info("Closing the KafkaProducer...");
107         try {
108             producer.close(5, TimeUnit.SECONDS);
109             LOG.debug("Successfully closed the KafkaProducer");
110         }
111         catch (Exception e) {
112             LOG.warn("Exception while closing the KafkaProducer!", e);
113         }
114     }
115 }