--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
+
+
+ private final JdbcTemplate jdbcTemplate;
+ private final String topic;
+ private final KafkaProducer<Long, String> producer;
+
+
+ public OutboxListener(
+ JdbcTemplate jdbcTemplate,
+ String bootstrapServers,
+ String topic)
+ {
+ this.jdbcTemplate = jdbcTemplate;
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServers);
+ props.put("key.serializer", LongSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ producer = new KafkaProducer<>(props);
+ }
+
+
+ @TransactionalEventListener
+ @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
+ public void onOutboxEvent(UserEvent userEvent)
+ {
+ List<Map<String, Object>> result =
+ jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+
+ try {
+ for (Map<String, Object> entry : result)
+ {
+ Long id = (Long)entry.get("id");
+ UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
+ String username = (String)entry.get("username");
+ String event = username + ":" + type.name();
+
+ ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
+ producer.send(record, (metadata, e) -> {
+ if (e != null) {
+ LOG.error("Could not send event {} ({}): ", id, event, e);
+ }
+ else {
+ LOG.debug(
+ "Send event {} ({}) with offset {} to partition {}",
+ id,
+ event,
+ metadata.offset(),
+ metadata.partition());
+ deleteOutboxEntry(id);
+ }
+ });
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException("Fehler beim Senden des Events", e);
+ }
+ }
+
+ @Transactional
+ void deleteOutboxEntry(Long id)
+ {
+ try
+ {
+ int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
+ LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
+ }
+ catch (DataAccessException e)
+ {
+ LOG.error("Execption while deleting row from outbox: {}!", id, e);
+ }
+ }
+
+ @PreDestroy
+ public void stop(){
+ LOG.info("Closing the KafkaProducer...");
+ try {
+ producer.close(5, TimeUnit.SECONDS);
+ LOG.debug("Successfully closed the KafkaProducer");
+ }
+ catch (Exception e) {
+ LOG.warn("Exception while closing the KafkaProducer!", e);
+ }
+ }
+}