WIP
authorKai Moritz <kai@juplo.de>
Wed, 16 Sep 2020 20:23:19 +0000 (22:23 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 24 Oct 2020 08:59:33 +0000 (10:59 +0200)
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java

index d8f9643..5b731a2 100644 (file)
@@ -8,7 +8,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -28,6 +27,7 @@ public class OutboxListener {
     private final String topic;
     private final KafkaProducer<Long, String> producer;
 
+    private long last = 0;
 
     public OutboxListener(
             JdbcTemplate jdbcTemplate,
@@ -47,12 +47,15 @@ public class OutboxListener {
 
     @TransactionalEventListener
     @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
-    public void onOutboxEvent(UserEvent userEvent)
+    public synchronized void onOutboxEvent(UserEvent userEvent)
     {
         List<Map<String, Object>> result =
-            jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+            jdbcTemplate.queryForList(
+                    "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
+                    last);
 
-        try {
+        try
+        {
             for (Map<String, Object> entry : result)
             {
                 Long id = (Long)entry.get("id");
@@ -75,9 +78,11 @@ public class OutboxListener {
                         deleteOutboxEntry(id);
                     }
                 });
-            }
 
-        } catch (Exception e) {
+                last = id;
+            }
+        }
+        catch (Exception e) {
             throw new RuntimeException("Fehler beim Senden des Events", e);
         }
     }