while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-# tag::success[]
echo -n 'Hallo Welt!' | http -v :8080/foo
-# end::success[]
-# tag::failure[]
dd if=/dev/zero bs=1024 count=1024 | http -v :8080/bar
-# end::failure[]
-# tag::timeout[]
docker-compose stop kafka-1 kafka-2 kafka-3
echo -n 'Hallo again...' | http -v --timeout 30 :8080/foo
-# end::timeout[]
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>pl.project13.maven</groupId>
- <artifactId>git-commit-id-plugin</artifactId>
- </plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
final ProducerRecord<String, String> record = new ProducerRecord<>(
topic, // Topic
- partition, // Partition
key, // Key
value // Value
);
- record.headers().add("source", id.getBytes());
- if (correlationId != null)
- {
- record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
- }
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
-
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued message with key={} latency={}ms",
- id,
- record.key(),
- now - time
- );
+ // TODO: Nachricht versenden und Feedback geben
+ // Tipp:
+ // result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ // result.setErrorResult(new ProduceFailure(e));
return result;
}