projects
/
demos
/
spring
/
data-jdbc
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
de37a02
)
WIP
author
Kai Moritz
<kai@juplo.de>
Wed, 16 Sep 2020 20:23:19 +0000
(22:23 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 24 Oct 2020 08:59:33 +0000
(10:59 +0200)
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
index
d8f9643
..
5b731a2
100644
(file)
--- a/
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
+++ b/
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
@@
-8,7
+8,6
@@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@
-28,6
+27,7
@@
public class OutboxListener {
private final String topic;
private final KafkaProducer<Long, String> producer;
private final String topic;
private final KafkaProducer<Long, String> producer;
+ private long last = 0;
public OutboxListener(
JdbcTemplate jdbcTemplate,
public OutboxListener(
JdbcTemplate jdbcTemplate,
@@
-47,12
+47,15
@@
public class OutboxListener {
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
- public void onOutboxEvent(UserEvent userEvent)
+ public
synchronized
void onOutboxEvent(UserEvent userEvent)
{
List<Map<String, Object>> result =
{
List<Map<String, Object>> result =
- jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+ jdbcTemplate.queryForList(
+ "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
+ last);
- try {
+ try
+ {
for (Map<String, Object> entry : result)
{
Long id = (Long)entry.get("id");
for (Map<String, Object> entry : result)
{
Long id = (Long)entry.get("id");
@@
-75,9
+78,11
@@
public class OutboxListener {
deleteOutboxEntry(id);
}
});
deleteOutboxEntry(id);
}
});
- }
- } catch (Exception e) {
+ last = id;
+ }
+ }
+ catch (Exception e) {
throw new RuntimeException("Fehler beim Senden des Events", e);
}
}
throw new RuntimeException("Fehler beim Senden des Events", e);
}
}