WIP
[demos/spring/data-jdbc] / src / main / java / de / juplo / kafka / outbox / polling / OutboxListener.java
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
new file mode 100644 (file)
index 0000000..d8f9643
--- /dev/null
@@ -0,0 +1,110 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
+
+
+    private final JdbcTemplate jdbcTemplate;
+    private final String topic;
+    private final KafkaProducer<Long, String> producer;
+
+
+    public OutboxListener(
+            JdbcTemplate jdbcTemplate,
+            String bootstrapServers,
+            String topic)
+    {
+        this.jdbcTemplate = jdbcTemplate;
+        this.topic = topic;
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("key.serializer", LongSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        producer = new KafkaProducer<>(props);
+    }
+
+
+    @TransactionalEventListener
+    @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
+    public void onOutboxEvent(UserEvent userEvent)
+    {
+        List<Map<String, Object>> result =
+            jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+
+        try {
+            for (Map<String, Object> entry : result)
+            {
+                Long id = (Long)entry.get("id");
+                UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
+                String username = (String)entry.get("username");
+                String event = username + ":" + type.name();
+
+                ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
+                producer.send(record, (metadata, e) -> {
+                    if (e != null) {
+                        LOG.error("Could not send event {} ({}): ", id, event, e);
+                    }
+                    else {
+                        LOG.debug(
+                                "Send event {} ({}) with offset {} to partition {}",
+                                id,
+                                event,
+                                metadata.offset(),
+                                metadata.partition());
+                        deleteOutboxEntry(id);
+                    }
+                });
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException("Fehler beim Senden des Events", e);
+        }
+    }
+
+    @Transactional
+    void deleteOutboxEntry(Long id)
+    {
+        try
+        {
+            int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
+            LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
+        }
+        catch (DataAccessException e)
+        {
+            LOG.error("Execption while deleting row from outbox: {}!", id, e);
+        }
+    }
+
+    @PreDestroy
+    public void stop(){
+        LOG.info("Closing the KafkaProducer...");
+        try {
+            producer.close(5, TimeUnit.SECONDS);
+            LOG.debug("Successfully closed the KafkaProducer");
+        }
+        catch (Exception e) {
+            LOG.warn("Exception while closing the KafkaProducer!", e);
+        }
+    }
+}