Merge der überarbeiteten Compose-Konfiguration (Branch 'headers') headers-vorlage headers--vorlage---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Sun, 7 Aug 2022 14:01:43 +0000 (16:01 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Aug 2022 14:01:43 +0000 (16:01 +0200)
1  2 
src/main/java/de/juplo/kafka/RestProducer.java

@@@ -54,8 -54,62 +54,58 @@@ public class RestProduce
    {
      DeferredResult<ProduceResult> result = new DeferredResult<>();
  
-     // TODO: Ergänzen Sie die Logik Ihres REST-Producers und
-     //       ergänzen sie die versendten Nachrichten um die Header
+     final long time = System.currentTimeMillis();
+     final ProducerRecord<String, String> record = new ProducerRecord<>(
+         topic,  // Topic
+         partition, // Partition
+         key,    // Key
+         value   // Value
+     );
 -    record.headers().add("source", id.getBytes());
 -    if (correlationId != null)
 -    {
 -      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
 -    }
++    // TODO: Fügen Sie die Header zu der Nachricht hinzu
+     producer.send(record, (metadata, e) ->
+     {
+       long now = System.currentTimeMillis();
+       if (e == null)
+       {
+         // HANDLE SUCCESS
+         produced++;
+         result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+         log.debug(
+             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+             id,
+             record.key(),
+             record.value(),
+             metadata.partition(),
+             metadata.offset(),
+             metadata.timestamp(),
+             now - time
+         );
+       }
+       else
+       {
+         // HANDLE ERROR
+         result.setErrorResult(new ProduceFailure(e));
+         log.error(
+             "{} - ERROR key={} timestamp={} latency={}ms: {}",
+             id,
+             record.key(),
+             metadata == null ? -1 : metadata.timestamp(),
+             now - time,
+             e.toString()
+         );
+       }
+     });
+     long now = System.currentTimeMillis();
+     log.trace(
+         "{} - Queued message with key={} latency={}ms",
+         id,
+         record.key(),
+         now - time
+     );
  
      return result;
    }