1 package de.juplo.kafka.outbox.polling;
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.LongSerializer;
6 import org.apache.kafka.common.serialization.StringSerializer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.dao.DataAccessException;
10 import org.springframework.jdbc.core.JdbcTemplate;
11 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
12 import org.springframework.stereotype.Component;
13 import org.springframework.transaction.annotation.Propagation;
14 import org.springframework.transaction.annotation.Transactional;
15 import org.springframework.transaction.event.TransactionalEventListener;
17 import javax.annotation.PreDestroy;
19 import java.util.concurrent.TimeUnit;
/**
 * Sends entries of the {@code events} table to Kafka.
 *
 * <p>Holds a {@link JdbcTemplate} for querying and deleting rows of that table
 * and a {@link KafkaProducer} with {@code Long} keys and {@code String} values.
 */
22 public class OutboxListener {
// Class-wide SLF4J logger.
24 private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
// Database access used to read and delete rows of the events table.
27 private final JdbcTemplate jdbcTemplate;
// Name of the Kafka topic the producer records are created for.
28 private final String topic;
// Producer used to send the records (Long key, String value).
29 private final KafkaProducer<Long, String> producer;
/**
 * Stores the given JdbcTemplate and creates the KafkaProducer.
 *
 * The producer is configured with the given bootstrap servers, a
 * LongSerializer for keys and a StringSerializer for values.
 *
 * NOTE(review): the end of the parameter list and the assignment of the
 * final field "topic" are not visible in this excerpt - presumably a further
 * parameter supplies the topic name; confirm against the complete file.
 */
32 public OutboxListener(
33 JdbcTemplate jdbcTemplate,
34 String bootstrapServers,
37 this.jdbcTemplate = jdbcTemplate;
// Producer configuration: only the broker list and the two serializers are set here.
40 Properties props = new Properties();
41 props.put("bootstrap.servers", bootstrapServers);
// Keys are sent as Long, values as String - matches the producer's type parameters.
42 props.put("key.serializer", LongSerializer.class.getName());
43 props.put("value.serializer", StringSerializer.class.getName());
44 producer = new KafkaProducer<>(props);
48 @TransactionalEventListener
49 @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
50 public void onOutboxEvent(UserEvent userEvent)
52 List<Map<String, Object>> result =
53 jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
56 for (Map<String, Object> entry : result)
58 Long id = (Long)entry.get("id");
59 UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
60 String username = (String)entry.get("username");
61 String event = username + ":" + type.name();
63 ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
64 producer.send(record, (metadata, e) -> {
66 LOG.error("Could not send event {} ({}): ", id, event, e);
70 "Send event {} ({}) with offset {} to partition {}",
74 metadata.partition());
75 deleteOutboxEntry(id);
80 } catch (Exception e) {
81 throw new RuntimeException("Fehler beim Senden des Events", e);
86 void deleteOutboxEntry(Long id)
90 int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
91 LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
93 catch (DataAccessException e)
95 LOG.error("Execption while deleting row from outbox: {}!", id, e);
// Shutdown of the KafkaProducer.
// NOTE(review): the header of the enclosing method is not visible in this
// excerpt - presumably this is the @PreDestroy hook, given the
// javax.annotation.PreDestroy import; confirm against the complete file.
101 LOG.info("Closing the KafkaProducer...");
// Wait at most 5 seconds for the producer to close.
// NOTE(review): newer kafka-clients releases deprecate close(long, TimeUnit)
// in favour of close(Duration) - check the client version in use before migrating.
103 producer.close(5, TimeUnit.SECONDS);
104 LOG.debug("Successfully closed the KafkaProducer");
// Any exception raised while closing is logged as a warning.
106 catch (Exception e) {
107 LOG.warn("Exception while closing the KafkaProducer!", e);