1 package de.juplo.kafka.outbox.delivery;
3 import lombok.AllArgsConstructor;
4 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
5 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
6 import org.springframework.stereotype.Repository;
8 import java.sql.Timestamp;
9 import java.time.ZonedDateTime;
10 import java.util.List;
16 public class OutboxRepository
// Reads id, key and value of every outbox row whose id is greater than the
// bound :sequenceNumber, ordered by ascending id.
18 private static final String SQL_QUERY =
19 "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
// Adds one row (key, value, issued) to the outbox table.
// NOTE(review): despite its name this constant holds an INSERT statement,
// not an UPDATE; it is the statement executed by save().
20 private static final String SQL_UPDATE =
21 "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
// Removes every outbox row whose id is contained in the bound :ids parameter.
22 private static final String SQL_DELETE =
23 "DELETE FROM outbox WHERE id IN (:ids)";
// Template through which the statements above are executed.
// NOTE(review): presumably assigned by a generated constructor
// (lombok.AllArgsConstructor is imported) -- confirm against the class annotations.
25 private final NamedParameterJdbcTemplate jdbcTemplate;
28 public void save(String key, String value, ZonedDateTime issued)
30 MapSqlParameterSource parameters = new MapSqlParameterSource();
31 parameters.addValue("key", key);
32 parameters.addValue("value", value);
33 parameters.addValue("issued", Timestamp.from(issued.toInstant()));
34 jdbcTemplate.update(SQL_UPDATE, parameters);
/**
 * Deletes the outbox rows whose id is contained in {@code ids} by executing
 * {@code SQL_DELETE}.
 *
 * @param ids ids of the rows to remove; {@code null} and the empty set are
 *            checked for explicitly before the statement is built
 * @return the value returned by {@code jdbcTemplate.update} for the delete statement
 */
37 public int delete(Set<Long> ids)
39 if (ids == null || ids.isEmpty())
// NOTE(review): the statement guarded by the check above is not visible in
// this view -- presumably an early return that skips the database call;
// confirm against the full file.
41 MapSqlParameterSource parameters = new MapSqlParameterSource();
// The whole set is bound to the single named parameter used in the IN clause.
42 parameters.addValue("ids", ids);
43 return jdbcTemplate.update(SQL_DELETE, parameters);
46 public List<OutboxItem> fetch(Long sequenceNumber)
48 MapSqlParameterSource parameters = new MapSqlParameterSource();
49 parameters.addValue("sequenceNumber", sequenceNumber);
54 (resultSet, rowNumber) ->
59 .sequenceNumber(resultSet.getLong(1))
60 .key(resultSet.getString(2))
61 .value(resultSet.getString(3))