Sent messages are deleted individually after a crash
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxRepository.java
1 package de.juplo.kafka.outbox.delivery;
2
3 import lombok.AllArgsConstructor;
4 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
5 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
6 import org.springframework.stereotype.Repository;
7
8 import java.sql.Timestamp;
9 import java.time.ZonedDateTime;
10 import java.util.List;
11 import java.util.Set;
12
13
14 @Repository
15 @AllArgsConstructor
16 public class OutboxRepository
17 {
18   private static final String SQL_QUERY =
19       "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
20   private static final String SQL_UPDATE =
21       "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
22   private static final String SQL_DELETE =
23       "DELETE FROM outbox WHERE id IN (:ids)";
24
25   private final NamedParameterJdbcTemplate jdbcTemplate;
26
27
28   public void save(String key, String value, ZonedDateTime issued)
29   {
30     MapSqlParameterSource parameters = new MapSqlParameterSource();
31     parameters.addValue("key", key);
32     parameters.addValue("value", value);
33     parameters.addValue("issued", Timestamp.from(issued.toInstant()));
34     jdbcTemplate.update(SQL_UPDATE, parameters);
35   }
36
37   public int delete(Set<Long> ids)
38   {
39     if (ids == null || ids.isEmpty())
40       return 0;
41     MapSqlParameterSource parameters = new MapSqlParameterSource();
42     parameters.addValue("ids", ids);
43     return jdbcTemplate.update(SQL_DELETE, parameters);
44   }
45
46   public List<OutboxItem> fetch(Long sequenceNumber)
47   {
48     MapSqlParameterSource parameters = new MapSqlParameterSource();
49     parameters.addValue("sequenceNumber", sequenceNumber);
50     return
51         jdbcTemplate.query(
52             SQL_QUERY,
53             parameters,
54             (resultSet, rowNumber) ->
55             {
56               return
57                   OutboxItem
58                       .builder()
59                       .sequenceNumber(resultSet.getLong(1))
60                       .key(resultSet.getString(2))
61                       .value(resultSet.getString(3))
62                       .build();
63             });
64   }
65 }