From 996911bbed45e0211e48976e3cb3971631361e5b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 May 2021 17:20:27 +0200 Subject: [PATCH] Added a simple implementation of the polling outbox pattern --- README.md | 26 ++--- README.sh | 21 +++- docker-compose.yml | 35 ++++++ outbox/.dockerignore | 2 + outbox/Dockerfile | 5 + outbox/pom.xml | 89 +++++++++++++++ .../de/juplo/kafka/outbox/Application.java | 36 ++++++ .../kafka/outbox/ApplicationProperties.java | 15 +++ .../de/juplo/kafka/outbox/OutboxItem.java | 16 +++ .../de/juplo/kafka/outbox/OutboxProducer.java | 107 ++++++++++++++++++ .../juplo/kafka/outbox/OutboxRepository.java | 62 ++++++++++ outbox/src/main/resources/application.yml | 53 +++++++++ .../juplo/kafka/outbox/ApplicationTests.java | 16 +++ pom.xml | 1 + 14 files changed, 468 insertions(+), 16 deletions(-) create mode 100644 outbox/.dockerignore create mode 100644 outbox/Dockerfile create mode 100644 outbox/pom.xml create mode 100644 outbox/src/main/java/de/juplo/kafka/outbox/Application.java create mode 100644 outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java create mode 100644 outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java create mode 100644 outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java create mode 100644 outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java create mode 100644 outbox/src/main/resources/application.yml create mode 100644 outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java diff --git a/README.md b/README.md index 2748ed9..2c61a4e 100644 --- a/README.md +++ b/README.md @@ -9,25 +9,25 @@ Execute [README.sh](README.sh) in a shell to demonstrate the example: The script will... -* compile the component, -* package it as Docker-Images, -* start up the component and a PostreSQL as containers in a [Compose-Setup](docker-compose.yml), +* compile the two components, +* package them as Docker-Images, +* start up the components and a minimal Kafka Cluster as containers in a [Compose-Setup](docker-compose.yml), * execute example-queries (CREATE / DELETE) against the API of [the example-project](https://juplo.de/implementing-the-outbox-pattern-with-kafka-part-0-the-example/) and -* tail the logs of the containers `jdbc` to show what is going on. +* tail the logs of the containers `jdbc` and `kafkacat` to show what is going on. You can verify the expected outcome of the demonstration by running a command like the following: - $ docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0 outbox | grep peter - 1 | peter1 | "CREATED" | 2021-05-16 13:20:36.849 - 10 | peter2 | "CREATED" | 2021-05-16 13:20:42.141 - 19 | peter3 | "CREATED" | 2021-05-16 13:20:47.136 - 28 | peter4 | "CREATED" | 2021-05-16 13:20:52.087 - 37 | peter5 | "CREATED" | 2021-05-16 13:20:57.512 - 46 | peter6 | "CREATED" | 2021-05-16 13:21:02.493 - 55 | peter7 | "CREATED" | 2021-05-16 13:21:07.503 + $ docker-compose logs kafkacat | grep peter + kafkacat_1 | peter1:"CREATED" + kafkacat_1 | peter2:"CREATED" + kafkacat_1 | peter3:"CREATED" + kafkacat_1 | peter4:"CREATED" + kafkacat_1 | peter5:"CREATED" + kafkacat_1 | peter6:"CREATED" + kafkacat_1 | peter7:"CREATED" $ -The example-output shows, that the CREATE-event for users with "peter" in their username are only stored exactly once in the outbox-table, although the script issues several requests for each of these users. +The example-output shows, that the CREATE-event for users with "peter" in their username are only issued exactly once, although the script issues several requests for each of these users. Be aware, that the outcome of the script will be different, if you run it several times. In order to reproduce the same behaviour, you have to shut down the Compose-Setup before rerunning the script: diff --git a/README.sh b/README.sh index 89aa269..c9f133c 100755 --- a/README.sh +++ b/README.sh @@ -5,11 +5,15 @@ then docker-compose down -v mvn clean docker image rm juplo/data-jdbc:polling-outbox-2-SNAPSHOT + docker image rm juplo/polling-outbox:polling-outbox-2-SNAPSHOT exit fi +docker-compose up -d zookeeper kafka + if [[ $(docker image ls -q juplo/data-jdbc:polling-outbox-2-SNAPSHOT) == "" || + $(docker image ls -q juplo/polling-outbox:polling-outbox-2-SNAPSHOT) == "" || "$1" = "build" ]] then @@ -17,9 +21,19 @@ then else echo "Using image existing images:" docker image ls juplo/data-jdbc:polling-outbox-2-SNAPSHOT + docker image ls juplo/polling-outbox:polling-outbox-2-SNAPSHOT fi -docker-compose up -d jdbc +while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; +do + echo "Waiting for kafka..."; + sleep 1; +done + +docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 3 --topic outbox + + +docker-compose up -d jdbc outbox kafkacat while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .status) == "UP" ]]; do @@ -28,7 +42,7 @@ do done -docker-compose logs --tail=0 -f jdbc & +docker-compose logs --tail=0 -f jdbc kafkacat & for i in `seq 1 7`; do @@ -49,4 +63,5 @@ do done; docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0 outbox -docker-compose stop +# "kill" the executions of "docker-compose logs ..." +docker-compose stop jdbc kafkacat diff --git a/docker-compose.yml b/docker-compose.yml index d04c665..3600d5f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,6 +2,33 @@ version: "3" services: + zookeeper: + image: confluentinc/cp-zookeeper:6.0.1 + ports: + - 2181:2181 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:6.0.1 + ports: + - 9092:9092 + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zookeeper + + kafkacat: + image: confluentinc/cp-kafkacat:6.0.1 + command: "kafkacat -C -b kafka:9093 -q -t outbox -K:" + tty: true + depends_on: + - kafka + jdbc: image: juplo/data-jdbc:polling-outbox-2-SNAPSHOT ports: @@ -11,6 +38,14 @@ services: depends_on: - postgres + outbox: + image: juplo/polling-outbox:polling-outbox-2-SNAPSHOT + environment: + spring.profiles.active: prod + depends_on: + - postgres + - kafka + postgres: image: postgres:13 diff --git a/outbox/.dockerignore b/outbox/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/outbox/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/outbox/Dockerfile b/outbox/Dockerfile new file mode 100644 index 0000000..16ee25e --- /dev/null +++ b/outbox/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/outbox/pom.xml b/outbox/pom.xml new file mode 100644 index 0000000..2d57a06 --- /dev/null +++ b/outbox/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.3.2.RELEASE + + + + de.juplo.kafka.outbox + polling-outbox + polling-outbox-2-SNAPSHOT + Outbox (Polling) + Simple example-implementation of the Outbox-Pattern (polling variant) + + + 11 + + + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.springframework.boot + spring-boot-starter-json + + + org.apache.kafka + kafka-clients + + + org.projectlombok + lombok + + + org.postgresql + postgresql + + + org.springframework.boot + spring-boot-starter-test + test + + + com.h2database + h2 + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/%a:%v + + + + + + build + package + + build + + + + + + + + diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java new file mode 100644 index 0000000..a63d714 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java @@ -0,0 +1,36 @@ +package de.juplo.kafka.outbox; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +import java.util.concurrent.CountDownLatch; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +@Slf4j +public class Application +{ + public static void main(String[] args) throws Exception + { + SpringApplication.run(Application.class, args); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + Runtime + .getRuntime() + .addShutdownHook(new Thread() + { + @Override + public void run() + { + log.info("Closing application..."); + closeLatch.countDown(); + } + }); + + closeLatch.await(); + } +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java new file mode 100644 index 0000000..1a5dca6 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.outbox; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("de.juplo.kafka.outbox") +@Getter +@Setter +public class ApplicationProperties +{ + String bootstrapServers = "localhost:9092"; + String topic = "outbox"; +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java new file mode 100644 index 0000000..99deafa --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.outbox; + +import lombok.Builder; +import lombok.Data; +import lombok.Value; + + +@Data +@Value +@Builder +public class OutboxItem +{ + private final Long sequenceNumber; + private final String key; + private final String value; +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java new file mode 100644 index 0000000..627ca05 --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java @@ -0,0 +1,107 @@ +package de.juplo.kafka.outbox; + +import org.apache.kafka.common.serialization.StringSerializer; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + + +@Component +public class OutboxProducer +{ + final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); + + + private final OutboxRepository repository; + private final KafkaProducer producer; + private final String topic; + + private long sequenceNumber = 0l; + + public OutboxProducer( + ApplicationProperties properties, + OutboxRepository repository) + { + this.repository = repository; + + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.producer = new KafkaProducer<>(props); + this.topic = properties.topic; + } + + @Scheduled(fixedDelay = 500) + public void poll() + { + List items; + do + { + items = repository.fetch(sequenceNumber); + LOG.debug("Polled {} new items", items.size()); + for (OutboxItem item : items) + send(item); + } + while (items.size() > 0); + } + + void send(OutboxItem item) + { + final ProducerRecord record = + new ProducerRecord<>(topic, item.getKey(), item.getValue()); + + sequenceNumber = item.getSequenceNumber(); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(item.getSequenceNumber()); + record.headers().add("SEQ#", buffer.array()); + + producer.send(record, (metadata, e) -> + { + if (metadata != null) + { + int deleted = repository.delete(item.getSequenceNumber()); + LOG.info( + "{}/{}:{} - {}:{}={} - deleted: {}", + metadata.topic(), + metadata.partition(), + metadata.offset(), + item.getSequenceNumber(), + record.key(), + record.value(), + deleted); + } + else + { + // HANDLE ERROR + LOG.error( + "{}/{} - {}:{}={} -> ", + record.topic(), + record.partition(), + item.getSequenceNumber(), + record.key(), + record.value(), + e); + } + }); + } + + + @PreDestroy + public void close() + { + producer.close(Duration.ofSeconds(5)); + } +} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java new file mode 100644 index 0000000..03a68ef --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.outbox; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.util.List; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_QUERY = + "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC"; + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + private static final String SQL_DELETE = + "DELETE FROM outbox WHERE id = :id"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } + + public int delete(Long id) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("id", id); + return jdbcTemplate.update(SQL_DELETE, parameters); + } + + public List fetch(Long sequenceNumber) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("sequenceNumber", sequenceNumber); + return + jdbcTemplate.query( + SQL_QUERY, + parameters, + (resultSet, rowNumber) -> + { + return + OutboxItem + .builder() + .sequenceNumber(resultSet.getLong(1)) + .key(resultSet.getString(2)) + .value(resultSet.getString(3)) + .build(); + }); + } +} diff --git a/outbox/src/main/resources/application.yml b/outbox/src/main/resources/application.yml new file mode 100644 index 0000000..2a8502a --- /dev/null +++ b/outbox/src/main/resources/application.yml @@ -0,0 +1,53 @@ +management: + endpoints: + web: + exposure: + include: "*" + +spring: + flyway: + locations: classpath:db/migration/h2 + +logging: + level: + de: + juplo: + kafka: + outbox: DEBUG + +--- + +spring: + profiles: prod + + datasource: + url: jdbc:postgresql://postgres:5432/outbox + username: outbox + password: outbox + flyway: + locations: classpath:db/migration/postgres + +de: + juplo: + kafka: + outbox: + bootstrap-servers: kafka:9093 + +--- + +spring: + profiles: dev + + datasource: + url: jdbc:postgresql://localhost:5432/outbox + username: outbox + password: outbox + flyway: + locations: classpath:db/migration/postgres + +de: + juplo: + kafka: + outbox: + bootstrap-servers: localhost:9092 + diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java new file mode 100644 index 0000000..cb5bc43 --- /dev/null +++ b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.outbox; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class ApplicationTests +{ + @Test + public void contextLoads() + { + } +} diff --git a/pom.xml b/pom.xml index 3e46567..8825add 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ jdbc + outbox -- 2.20.1