From c58cbfe62963850f499fa5fa010101ba6697651e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 22 Nov 2022 22:01:41 +0100 Subject: [PATCH] Vorlage --- README.sh | 6 --- pom.xml | 4 -- .../java/de/juplo/kafka/RestProducer.java | 52 ++----------------- 3 files changed, 4 insertions(+), 58 deletions(-) diff --git a/README.sh b/README.sh index 3dc3476..b806289 100755 --- a/README.sh +++ b/README.sh @@ -29,15 +29,9 @@ docker-compose up -d producer while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done -# tag::success[] echo -n 'Hallo Welt!' | http -v :8080/foo -# end::success[] -# tag::failure[] dd if=/dev/zero bs=1024 count=1024 | http -v :8080/bar -# end::failure[] -# tag::timeout[] docker-compose stop kafka-1 kafka-2 kafka-3 echo -n 'Hallo again...' | http -v --timeout 30 :8080/foo -# end::timeout[] diff --git a/pom.xml b/pom.xml index e7ea677..544b236 100644 --- a/pom.xml +++ b/pom.xml @@ -78,10 +78,6 @@ - - pl.project13.maven - git-commit-id-plugin - io.fabric8 docker-maven-plugin diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index debe366..6e709ef 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -36,58 +36,14 @@ public class RestProducer final ProducerRecord record = new ProducerRecord<>( topic, // Topic - partition, // Partition key, // Key value // Value ); - record.headers().add("source", id.getBytes()); - if (correlationId != null) - { - record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray()); - } - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message with key={} latency={}ms", - id, - record.key(), - now - time - ); + // TODO: Nachricht versenden und Feedback geben + // Tipp: + // result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + // result.setErrorResult(new ProduceFailure(e)); return result; } -- 2.20.1