1 package de.juplo.kafka.outbox.polling;
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.dao.DataAccessException;
10 import org.springframework.jdbc.core.JdbcTemplate;
11 import org.springframework.stereotype.Component;
12 import org.springframework.transaction.annotation.Propagation;
13 import org.springframework.transaction.annotation.Transactional;
14 import org.springframework.transaction.event.TransactionalEventListener;
16 import javax.annotation.PreDestroy;
18 import java.util.concurrent.TimeUnit;
21 public class OutboxListener {
23 private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
26 private final JdbcTemplate jdbcTemplate;
27 private final String topic;
28 private final KafkaProducer<Long, String> producer;
30 private long last = 0;
32 public OutboxListener(
33 JdbcTemplate jdbcTemplate,
34 String bootstrapServers,
37 this.jdbcTemplate = jdbcTemplate;
40 Properties props = new Properties();
41 props.put("bootstrap.servers", bootstrapServers);
42 props.put("key.serializer", LongSerializer.class.getName());
43 props.put("value.serializer", StringSerializer.class.getName());
44 producer = new KafkaProducer<>(props);
48 @TransactionalEventListener
49 @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
50 public synchronized void onOutboxEvent(UserEvent userEvent)
52 List<Map<String, Object>> result =
53 jdbcTemplate.queryForList(
54 "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
59 for (Map<String, Object> entry : result)
61 Long id = (Long)entry.get("id");
62 UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
63 String username = (String)entry.get("username");
64 String event = username + ":" + type.name();
66 LOG.debug("Sending event {} ({})", id, event);
68 ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
69 producer.send(record, (metadata, e) -> {
71 LOG.error("Could not send event {} ({}): ", id, event, e);
75 "Send event {} ({}) with offset {} to partition {}",
79 metadata.partition());
80 deleteOutboxEntry(id);
88 throw new RuntimeException("Fehler beim Senden des Events", e);
93 void deleteOutboxEntry(Long id)
97 int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
98 LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
100 catch (DataAccessException e)
102 LOG.error("Execption while deleting row from outbox: {}!", id, e);
108 LOG.info("Closing the KafkaProducer...");
110 producer.close(5, TimeUnit.SECONDS);
111 LOG.debug("Successfully closed the KafkaProducer");
113 catch (Exception e) {
114 LOG.warn("Exception while closing the KafkaProducer!", e);