From de37a02d8dccae426557c3600a2d18f1c4293985 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 13 Jul 2020 20:54:46 +0200 Subject: [PATCH] WIP --- Dockerfile | 5 + docker-compose.yml | 44 +++++++ pom.xml | 14 ++- .../de/juplo/boot/data/jdbc/Application.java | 17 --- .../de/juplo/boot/data/jdbc/UserEvent.java | 20 ---- .../boot/data/jdbc/UserEventListener.java | 19 --- .../kafka/outbox/polling/Application.java | 53 +++++++++ .../outbox/polling/ApplicationProperties.java | 35 ++++++ .../kafka/outbox/polling/OutboxListener.java | 110 ++++++++++++++++++ .../jdbc => kafka/outbox/polling}/User.java | 2 +- .../outbox/polling}/UserController.java | 7 +- .../juplo/kafka/outbox/polling/UserEvent.java | 59 ++++++++++ .../outbox/polling/UserEventListener.java | 37 ++++++ .../outbox/polling}/UserRepository.java | 2 +- src/main/resources/schema.sql | 1 + .../outbox/polling}/ApplicationTests.java | 2 +- 16 files changed, 360 insertions(+), 67 deletions(-) create mode 100644 Dockerfile create mode 100644 docker-compose.yml delete mode 100644 src/main/java/de/juplo/boot/data/jdbc/Application.java delete mode 100644 src/main/java/de/juplo/boot/data/jdbc/UserEvent.java delete mode 100644 src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java create mode 100644 src/main/java/de/juplo/kafka/outbox/polling/Application.java create mode 100644 src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java create mode 100644 src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java rename src/main/java/de/juplo/{boot/data/jdbc => kafka/outbox/polling}/User.java (92%) rename src/main/java/de/juplo/{boot/data/jdbc => kafka/outbox/polling}/UserController.java (95%) create mode 100644 src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java create mode 100644 src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java rename src/main/java/de/juplo/{boot/data/jdbc => kafka/outbox/polling}/UserRepository.java (90%) rename src/test/java/de/juplo/{boot/data/jdbc => kafka/outbox/polling}/ApplicationTests.java (88%) diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..89e2ebb --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jre-alpine +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "/usr/bin/java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..23d023f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,44 @@ +version: "2" + +services: + zookeeper: + image: "confluentinc/cp-zookeeper:latest" + hostname: zookeeper + networks: + - tx + ports: + - 2181:2181 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: "confluentinc/cp-kafka:latest" + hostname: kafka + networks: + - tx + depends_on: + - zookeeper + ports: + - 9092:9092 + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 + KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE + + outbox: + image: "outbox:latest" + networks: + - tx + ports: + - 8080:8080 + environment: + GPS_BOOTSTRAP_SERVERS: kafka:9093 + depends_on: + - kafka + +networks: + tx: + driver: bridge diff --git a/pom.xml b/pom.xml index 10293a1..221b4e3 100644 --- a/pom.xml +++ b/pom.xml @@ -8,14 +8,14 @@ 2.1.5.RELEASE - de.juplo.boot.data - jdbc + de.juplo.kafka.outbox + polling 0.0.1-SNAPSHOT - data-jdbc - Simple web-app example for spring-boot-data-jdbc + Polling-Outbox-Pattern + Implementierung des Outbox-Patterns auf Basis von JDBC - 1.8 + 1.9 @@ -39,6 +39,10 @@ org.projectlombok lombok + + org.apache.kafka + kafka-clients + com.h2database diff --git a/src/main/java/de/juplo/boot/data/jdbc/Application.java b/src/main/java/de/juplo/boot/data/jdbc/Application.java deleted file mode 100644 index d7301e1..0000000 --- a/src/main/java/de/juplo/boot/data/jdbc/Application.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.juplo.boot.data.jdbc; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class Application { - - private final static Logger LOG = LoggerFactory.getLogger(Application.class); - - - public static void main(String[] args) { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java b/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java deleted file mode 100644 index fc8877c..0000000 --- a/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.boot.data.jdbc; - -import org.springframework.context.ApplicationEvent; - - -public class UserEvent extends ApplicationEvent -{ - public enum Type { CREATED, LOGIN, LOGOUT, DELETED } - - final Type type; - final String user; - - - public UserEvent(Object source, Type type, String user) - { - super(source); - this.type = type; - this.user = user; - } -} diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java b/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java deleted file mode 100644 index 5f22e0f..0000000 --- a/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.boot.data.jdbc; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import org.springframework.transaction.event.TransactionalEventListener; - -@Component -public class UserEventListener -{ - private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class); - - - @TransactionalEventListener - public void onUserEvent(UserEvent event) - { - LOG.info("{}: {}", event.type, event.user); - } -} diff --git a/src/main/java/de/juplo/kafka/outbox/polling/Application.java b/src/main/java/de/juplo/kafka/outbox/polling/Application.java new file mode 100644 index 0000000..7605374 --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/polling/Application.java @@ -0,0 +1,53 @@ +package de.juplo.kafka.outbox.polling; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application { + + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + + + @Autowired + ApplicationProperties properties; + + + @Bean + public String bootstrapServers() { return properties.bootstrapServers; } + + @Bean + public String topic() { + return properties.topic; + } + + @Bean + public String consumerGroup() { + return properties.consumerGroup; + } + + @Bean + public WebMvcConfigurer corsConfigurer() { + return new WebMvcConfigurer() { + @Override + public void addCorsMappings(CorsRegistry registry) { + registry + .addMapping("/**") + .allowedOrigins("http://localhost:4200"); + } + }; + } + + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java b/src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java new file mode 100644 index 0000000..8b0bbff --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.outbox.polling; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("outbox.polling") +public class ApplicationProperties { + public String bootstrapServers = "localhost:9092"; + public String topic = "outbox-polling"; + public String consumerGroup = "polling"; + + + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } +} diff --git a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java new file mode 100644 index 0000000..d8f9643 --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java @@ -0,0 +1,110 @@ +package de.juplo.kafka.outbox.polling; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionalEventListener; + +import javax.annotation.PreDestroy; +import java.util.*; +import java.util.concurrent.TimeUnit; + +@Component +public class OutboxListener { + + private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class); + + + private final JdbcTemplate jdbcTemplate; + private final String topic; + private final KafkaProducer producer; + + + public OutboxListener( + JdbcTemplate jdbcTemplate, + String bootstrapServers, + String topic) + { + this.jdbcTemplate = jdbcTemplate; + this.topic = topic; + + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("key.serializer", LongSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + producer = new KafkaProducer<>(props); + } + + + @TransactionalEventListener + @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true) + public void onOutboxEvent(UserEvent userEvent) + { + List> result = + jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC"); + + try { + for (Map entry : result) + { + Long id = (Long)entry.get("id"); + UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event")); + String username = (String)entry.get("username"); + String event = username + ":" + type.name(); + + ProducerRecord record = new ProducerRecord<>(topic, id, event); + producer.send(record, (metadata, e) -> { + if (e != null) { + LOG.error("Could not send event {} ({}): ", id, event, e); + } + else { + LOG.debug( + "Send event {} ({}) with offset {} to partition {}", + id, + event, + metadata.offset(), + metadata.partition()); + deleteOutboxEntry(id); + } + }); + } + + } catch (Exception e) { + throw new RuntimeException("Fehler beim Senden des Events", e); + } + } + + @Transactional + void deleteOutboxEntry(Long id) + { + try + { + int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id); + LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted"); + } + catch (DataAccessException e) + { + LOG.error("Execption while deleting row from outbox: {}!", id, e); + } + } + + @PreDestroy + public void stop(){ + LOG.info("Closing the KafkaProducer..."); + try { + producer.close(5, TimeUnit.SECONDS); + LOG.debug("Successfully closed the KafkaProducer"); + } + catch (Exception e) { + LOG.warn("Exception while closing the KafkaProducer!", e); + } + } +} diff --git a/src/main/java/de/juplo/boot/data/jdbc/User.java b/src/main/java/de/juplo/kafka/outbox/polling/User.java similarity index 92% rename from src/main/java/de/juplo/boot/data/jdbc/User.java rename to src/main/java/de/juplo/kafka/outbox/polling/User.java index 6d4d552..5181175 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/User.java +++ b/src/main/java/de/juplo/kafka/outbox/polling/User.java @@ -1,4 +1,4 @@ -package de.juplo.boot.data.jdbc; +package de.juplo.kafka.outbox.polling; import lombok.*; import org.springframework.data.annotation.Id; diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserController.java b/src/main/java/de/juplo/kafka/outbox/polling/UserController.java similarity index 95% rename from src/main/java/de/juplo/boot/data/jdbc/UserController.java rename to src/main/java/de/juplo/kafka/outbox/polling/UserController.java index 0a4a62f..798ebe7 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/UserController.java +++ b/src/main/java/de/juplo/kafka/outbox/polling/UserController.java @@ -1,4 +1,4 @@ -package de.juplo.boot.data.jdbc; +package de.juplo.kafka.outbox.polling; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,8 +16,9 @@ import java.io.IOException; import java.nio.charset.Charset; import java.time.LocalDateTime; -import static de.juplo.boot.data.jdbc.UserEvent.Type.CREATED; -import static de.juplo.boot.data.jdbc.UserEvent.Type.DELETED; +import static de.juplo.kafka.outbox.polling.UserEvent.Type.CREATED; +import static de.juplo.kafka.outbox.polling.UserEvent.Type.DELETED; + @RestController @Transactional diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java b/src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java new file mode 100644 index 0000000..15db4b7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java @@ -0,0 +1,59 @@ +package de.juplo.kafka.outbox.polling; + +import org.springframework.context.ApplicationEvent; + + +public class UserEvent extends ApplicationEvent +{ + public enum Type + { + CREATED(1), + LOGIN(2), + LOGOUT(3), + DELETED(4); + + public final int num; + + + Type(int num) + { + this.num = num; + } + + + public static Type ofInt(int ordinal) + { + switch (ordinal) + { + case 1: return CREATED; + case 2: return LOGIN; + case 3: return LOGOUT; + case 4: return DELETED; + default: + throw new RuntimeException("Unknown ordinal: " + ordinal); + } + } + + public static int toInt(Type type) + { + return type.toInt(); + } + + public int toInt() + { + return this.num; + } + } + + + final Type type; + final String user; + + + public UserEvent(Object source, Type type, String user) + { + super(source); + this.type = type; + this.user = user; + } +} diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java b/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java new file mode 100644 index 0000000..b61fcca --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java @@ -0,0 +1,37 @@ +package de.juplo.kafka.outbox.polling; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.event.EventListener; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import java.util.Map; + +@Component +public class UserEventListener +{ + private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class); + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public UserEventListener(NamedParameterJdbcTemplate jdbcTemplate) + { + this.jdbcTemplate = jdbcTemplate; + } + + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void onUserEvent(UserEvent event) + { + LOG.info("{}: {}", event.type, event.user); + SqlParameterSource parameters = + new MapSqlParameterSource(Map.of("event", event.type.toInt(), "username", event.user)); + jdbcTemplate.update("INSERT INTO events(event, username) VALUES(:event, :username)", parameters); + } +} diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserRepository.java b/src/main/java/de/juplo/kafka/outbox/polling/UserRepository.java similarity index 90% rename from src/main/java/de/juplo/boot/data/jdbc/UserRepository.java rename to src/main/java/de/juplo/kafka/outbox/polling/UserRepository.java index 1d07359..73b3f9e 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/UserRepository.java +++ b/src/main/java/de/juplo/kafka/outbox/polling/UserRepository.java @@ -1,4 +1,4 @@ -package de.juplo.boot.data.jdbc; +package de.juplo.kafka.outbox.polling; import org.springframework.data.jdbc.repository.query.Query; import org.springframework.data.repository.CrudRepository; diff --git a/src/main/resources/schema.sql b/src/main/resources/schema.sql index 353c36b..b2e329f 100644 --- a/src/main/resources/schema.sql +++ b/src/main/resources/schema.sql @@ -1 +1,2 @@ CREATE TABLE user(id BIGINT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(255), created TIMESTAMP, logged_in BIT) +CREATE TABLE events(id BIGINT PRIMARY KEY AUTO_INCREMENT, event SMALLINT, username varchar(255)) diff --git a/src/test/java/de/juplo/boot/data/jdbc/ApplicationTests.java b/src/test/java/de/juplo/kafka/outbox/polling/ApplicationTests.java similarity index 88% rename from src/test/java/de/juplo/boot/data/jdbc/ApplicationTests.java rename to src/test/java/de/juplo/kafka/outbox/polling/ApplicationTests.java index c77d5b7..52aed2e 100644 --- a/src/test/java/de/juplo/boot/data/jdbc/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/outbox/polling/ApplicationTests.java @@ -1,4 +1,4 @@ -package de.juplo.boot.data.jdbc; +package de.juplo.kafka.outbox.polling; import org.junit.Test; import org.junit.runner.RunWith; -- 2.20.1