WIP
[demos/kafka/outbox] / src / main / java / de / juplo / kafka / outbox / polling / OutboxListener.java
1 package de.juplo.kafka.outbox.polling;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.dao.DataAccessException;
10 import org.springframework.jdbc.core.JdbcTemplate;
11 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
12 import org.springframework.stereotype.Component;
13 import org.springframework.transaction.annotation.Propagation;
14 import org.springframework.transaction.annotation.Transactional;
15 import org.springframework.transaction.event.TransactionalEventListener;
16
17 import javax.annotation.PreDestroy;
18 import java.util.*;
19 import java.util.concurrent.TimeUnit;
20
21 @Component
22 public class OutboxListener {
23
24     private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
25
26
27     private final JdbcTemplate jdbcTemplate;
28     private final String topic;
29     private final KafkaProducer<Long, String> producer;
30
31
32     public OutboxListener(
33             JdbcTemplate jdbcTemplate,
34             String bootstrapServers,
35             String topic)
36     {
37         this.jdbcTemplate = jdbcTemplate;
38         this.topic = topic;
39
40         Properties props = new Properties();
41         props.put("bootstrap.servers", bootstrapServers);
42         props.put("key.serializer", LongSerializer.class.getName());
43         props.put("value.serializer", StringSerializer.class.getName());
44         producer = new KafkaProducer<>(props);
45     }
46
47
48     @TransactionalEventListener
49     @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
50     public void onOutboxEvent(UserEvent userEvent)
51     {
52         List<Map<String, Object>> result =
53             jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
54
55         try {
56             for (Map<String, Object> entry : result)
57             {
58                 Long id = (Long)entry.get("id");
59                 UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
60                 String username = (String)entry.get("username");
61                 String event = username + ":" + type.name();
62
63                 ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
64                 producer.send(record, (metadata, e) -> {
65                     if (e != null) {
66                         LOG.error("Could not send event {} ({}): ", id, event, e);
67                     }
68                     else {
69                         LOG.debug(
70                                 "Send event {} ({}) with offset {} to partition {}",
71                                 id,
72                                 event,
73                                 metadata.offset(),
74                                 metadata.partition());
75                         deleteOutboxEntry(id);
76                     }
77                 });
78             }
79
80         } catch (Exception e) {
81             throw new RuntimeException("Fehler beim Senden des Events", e);
82         }
83     }
84
85     @Transactional
86     void deleteOutboxEntry(Long id)
87     {
88         try
89         {
90             int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
91             LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
92         }
93         catch (DataAccessException e)
94         {
95             LOG.error("Execption while deleting row from outbox: {}!", id, e);
96         }
97     }
98
99     @PreDestroy
100     public void stop(){
101         LOG.info("Closing the KafkaProducer...");
102         try {
103             producer.close(5, TimeUnit.SECONDS);
104             LOG.debug("Successfully closed the KafkaProducer");
105         }
106         catch (Exception e) {
107             LOG.warn("Exception while closing the KafkaProducer!", e);
108         }
109     }
110 }