From: Kai Moritz Date: Sun, 12 Jul 2020 08:21:44 +0000 (+0200) Subject: WIP X-Git-Tag: wip-initial~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=27982748e3e226a7dcb06397a3855e3ed2181ec8;p=demos%2Fkafka%2Foutbox WIP --- diff --git a/.gitignore b/.gitignore index c507849..f686482 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target .idea +outbox.iml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..89e2ebb --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jre-alpine +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "/usr/bin/java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..23d023f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,44 @@ +version: "2" + +services: + zookeeper: + image: "confluentinc/cp-zookeeper:latest" + hostname: zookeeper + networks: + - tx + ports: + - 2181:2181 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: "confluentinc/cp-kafka:latest" + hostname: kafka + networks: + - tx + depends_on: + - zookeeper + ports: + - 9092:9092 + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 + KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE + + outbox: + image: "outbox:latest" + networks: + - tx + ports: + - 8080:8080 + environment: + GPS_BOOTSTRAP_SERVERS: kafka:9093 + depends_on: + - kafka + +networks: + tx: + driver: bridge diff --git a/pom.xml b/pom.xml index ae3fdf4..a69fe6c 100644 --- a/pom.xml +++ b/pom.xml @@ -50,17 +50,29 @@ - - - org.springframework.boot - spring-boot-maven-plugin - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + %a:%l + + + + + + build + package + + build + + + + + diff --git a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java index 7bb6d4f..e33a28b 100644 --- a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java +++ b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java @@ -1,9 +1,6 @@ package de.trion.kafka.outbox; import com.fasterxml.jackson.databind.ObjectMapper; -import de.lvm.tx.Event; -import de.lvm.tx.Command; -import de.lvm.tx.Command.Action; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -17,14 +14,10 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; -import javax.swing.*; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; import java.util.Properties; -import static de.lvm.tx.Event.Type.*; @Component public class OutboxConsumer implements ApplicationRunner, Runnable { @@ -78,30 +71,6 @@ public class OutboxConsumer implements ApplicationRunner, Runnable { { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record : records) { - byte code = record.headers().lastHeader("messageType").value()[0]; - Action action = Action.from(code); - - if (action == null) - { - LOG.debug("Ignoring unknown action {} for {}", code, record.value()); - continue; - } - - switch(action) { - case SAVE_DLZ: - dlzSaveReceived(toCommand(record.value())); - continue; - default: - LOG.debug("Ignoring message {}", record.value()); - } - byte[] bytes = record.headers().lastHeader("messageType").value(); - String type = new String(bytes, StandardCharsets.UTF_8); - - if (type.endsWith("DlzAction")) { - dlzSaveReceived(toCommand(record.value())); - continue; - } - LOG.debug("Ignoring command {}", record.value()); } } @@ -123,35 +92,6 @@ public class OutboxConsumer implements ApplicationRunner, Runnable { } } - public Command toCommand(String message) throws IOException { - Command command = mapper.readValue(message, Command.class); - LOG.info("{}: {}", command.getAction(), command.getVorgangId()); - return command; - } - - public void dlzSaveReceived(Command command) throws InterruptedException { - try - { - String result = - service.bearbeiteVorgang( - command.getVorgangId(), - command.getVbId(), - command.getData()); - reply(command, result); - } - catch (Exception e) { - LOG.error("Exception during processing!", e); - } - } - - public void reply(Command command, String message) { - String vorgangId = command.getVorgangId(); - String vbId = command.getVbId(); - Event event = new Event(DLZ_SAVED, vorgangId, vbId); - event.getZustand().put(Event.DLZ, message); - sender.send(event); - } - @Override public void run(ApplicationArguments args) { diff --git a/src/main/java/de/trion/kafka/outbox/OutboxController.java b/src/main/java/de/trion/kafka/outbox/OutboxController.java index 934ca1f..78d4999 100644 --- a/src/main/java/de/trion/kafka/outbox/OutboxController.java +++ b/src/main/java/de/trion/kafka/outbox/OutboxController.java @@ -2,26 +2,38 @@ package de.trion.kafka.outbox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.util.UriComponents; +import org.springframework.web.util.UriComponentsBuilder; + +import java.time.LocalDateTime; @RestController +@Transactional +@RequestMapping("/users") public class OutboxController { private static final Logger LOG = LoggerFactory.getLogger(OutboxController.class); - private final OutboxService service; + private final UserRepository repository; - public OutboxController(OutboxService service) { - this.service = service; + public OutboxController(UserRepository repository) { + this.repository = repository; } - @PostMapping("/create") - public ResponseEntity getVorgang(@RequestBody String user) { + @PostMapping() + public ResponseEntity getVorgang( + UriComponentsBuilder builder, + @RequestBody String username) { + User user = new User(username, LocalDateTime.now(), false); + repository.save(user); + // TODO: Not-Unique Fehler auslösen + UriComponents uri = builder.path("/{username}").buildAndExpand(username); + return ResponseEntity.created(uri.toUri()).build(); } } diff --git a/src/main/java/de/trion/kafka/outbox/UserRepository.java b/src/main/java/de/trion/kafka/outbox/UserRepository.java index a12c2e7..6791180 100644 --- a/src/main/java/de/trion/kafka/outbox/UserRepository.java +++ b/src/main/java/de/trion/kafka/outbox/UserRepository.java @@ -7,5 +7,5 @@ import org.springframework.jdbc.core.JdbcTemplate; public interface UserRepository extends CrudRepository { @Query("select * from User u where u.username = :username") - User findByEmailAddress(@Param("email") String username); + User findByUsername(@Param("email") String username); }