Moved postage of messages into a reusable standalone implementation
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxRepository.java
1 package de.juplo.kafka.outbox.delivery;
2
3 import lombok.AllArgsConstructor;
4 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
5 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
6 import org.springframework.stereotype.Repository;
7
8 import java.sql.Timestamp;
9 import java.time.ZonedDateTime;
10 import java.util.List;
11
12
13 @Repository
14 @AllArgsConstructor
15 public class OutboxRepository
16 {
17   private static final String SQL_QUERY =
18       "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
19   private static final String SQL_UPDATE =
20       "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
21   private static final String SQL_DELETE =
22       "DELETE FROM outbox WHERE id = :id";
23
24   private final NamedParameterJdbcTemplate jdbcTemplate;
25
26
27   public void save(String key, String value, ZonedDateTime issued)
28   {
29     MapSqlParameterSource parameters = new MapSqlParameterSource();
30     parameters.addValue("key", key);
31     parameters.addValue("value", value);
32     parameters.addValue("issued", Timestamp.from(issued.toInstant()));
33     jdbcTemplate.update(SQL_UPDATE, parameters);
34   }
35
36   public int delete(Long id)
37   {
38     MapSqlParameterSource parameters = new MapSqlParameterSource();
39     parameters.addValue("id", id);
40     return jdbcTemplate.update(SQL_DELETE, parameters);
41   }
42
43   public List<OutboxItem> fetch(Long sequenceNumber)
44   {
45     MapSqlParameterSource parameters = new MapSqlParameterSource();
46     parameters.addValue("sequenceNumber", sequenceNumber);
47     return
48         jdbcTemplate.query(
49             SQL_QUERY,
50             parameters,
51             (resultSet, rowNumber) ->
52             {
53               return
54                   OutboxItem
55                       .builder()
56                       .sequenceNumber(resultSet.getLong(1))
57                       .key(resultSet.getString(2))
58                       .value(resultSet.getString(3))
59                       .build();
60             });
61   }
62 }