From: Kai Moritz Date: Wed, 16 Sep 2020 20:23:19 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4a66e93b1577844adbe86029c2901576539870c6;p=demos%2Fspring%2Fdata-jdbc WIP --- diff --git a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java index d8f9643..5b731a2 100644 --- a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java +++ b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.namedparam.SqlParameterSource; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -28,6 +27,7 @@ public class OutboxListener { private final String topic; private final KafkaProducer producer; + private long last = 0; public OutboxListener( JdbcTemplate jdbcTemplate, @@ -47,12 +47,15 @@ public class OutboxListener { @TransactionalEventListener @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true) - public void onOutboxEvent(UserEvent userEvent) + public synchronized void onOutboxEvent(UserEvent userEvent) { List> result = - jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC"); + jdbcTemplate.queryForList( + "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC", + last); - try { + try + { for (Map entry : result) { Long id = (Long)entry.get("id"); @@ -75,9 +78,11 @@ public class OutboxListener { deleteOutboxEntry(id); } }); - } - } catch (Exception e) { + last = id; + } + } + catch (Exception e) { throw new RuntimeException("Fehler beim Senden des Events", e); } }