{
DeferredResult<ProduceResult> result = new DeferredResult<>();
- // TODO: Ergänzen Sie die Logik Ihres REST-Producers und
- // ergänzen sie die versendten Nachrichten um die Header
+ final long time = System.currentTimeMillis();
+
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ topic, // Topic
+ partition, // Partition
+ key, // Key
+ value // Value
+ );
+
- record.headers().add("source", id.getBytes());
- if (correlationId != null)
- {
- record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
- }
++ // TODO: Fügen Sie die Header zu der Nachricht hinzu
+
+ producer.send(record, (metadata, e) ->
+ {
+ long now = System.currentTimeMillis();
+ if (e == null)
+ {
+ // HANDLE SUCCESS
+ produced++;
+ result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ }
+ else
+ {
+ // HANDLE ERROR
+ result.setErrorResult(new ProduceFailure(e));
+ log.error(
+ "{} - ERROR key={} timestamp={} latency={}ms: {}",
+ id,
+ record.key(),
+ metadata == null ? -1 : metadata.timestamp(),
+ now - time,
+ e.toString()
+ );
+ }
+ });
+
+ long now = System.currentTimeMillis();
+ log.trace(
+ "{} - Queued message with key={} latency={}ms",
+ id,
+ record.key(),
+ now - time
+ );
return result;
}