From: Kai Moritz Date: Sat, 31 Oct 2020 16:03:24 +0000 (+0100) Subject: Added a simple implementation of the polling outbox pattern X-Git-Tag: part-1~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5df5895e4ec46756d335ea35c92c11146072c4d1;p=demos%2Fkafka%2Foutbox Added a simple implementation of the polling outbox pattern --- diff --git a/docker-compose.yml b/docker-compose.yml index bb11901..0125f95 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,14 @@ services: depends_on: - postgres + outbox: + image: polling-outbox:latest + environment: + spring.profiles.active: prod + depends_on: + - postgres + - kafka + postgres: image: postgres:13 diff --git a/outbox/.dockerignore b/outbox/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/outbox/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/outbox/Dockerfile b/outbox/Dockerfile new file mode 100644 index 0000000..16ee25e --- /dev/null +++ b/outbox/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/outbox/pom.xml b/outbox/pom.xml new file mode 100644 index 0000000..f904537 --- /dev/null +++ b/outbox/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.3.2.RELEASE + + + + de.juplo.kafka.outbox + polling-outbox + 0.0.1-SNAPSHOT + polling-outbox + Simple example-implementation of the Polling-Outbox-Pattern + + + 11 + + + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.springframework.boot + spring-boot-starter-json + + + org.apache.kafka + kafka-clients + + + org.projectlombok + lombok + + + org.postgresql + postgresql + + + org.springframework.boot + spring-boot-starter-test + test + + + com.h2database + h2 + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + %a:%l + + + + + + build + package + + build + + + + + + + + diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java new file mode 100644 index 0000000..a63d714 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java @@ -0,0 +1,36 @@ +package de.juplo.kafka.outbox; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +import java.util.concurrent.CountDownLatch; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +@Slf4j +public class Application +{ + public static void main(String[] args) throws Exception + { + SpringApplication.run(Application.class, args); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + Runtime + .getRuntime() + .addShutdownHook(new Thread() + { + @Override + public void run() + { + log.info("Closing application..."); + closeLatch.countDown(); + } + }); + + closeLatch.await(); + } +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java new file mode 100644 index 0000000..1a5dca6 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.outbox; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("de.juplo.kafka.outbox") +@Getter +@Setter +public class ApplicationProperties +{ + String bootstrapServers = "localhost:9092"; + String topic = "outbox"; +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java new file mode 100644 index 0000000..99deafa --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.outbox; + +import lombok.Builder; +import lombok.Data; +import lombok.Value; + + +@Data +@Value +@Builder +public class OutboxItem +{ + private final Long sequenceNumber; + private final String key; + private final String value; +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java new file mode 100644 index 0000000..627ca05 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java @@ -0,0 +1,107 @@ +package de.juplo.kafka.outbox; + +import org.apache.kafka.common.serialization.StringSerializer; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + + +@Component +public class OutboxProducer +{ + final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); + + + private final OutboxRepository repository; + private final KafkaProducer producer; + private final String topic; + + private long sequenceNumber = 0l; + + public OutboxProducer( + ApplicationProperties properties, + OutboxRepository repository) + { + this.repository = repository; + + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.producer = new KafkaProducer<>(props); + this.topic = properties.topic; + } + + @Scheduled(fixedDelay = 500) + public void poll() + { + List items; + do + { + items = repository.fetch(sequenceNumber); + LOG.debug("Polled {} new items", items.size()); + for (OutboxItem item : items) + send(item); + } + while (items.size() > 0); + } + + void send(OutboxItem item) + { + final ProducerRecord record = + new ProducerRecord<>(topic, item.getKey(), item.getValue()); + + sequenceNumber = item.getSequenceNumber(); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(item.getSequenceNumber()); + record.headers().add("SEQ#", buffer.array()); + + producer.send(record, (metadata, e) -> + { + if (metadata != null) + { + int deleted = repository.delete(item.getSequenceNumber()); + LOG.info( + "{}/{}:{} - {}:{}={} - deleted: {}", + metadata.topic(), + metadata.partition(), + metadata.offset(), + item.getSequenceNumber(), + record.key(), + record.value(), + deleted); + } + else + { + // HANDLE ERROR + LOG.error( + "{}/{} - {}:{}={} -> ", + record.topic(), + record.partition(), + item.getSequenceNumber(), + record.key(), + record.value(), + e); + } + }); + } + + + @PreDestroy + public void close() + { + producer.close(Duration.ofSeconds(5)); + } +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java new file mode 100644 index 0000000..03a68ef --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.outbox; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.util.List; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_QUERY = + "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC"; + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + private static final String SQL_DELETE = + "DELETE FROM outbox WHERE id = :id"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } + + public int delete(Long id) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("id", id); + return jdbcTemplate.update(SQL_DELETE, parameters); + } + + public List fetch(Long sequenceNumber) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("sequenceNumber", sequenceNumber); + return + jdbcTemplate.query( + SQL_QUERY, + parameters, + (resultSet, rowNumber) -> + { + return + OutboxItem + .builder() + .sequenceNumber(resultSet.getLong(1)) + .key(resultSet.getString(2)) + .value(resultSet.getString(3)) + .build(); + }); + } +} diff --git a/outbox/src/main/resources/application.yml b/outbox/src/main/resources/application.yml new file mode 100644 index 0000000..2a8502a --- /dev/null +++ b/outbox/src/main/resources/application.yml @@ -0,0 +1,53 @@ +management: + endpoints: + web: + exposure: + include: "*" + +spring: + flyway: + locations: classpath:db/migration/h2 + +logging: + level: + de: + juplo: + kafka: + outbox: DEBUG + +--- + +spring: + profiles: prod + + datasource: + url: jdbc:postgresql://postgres:5432/outbox + username: outbox + password: outbox + flyway: + locations: classpath:db/migration/postgres + +de: + juplo: + kafka: + outbox: + bootstrap-servers: kafka:9093 + +--- + +spring: + profiles: dev + + datasource: + url: jdbc:postgresql://localhost:5432/outbox + username: outbox + password: outbox + flyway: + locations: classpath:db/migration/postgres + +de: + juplo: + kafka: + outbox: + bootstrap-servers: localhost:9092 + diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java new file mode 100644 index 0000000..cb5bc43 --- /dev/null +++ b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.outbox; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class ApplicationTests +{ + @Test + public void contextLoads() + { + } +} diff --git a/pom.xml b/pom.xml index d11a4d7..a270735 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ jdbc + outbox