From: Kai Moritz Date: Sun, 7 Aug 2022 14:01:43 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration (Branch 'headers') X-Git-Tag: headers--vorlage---lvm-2-tage X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3214484684d0bac86a7574439518f80375c8265d;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration (Branch 'headers') --- 3214484684d0bac86a7574439518f80375c8265d diff --cc src/main/java/de/juplo/kafka/RestProducer.java index 4c04fd8,b430e35..2f2a1cb --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@@ -54,8 -54,62 +54,58 @@@ public class RestProduce { DeferredResult result = new DeferredResult<>(); - // TODO: Ergänzen Sie die Logik Ihres REST-Producers und - // ergänzen sie die versendten Nachrichten um die Header + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + partition, // Partition + key, // Key + value // Value + ); + - record.headers().add("source", id.getBytes()); - if (correlationId != null) - { - record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray()); - } ++ // TODO: Fügen Sie die Header zu der Nachricht hinzu + + producer.send(record, (metadata, e) -> + { + long now = System.currentTimeMillis(); + if (e == null) + { + // HANDLE SUCCESS + produced++; + result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + log.debug( + "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + } + else + { + // HANDLE ERROR + result.setErrorResult(new ProduceFailure(e)); + log.error( + "{} - ERROR key={} timestamp={} latency={}ms: {}", + id, + record.key(), + metadata == null ? -1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued message with key={} latency={}ms", + id, + record.key(), + now - time + ); return result; }