projects
/
demos
/
spring
/
data-jdbc
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
WIP
[demos/spring/data-jdbc]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
outbox
/
polling
/
OutboxListener.java
diff --git
a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
index
d8f9643
..
5b731a2
100644
(file)
--- a/
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
+++ b/
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
@@
-8,7
+8,6
@@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@
-28,6
+27,7
@@
public class OutboxListener {
private final String topic;
private final KafkaProducer<Long, String> producer;
private final String topic;
private final KafkaProducer<Long, String> producer;
+ private long last = 0;
public OutboxListener(
JdbcTemplate jdbcTemplate,
public OutboxListener(
JdbcTemplate jdbcTemplate,
@@
-47,12
+47,15
@@
public class OutboxListener {
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
- public void onOutboxEvent(UserEvent userEvent)
+ public
synchronized
void onOutboxEvent(UserEvent userEvent)
{
List<Map<String, Object>> result =
{
List<Map<String, Object>> result =
- jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+ jdbcTemplate.queryForList(
+ "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
+ last);
- try {
+ try
+ {
for (Map<String, Object> entry : result)
{
Long id = (Long)entry.get("id");
for (Map<String, Object> entry : result)
{
Long id = (Long)entry.get("id");
@@
-75,9
+78,11
@@
public class OutboxListener {
deleteOutboxEntry(id);
}
});
deleteOutboxEntry(id);
}
});
- }
- } catch (Exception e) {
+ last = id;
+ }
+ }
+ catch (Exception e) {
throw new RuntimeException("Fehler beim Senden des Events", e);
}
}
throw new RuntimeException("Fehler beim Senden des Events", e);
}
}