import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
private final String topic;
private final KafkaProducer<Long, String> producer;
+ private long last = 0;
public OutboxListener(
JdbcTemplate jdbcTemplate,
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
- public void onOutboxEvent(UserEvent userEvent)
+ public synchronized void onOutboxEvent(UserEvent userEvent)
{
List<Map<String, Object>> result =
- jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+ jdbcTemplate.queryForList(
+ "SELECT id, event, username FROM events WHERE id > ? ORDER BY id ASC",
+ last);
- try {
+ try
+ {
for (Map<String, Object> entry : result)
{
Long id = (Long)entry.get("id");
deleteOutboxEntry(id);
}
});
- }
- } catch (Exception e) {
+ last = id;
+ }
+ }
+ catch (Exception e) {
throw new RuntimeException("Fehler beim Senden des Events", e);
}
}