X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Ftrion%2Fkafka%2Foutbox%2FOutboxConsumer.java;h=e33a28ba163a762fcc21244dcc1513fc6faf9331;hb=27982748e3e226a7dcb06397a3855e3ed2181ec8;hp=7bb6d4f594a30c86c97a8fa794fe4b430043c57f;hpb=a498c7ae070b064dc7ea9a453b389ecff5c190ce;p=demos%2Fkafka%2Foutbox diff --git a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java index 7bb6d4f..e33a28b 100644 --- a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java +++ b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java @@ -1,9 +1,6 @@ package de.trion.kafka.outbox; import com.fasterxml.jackson.databind.ObjectMapper; -import de.lvm.tx.Event; -import de.lvm.tx.Command; -import de.lvm.tx.Command.Action; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -17,14 +14,10 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; -import javax.swing.*; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; import java.util.Properties; -import static de.lvm.tx.Event.Type.*; @Component public class OutboxConsumer implements ApplicationRunner, Runnable { @@ -78,30 +71,6 @@ public class OutboxConsumer implements ApplicationRunner, Runnable { { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record : records) { - byte code = record.headers().lastHeader("messageType").value()[0]; - Action action = Action.from(code); - - if (action == null) - { - LOG.debug("Ignoring unknown action {} for {}", code, record.value()); - continue; - } - - switch(action) { - case SAVE_DLZ: - dlzSaveReceived(toCommand(record.value())); - continue; - default: - LOG.debug("Ignoring message {}", record.value()); - } - byte[] bytes = record.headers().lastHeader("messageType").value(); - String type = new String(bytes, StandardCharsets.UTF_8); - - if (type.endsWith("DlzAction")) { - dlzSaveReceived(toCommand(record.value())); - continue; - } - LOG.debug("Ignoring command {}", record.value()); } } @@ -123,35 +92,6 @@ public class OutboxConsumer implements ApplicationRunner, Runnable { } } - public Command toCommand(String message) throws IOException { - Command command = mapper.readValue(message, Command.class); - LOG.info("{}: {}", command.getAction(), command.getVorgangId()); - return command; - } - - public void dlzSaveReceived(Command command) throws InterruptedException { - try - { - String result = - service.bearbeiteVorgang( - command.getVorgangId(), - command.getVbId(), - command.getData()); - reply(command, result); - } - catch (Exception e) { - LOG.error("Exception during processing!", e); - } - } - - public void reply(Command command, String message) { - String vorgangId = command.getVorgangId(); - String vbId = command.getVbId(); - Event event = new Event(DLZ_SAVED, vorgangId, vbId); - event.getZustand().put(Event.DLZ, message); - sender.send(event); - } - @Override public void run(ApplicationArguments args) {