From 2271c5b01003adc13d5d29afade3c14c85614e1d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 12 Jul 2020 14:30:05 +0200 Subject: [PATCH] WIP --- pom.xml | 14 +-- .../de/trion/kafka/outbox/Application.java | 36 ------ .../kafka/outbox/ApplicationProperties.java | 35 ------ .../de/trion/kafka/outbox/OutboxConsumer.java | 114 ------------------ .../de/trion/kafka/outbox/OutboxProducer.java | 71 ----------- .../de/trion/kafka/outbox/OutboxService.java | 65 ---------- 6 files changed, 5 insertions(+), 330 deletions(-) delete mode 100644 src/main/java/de/trion/kafka/outbox/ApplicationProperties.java delete mode 100644 src/main/java/de/trion/kafka/outbox/OutboxConsumer.java delete mode 100644 src/main/java/de/trion/kafka/outbox/OutboxProducer.java delete mode 100644 src/main/java/de/trion/kafka/outbox/OutboxService.java diff --git a/pom.xml b/pom.xml index e55601f..322ea86 100644 --- a/pom.xml +++ b/pom.xml @@ -31,24 +31,20 @@ org.springframework.boot spring-boot-starter-data-jdbc - - com.h2database - h2 - runtime - org.springframework.boot spring-boot-starter-json - org.projectlombok lombok + - org.apache.kafka - kafka-clients - + com.h2database + h2 + runtime + org.springframework.boot diff --git a/src/main/java/de/trion/kafka/outbox/Application.java b/src/main/java/de/trion/kafka/outbox/Application.java index 878e75e..f6865f5 100644 --- a/src/main/java/de/trion/kafka/outbox/Application.java +++ b/src/main/java/de/trion/kafka/outbox/Application.java @@ -2,51 +2,15 @@ package de.trion.kafka.outbox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.web.servlet.config.annotation.CorsRegistry; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @SpringBootApplication -@EnableConfigurationProperties(ApplicationProperties.class) public class Application { private final static Logger LOG = LoggerFactory.getLogger(Application.class); - @Autowired - ApplicationProperties properties; - - - @Bean - public String bootstrapServers() { return properties.bootstrapServers; } - - @Bean - public String topic() { - return properties.topic; - } - - @Bean - public String consumerGroup() { - return properties.consumerGroup; - } - - @Bean - public WebMvcConfigurer corsConfigurer() { - return new WebMvcConfigurer() { - @Override - public void addCorsMappings(CorsRegistry registry) { - registry - .addMapping("/**") - .allowedOrigins("http://localhost:4200"); - } - }; - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); } diff --git a/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java b/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java deleted file mode 100644 index 7ba4c06..0000000 --- a/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java +++ /dev/null @@ -1,35 +0,0 @@ -package de.trion.kafka.outbox; - -import org.springframework.boot.context.properties.ConfigurationProperties; - -@ConfigurationProperties("outbox.polling") -public class ApplicationProperties { - public String bootstrapServers = "localhost:9092"; - public String topic = "outbox-polling"; - public String consumerGroup = "polling"; - - - public String getBootstrapServers() { - return bootstrapServers; - } - - public void setBootstrapServers(String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - } - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } -} diff --git a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java deleted file mode 100644 index e33a28b..0000000 --- a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java +++ /dev/null @@ -1,114 +0,0 @@ -package de.trion.kafka.outbox; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; - - -@Component -public class OutboxConsumer implements ApplicationRunner, Runnable { - - private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class); - - private final OutboxService service; - private final OutboxProducer sender; - private final ObjectMapper mapper; - private final String topic; - private final KafkaConsumer consumer; - private final Thread thread; - - private long internalState = 1; - - - public OutboxConsumer( - OutboxService service, - OutboxProducer sender, - ObjectMapper mapper, - String bootstrapServers, - String consumerGroup, - String topic) { - - this.service = service; - this.sender = sender; - this.mapper = mapper; - this.topic = topic; - - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers); - props.put("group.id", consumerGroup); - props.put("auto.commit.interval.ms", 15000); - props.put("metadata.max.age.ms", 1000); - props.put("key.deserializer", LongDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - consumer = new KafkaConsumer<>(props); - - thread = new Thread(this); - } - - - @Override - public void run() { - try - { - LOG.info("Subscribing to topic " + topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - for (ConsumerRecord record : records) { - LOG.debug("Ignoring command {}", record.value()); - } - } - } - catch (WakeupException e) {} - catch (Exception e) { - LOG.error("Unexpected exception!", e); - } - finally - { - LOG.info("Closing the KafkaConsumer..."); - try { - consumer.close(Duration.ofSeconds(5)); - LOG.debug("Successfully closed the KafkaConsumer"); - } - catch (Exception e) { - LOG.warn("Exception while closing the KafkaConsumer!", e); - } - } - } - - - @Override - public void run(ApplicationArguments args) { - thread.start(); - try { - thread.join(); - LOG.info("Successfully joined the consumer-thread"); - } - catch (InterruptedException e) { - LOG.info("Main-thread was interrupted while joining the consumer-thread"); - } - } - - @PreDestroy - public void stop() - { - LOG.info("Stopping the KafkaConsumer..."); - consumer.wakeup(); - } -} diff --git a/src/main/java/de/trion/kafka/outbox/OutboxProducer.java b/src/main/java/de/trion/kafka/outbox/OutboxProducer.java deleted file mode 100644 index d493aee..0000000 --- a/src/main/java/de/trion/kafka/outbox/OutboxProducer.java +++ /dev/null @@ -1,71 +0,0 @@ -package de.trion.kafka.outbox; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.annotation.PreDestroy; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -@Component -public class OutboxProducer { - - private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); - - private final ObjectMapper mapper; - private final String topic; - private final KafkaProducer producer; - - - public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) { - this.mapper = mapper; - this.topic = topic; - - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - producer = new KafkaProducer<>(props); - } - - - public void send(UserEvent event) { - try { - String json = mapper.writeValueAsString(event); - ProducerRecord record = new ProducerRecord<>(topic, event.user, json); - producer.send(record, (metadata, e) -> { - if (e != null) { - LOG.error("Could not send event {}!", json, e); - } - else { - LOG.debug( - "{}: send event {} with offset {} to partition {}", - event.user, - event.id, - metadata.offset(), - metadata.partition()); - } - }); - } catch (Exception e) { - throw new RuntimeException("Fehler beim Senden des Events " + event.id, e); - } - } - - - @PreDestroy - public void stop(){ - LOG.info("Closing the KafkaProducer..."); - try { - producer.close(5, TimeUnit.SECONDS); - LOG.debug("Successfully closed the KafkaProducer"); - } - catch (Exception e) { - LOG.warn("Exception while closing the KafkaProducer!", e); - } - } -} diff --git a/src/main/java/de/trion/kafka/outbox/OutboxService.java b/src/main/java/de/trion/kafka/outbox/OutboxService.java deleted file mode 100644 index e003839..0000000 --- a/src/main/java/de/trion/kafka/outbox/OutboxService.java +++ /dev/null @@ -1,65 +0,0 @@ -package de.trion.kafka.outbox; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; -import org.springframework.web.context.request.async.DeferredResult; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -@Component -public class OutboxService { - - private static final Logger LOG = LoggerFactory.getLogger(OutboxService.class); - - - private final Map state = new HashMap<>(); - private final Map requests = new HashMap<>(); - - private long counter = 1; - - - public OutboxService() {} - - - public String bearbeiteVorgang(String vorgangId, String vbId, String data) { - if (vorgangId == null) - throw new IllegalArgumentException("vorgangId must not be null!"); - - // Fehler beim Sichern simulieren - Random r = new Random(); - int i = r.nextInt(10); - if (i == 0) - throw new RuntimeException("FEHLER!!!!!!"); - - String result = vorgangId + "|vbId=" + vbId + "|" + counter++ + ", rand=" + i + ": " + data; - - if (state.containsKey(vorgangId)) - LOG.info("Bearbeite Vorgang {}: alt={}, neu={}", vorgangId, state.get(vorgangId), data); - else - LOG.info("Bearbeite Vorgang {}: neu={}", vorgangId, data); - - process(vorgangId, result); - return result; - } - - public synchronized void process(String vorgangId, DeferredResult result) { - String data = state.get(vorgangId); - if (data != null) { - result.setResult(ResponseEntity.ok(data)); - } - else { - requests.put(vorgangId, result); - } - } - - private synchronized void process(String vorgangId, String result) { - state.put(vorgangId, result); - DeferredResult request = requests.get(vorgangId); - if (request != null) - request.setResult(ResponseEntity.ok(result)); - } -} -- 2.20.1