The script will...
-* compile the component,
-* package it as Docker-Images,
-* start up the component and a PostreSQL as containers in a [Compose-Setup](docker-compose.yml),
+* compile the two components,
+* package them as Docker-Images,
+* start up the components and a minimal Kafka Cluster as containers in a [Compose-Setup](docker-compose.yml),
* execute example-queries (CREATE / DELETE) against the API of [the example-project](https://juplo.de/implementing-the-outbox-pattern-with-kafka-part-0-the-example/) and
-* tail the logs of the containers `jdbc` to show what is going on.
+* tail the logs of the containers `jdbc` and `kafkacat` to show what is going on.
You can verify the expected outcome of the demonstration by running a command like the following:
- $ docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0 outbox | grep peter
- 1 | peter1 | "CREATED" | 2021-05-16 13:20:36.849
- 10 | peter2 | "CREATED" | 2021-05-16 13:20:42.141
- 19 | peter3 | "CREATED" | 2021-05-16 13:20:47.136
- 28 | peter4 | "CREATED" | 2021-05-16 13:20:52.087
- 37 | peter5 | "CREATED" | 2021-05-16 13:20:57.512
- 46 | peter6 | "CREATED" | 2021-05-16 13:21:02.493
- 55 | peter7 | "CREATED" | 2021-05-16 13:21:07.503
+ $ docker-compose logs kafkacat | grep peter
+ kafkacat_1 | peter1:"CREATED"
+ kafkacat_1 | peter2:"CREATED"
+ kafkacat_1 | peter3:"CREATED"
+ kafkacat_1 | peter4:"CREATED"
+ kafkacat_1 | peter5:"CREATED"
+ kafkacat_1 | peter6:"CREATED"
+ kafkacat_1 | peter7:"CREATED"
$
-The example-output shows, that the CREATE-event for users with "peter" in their username are only stored exactly once in the outbox-table, although the script issues several requests for each of these users.
+The example-output shows, that the CREATE-event for users with "peter" in their username are only issued exactly once, although the script issues several requests for each of these users.
Be aware, that the outcome of the script will be different, if you run it several times.
In order to reproduce the same behaviour, you have to shut down the Compose-Setup before rerunning the script:
docker-compose down -v
mvn clean
docker image rm juplo/data-jdbc:polling-outbox-2-SNAPSHOT
+ docker image rm juplo/polling-outbox:polling-outbox-2-SNAPSHOT
exit
fi
+docker-compose up -d zookeeper kafka
+
if [[
$(docker image ls -q juplo/data-jdbc:polling-outbox-2-SNAPSHOT) == "" ||
+ $(docker image ls -q juplo/polling-outbox:polling-outbox-2-SNAPSHOT) == "" ||
"$1" = "build"
]]
then
else
echo "Using image existing images:"
docker image ls juplo/data-jdbc:polling-outbox-2-SNAPSHOT
+ docker image ls juplo/polling-outbox:polling-outbox-2-SNAPSHOT
fi
-docker-compose up -d jdbc
+while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+ echo "Waiting for kafka...";
+ sleep 1;
+done
+
+docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 3 --topic outbox
+
+
+docker-compose up -d jdbc outbox kafkacat
while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .status) == "UP" ]];
do
done
-docker-compose logs --tail=0 -f jdbc &
+docker-compose logs --tail=0 -f jdbc kafkacat &
for i in `seq 1 7`;
do
done;
docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0 outbox
-docker-compose stop
+# "kill" the executions of "docker-compose logs ..."
+docker-compose stop jdbc kafkacat
services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:6.0.1
+ ports:
+ - 2181:2181
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: confluentinc/cp-kafka:6.0.1
+ ports:
+ - 9092:9092
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ depends_on:
+ - zookeeper
+
+ kafkacat:
+ image: confluentinc/cp-kafkacat:6.0.1
+ command: "kafkacat -C -b kafka:9093 -q -t outbox -K:"
+ tty: true
+ depends_on:
+ - kafka
+
jdbc:
image: juplo/data-jdbc:polling-outbox-2-SNAPSHOT
ports:
depends_on:
- postgres
+ outbox:
+ image: juplo/polling-outbox:polling-outbox-2-SNAPSHOT
+ environment:
+ spring.profiles.active: prod
+ depends_on:
+ - postgres
+ - kafka
+
postgres:
image: postgres:13
--- /dev/null
+*
+!target/*.jar
--- /dev/null
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.3.2.RELEASE</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+
+ <groupId>de.juplo.kafka.outbox</groupId>
+ <artifactId>polling-outbox</artifactId>
+ <version>polling-outbox-2-SNAPSHOT</version>
+ <name>Outbox (Polling)</name>
+ <description>Simple example-implementation of the Outbox-Pattern (polling variant)</description>
+
+ <properties>
+ <java.version>11</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/%a:%v</name>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+import java.util.concurrent.CountDownLatch;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
+public class Application
+{
+ public static void main(String[] args) throws Exception
+ {
+ SpringApplication.run(Application.class, args);
+
+ final CountDownLatch closeLatch = new CountDownLatch(1);
+
+ Runtime
+ .getRuntime()
+ .addShutdownHook(new Thread()
+ {
+ @Override
+ public void run()
+ {
+ log.info("Closing application...");
+ closeLatch.countDown();
+ }
+ });
+
+ closeLatch.await();
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("de.juplo.kafka.outbox")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+ String bootstrapServers = "localhost:9092";
+ String topic = "outbox";
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+
+@Data
+@Value
+@Builder
+public class OutboxItem
+{
+ private final Long sequenceNumber;
+ private final String key;
+ private final String value;
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+ final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+ private final OutboxRepository repository;
+ private final KafkaProducer<String, String> producer;
+ private final String topic;
+
+ private long sequenceNumber = 0l;
+
+ public OutboxProducer(
+ ApplicationProperties properties,
+ OutboxRepository repository)
+ {
+ this.repository = repository;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.bootstrapServers);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ this.producer = new KafkaProducer<>(props);
+ this.topic = properties.topic;
+ }
+
+ @Scheduled(fixedDelay = 500)
+ public void poll()
+ {
+ List<OutboxItem> items;
+ do
+ {
+ items = repository.fetch(sequenceNumber);
+ LOG.debug("Polled {} new items", items.size());
+ for (OutboxItem item : items)
+ send(item);
+ }
+ while (items.size() > 0);
+ }
+
+ void send(OutboxItem item)
+ {
+ final ProducerRecord<String, String> record =
+ new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+ sequenceNumber = item.getSequenceNumber();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(item.getSequenceNumber());
+ record.headers().add("SEQ#", buffer.array());
+
+ producer.send(record, (metadata, e) ->
+ {
+ if (metadata != null)
+ {
+ int deleted = repository.delete(item.getSequenceNumber());
+ LOG.info(
+ "{}/{}:{} - {}:{}={} - deleted: {}",
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset(),
+ item.getSequenceNumber(),
+ record.key(),
+ record.value(),
+ deleted);
+ }
+ else
+ {
+ // HANDLE ERROR
+ LOG.error(
+ "{}/{} - {}:{}={} -> ",
+ record.topic(),
+ record.partition(),
+ item.getSequenceNumber(),
+ record.key(),
+ record.value(),
+ e);
+ }
+ });
+ }
+
+
+ @PreDestroy
+ public void close()
+ {
+ producer.close(Duration.ofSeconds(5));
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+ private static final String SQL_QUERY =
+ "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+ private static final String SQL_UPDATE =
+ "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+ private static final String SQL_DELETE =
+ "DELETE FROM outbox WHERE id = :id";
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public void save(String key, String value, ZonedDateTime issued)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("key", key);
+ parameters.addValue("value", value);
+ parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+ jdbcTemplate.update(SQL_UPDATE, parameters);
+ }
+
+ public int delete(Long id)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("id", id);
+ return jdbcTemplate.update(SQL_DELETE, parameters);
+ }
+
+ public List<OutboxItem> fetch(Long sequenceNumber)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("sequenceNumber", sequenceNumber);
+ return
+ jdbcTemplate.query(
+ SQL_QUERY,
+ parameters,
+ (resultSet, rowNumber) ->
+ {
+ return
+ OutboxItem
+ .builder()
+ .sequenceNumber(resultSet.getLong(1))
+ .key(resultSet.getString(2))
+ .value(resultSet.getString(3))
+ .build();
+ });
+ }
+}
--- /dev/null
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+
+spring:
+ flyway:
+ locations: classpath:db/migration/h2
+
+logging:
+ level:
+ de:
+ juplo:
+ kafka:
+ outbox: DEBUG
+
+---
+
+spring:
+ profiles: prod
+
+ datasource:
+ url: jdbc:postgresql://postgres:5432/outbox
+ username: outbox
+ password: outbox
+ flyway:
+ locations: classpath:db/migration/postgres
+
+de:
+ juplo:
+ kafka:
+ outbox:
+ bootstrap-servers: kafka:9093
+
+---
+
+spring:
+ profiles: dev
+
+ datasource:
+ url: jdbc:postgresql://localhost:5432/outbox
+ username: outbox
+ password: outbox
+ flyway:
+ locations: classpath:db/migration/postgres
+
+de:
+ juplo:
+ kafka:
+ outbox:
+ bootstrap-servers: localhost:9092
+
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests
+{
+ @Test
+ public void contextLoads()
+ {
+ }
+}
<modules>
<module>jdbc</module>
+ <module>outbox</module>
</modules>
</project>