Vorlage rest-producer--vorlage rest-producer--vorlage---lvm-2-tage--easy-path
authorKai Moritz <kai@juplo.de>
Tue, 22 Nov 2022 21:01:41 +0000 (22:01 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 6 Dec 2022 19:03:31 +0000 (20:03 +0100)
README.sh
pom.xml
src/main/java/de/juplo/kafka/RestProducer.java

index 3dc3476..b806289 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -29,15 +29,9 @@ docker-compose up -d producer
 
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-# tag::success[]
 echo -n 'Hallo Welt!' | http -v :8080/foo
-# end::success[]
 
-# tag::failure[]
 dd if=/dev/zero bs=1024 count=1024  | http -v :8080/bar
-# end::failure[]
 
-# tag::timeout[]
 docker-compose stop kafka-1 kafka-2 kafka-3
 echo -n 'Hallo again...' | http -v --timeout 30 :8080/foo
-# end::timeout[]
diff --git a/pom.xml b/pom.xml
index e7ea677..544b236 100644 (file)
--- a/pom.xml
+++ b/pom.xml
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>pl.project13.maven</groupId>
-        <artifactId>git-commit-id-plugin</artifactId>
-      </plugin>
       <plugin>
         <groupId>io.fabric8</groupId>
         <artifactId>docker-maven-plugin</artifactId>
index debe366..6e709ef 100644 (file)
@@ -36,58 +36,14 @@ public class RestProducer
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
         topic,  // Topic
-        partition, // Partition
         key,    // Key
         value   // Value
     );
 
-    record.headers().add("source", id.getBytes());
-    if (correlationId != null)
-    {
-      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
-    }
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
-        log.debug(
-            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
-        log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
-            id,
-            record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-    });
-
-    long now = System.currentTimeMillis();
-    log.trace(
-        "{} - Queued message with key={} latency={}ms",
-        id,
-        record.key(),
-        now - time
-    );
+    // TODO: Nachricht versenden und Feedback geben
+    // Tipp:
+    // result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+    // result.setErrorResult(new ProduceFailure(e));
 
     return result;
   }