1 package de.juplo.kafka.outbox.delivery;
3 import lombok.AllArgsConstructor;
4 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
5 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
6 import org.springframework.stereotype.Repository;
8 import java.sql.Timestamp;
9 import java.time.ZonedDateTime;
10 import java.util.List;
15 public class OutboxRepository
17 private static final String SQL_QUERY =
18 "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
19 private static final String SQL_UPDATE =
20 "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
21 private static final String SQL_DELETE =
22 "DELETE FROM outbox WHERE id <= :id";
24 private final NamedParameterJdbcTemplate jdbcTemplate;
27 public void save(String key, String value, ZonedDateTime issued)
29 MapSqlParameterSource parameters = new MapSqlParameterSource();
30 parameters.addValue("key", key);
31 parameters.addValue("value", value);
32 parameters.addValue("issued", Timestamp.from(issued.toInstant()));
33 jdbcTemplate.update(SQL_UPDATE, parameters);
36 public int delete(Long id)
38 MapSqlParameterSource parameters = new MapSqlParameterSource();
39 parameters.addValue("id", id);
40 return jdbcTemplate.update(SQL_DELETE, parameters);
43 public List<OutboxItem> fetch(Long sequenceNumber)
45 MapSqlParameterSource parameters = new MapSqlParameterSource();
46 parameters.addValue("sequenceNumber", sequenceNumber);
51 (resultSet, rowNumber) ->
56 .sequenceNumber(resultSet.getLong(1))
57 .key(resultSet.getString(2))
58 .value(resultSet.getString(3))