WIP
[demos/kafka/outbox] / src / main / java / de / juplo / kafka / outbox / polling / OutboxListener.java
1 package de.juplo.kafka.outbox.polling;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.dao.DataAccessException;
10 import org.springframework.jdbc.core.JdbcTemplate;
11 import org.springframework.stereotype.Component;
12 import org.springframework.transaction.annotation.Propagation;
13 import org.springframework.transaction.annotation.Transactional;
14 import org.springframework.transaction.event.TransactionalEventListener;
15
16 import javax.annotation.PreDestroy;
17 import java.util.*;
18 import java.util.concurrent.TimeUnit;
19
20 @Component
21 public class OutboxListener {
22
23     private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
24
25
26     private final JdbcTemplate jdbcTemplate;
27     private final String topic;
28     private final KafkaProducer<Long, String> producer;
29
30     private long last = 0;
31
32     public OutboxListener(
33             JdbcTemplate jdbcTemplate,
34             String bootstrapServers,
35             String topic)
36     {
37         this.jdbcTemplate = jdbcTemplate;
38         this.topic = topic;
39
40         Properties props = new Properties();
41         props.put("bootstrap.servers", bootstrapServers);
42         props.put("key.serializer", LongSerializer.class.getName());
43         props.put("value.serializer", StringSerializer.class.getName());
44         producer = new KafkaProducer<>(props);
45     }
46
47
48     @TransactionalEventListener
49     @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
50     public synchronized void onOutboxEvent(UserEvent userEvent)
51     {
52         List<Map<String, Object>> result =
53             jdbcTemplate.queryForList(
54                     "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
55                     last);
56
57         try
58         {
59             for (Map<String, Object> entry : result)
60             {
61                 Long id = (Long)entry.get("id");
62                 UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
63                 String username = (String)entry.get("username");
64                 String event = username + ":" + type.name();
65
66                 LOG.debug("Sending event {} ({})", id, event);
67
68                 ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
69                 producer.send(record, (metadata, e) -> {
70                     if (e != null) {
71                         LOG.error("Could not send event {} ({}): ", id, event, e);
72                     }
73                     else {
74                         LOG.debug(
75                                 "Send event {} ({}) with offset {} to partition {}",
76                                 id,
77                                 event,
78                                 metadata.offset(),
79                                 metadata.partition());
80                         deleteOutboxEntry(id);
81                     }
82                 });
83
84                 last = id;
85             }
86         }
87         catch (Exception e) {
88             throw new RuntimeException("Fehler beim Senden des Events", e);
89         }
90     }
91
92     @Transactional
93     void deleteOutboxEntry(Long id)
94     {
95         try
96         {
97             int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
98             LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
99         }
100         catch (DataAccessException e)
101         {
102             LOG.error("Execption while deleting row from outbox: {}!", id, e);
103         }
104     }
105
106     @PreDestroy
107     public void stop(){
108         LOG.info("Closing the KafkaProducer...");
109         try {
110             producer.close(5, TimeUnit.SECONDS);
111             LOG.debug("Successfully closed the KafkaProducer");
112         }
113         catch (Exception e) {
114             LOG.warn("Exception while closing the KafkaProducer!", e);
115         }
116     }
117 }