--- /dev/null
+FROM openjdk:8-jre-alpine
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "/usr/bin/java", "-jar", "/opt/app.jar" ]
+CMD []
--- /dev/null
+version: "2"
+
+services:
+ zookeeper:
+ image: "confluentinc/cp-zookeeper:latest"
+ hostname: zookeeper
+ networks:
+ - tx
+ ports:
+ - 2181:2181
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: "confluentinc/cp-kafka:latest"
+ hostname: kafka
+ networks:
+ - tx
+ depends_on:
+ - zookeeper
+ ports:
+ - 9092:9092
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+ KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+
+ outbox:
+ image: "outbox:latest"
+ networks:
+ - tx
+ ports:
+ - 8080:8080
+ environment:
+ GPS_BOOTSTRAP_SERVERS: kafka:9093
+ depends_on:
+ - kafka
+
+networks:
+ tx:
+ driver: bridge
</dependencies>
<build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>0.33.0</version>
- </plugin>
- </plugins>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>%a:%l</name>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
package de.trion.kafka.outbox;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.lvm.tx.Event;
-import de.lvm.tx.Command;
-import de.lvm.tx.Command.Action;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
-import javax.swing.*;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
-import static de.lvm.tx.Event.Type.*;
@Component
public class OutboxConsumer implements ApplicationRunner, Runnable {
{
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Long, String> record : records) {
- byte code = record.headers().lastHeader("messageType").value()[0];
- Action action = Action.from(code);
-
- if (action == null)
- {
- LOG.debug("Ignoring unknown action {} for {}", code, record.value());
- continue;
- }
-
- switch(action) {
- case SAVE_DLZ:
- dlzSaveReceived(toCommand(record.value()));
- continue;
- default:
- LOG.debug("Ignoring message {}", record.value());
- }
- byte[] bytes = record.headers().lastHeader("messageType").value();
- String type = new String(bytes, StandardCharsets.UTF_8);
-
- if (type.endsWith("DlzAction")) {
- dlzSaveReceived(toCommand(record.value()));
- continue;
- }
-
LOG.debug("Ignoring command {}", record.value());
}
}
}
}
- public Command toCommand(String message) throws IOException {
- Command command = mapper.readValue(message, Command.class);
- LOG.info("{}: {}", command.getAction(), command.getVorgangId());
- return command;
- }
-
- public void dlzSaveReceived(Command command) throws InterruptedException {
- try
- {
- String result =
- service.bearbeiteVorgang(
- command.getVorgangId(),
- command.getVbId(),
- command.getData());
- reply(command, result);
- }
- catch (Exception e) {
- LOG.error("Exception during processing!", e);
- }
- }
-
- public void reply(Command command, String message) {
- String vorgangId = command.getVorgangId();
- String vbId = command.getVbId();
- Event event = new Event(DLZ_SAVED, vorgangId, vbId);
- event.getZustand().put(Event.DLZ, message);
- sender.send(event);
- }
-
@Override
public void run(ApplicationArguments args) {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.async.DeferredResult;
+import org.springframework.web.util.UriComponents;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import java.time.LocalDateTime;
@RestController
+@Transactional
+@RequestMapping("/users")
public class OutboxController {
private static final Logger LOG = LoggerFactory.getLogger(OutboxController.class);
- private final OutboxService service;
+ private final UserRepository repository;
- public OutboxController(OutboxService service) {
- this.service = service;
+ public OutboxController(UserRepository repository) {
+ this.repository = repository;
}
- @PostMapping("/create")
- public ResponseEntity<Void> getVorgang(@RequestBody String user) {
+ @PostMapping()
+ public ResponseEntity<Void> getVorgang(
+ UriComponentsBuilder builder,
+ @RequestBody String username) {
+ User user = new User(username, LocalDateTime.now(), false);
+ repository.save(user);
+ // TODO: Not-Unique Fehler auslösen
+ UriComponents uri = builder.path("/{username}").buildAndExpand(username);
+ return ResponseEntity.created(uri.toUri()).build();
}
}
public interface UserRepository extends CrudRepository<User, Long> {
@Query("select * from User u where u.username = :username")
- User findByEmailAddress(@Param("email") String username);
+ User findByUsername(@Param("email") String username);
}