X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fpolling%2FOutboxListener.java;h=5b731a254eb9e348f0d699b7c234c21032a2f273;hb=4a66e93b1577844adbe86029c2901576539870c6;hp=d8f964392e020b404553680b57299e3e8fd1139a;hpb=de37a02d8dccae426557c3600a2d18f1c4293985;p=demos%2Fspring%2Fdata-jdbc diff --git a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java index d8f9643..5b731a2 100644 --- a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java +++ b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.namedparam.SqlParameterSource; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -28,6 +27,7 @@ public class OutboxListener { private final String topic; private final KafkaProducer producer; + private long last = 0; public OutboxListener( JdbcTemplate jdbcTemplate, @@ -47,12 +47,15 @@ public class OutboxListener { @TransactionalEventListener @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true) - public void onOutboxEvent(UserEvent userEvent) + public synchronized void onOutboxEvent(UserEvent userEvent) { List> result = - jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC"); + jdbcTemplate.queryForList( + "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC", + last); - try { + try + { for (Map entry : result) { Long id = (Long)entry.get("id"); @@ -75,9 +78,11 @@ public class OutboxListener { deleteOutboxEntry(id); } }); - } - } catch (Exception e) { + last = id; + } + } + catch (Exception e) { throw new RuntimeException("Fehler beim Senden des Events", e); } }