@Slf4j
public class ExampleProducer
{
- private final String id;
- private final String topic;
- private final Producer<String, String> producer;
+ private String id;
+ private String topic;
+ private Producer<String, String> producer;
private volatile boolean running = true;
private volatile boolean done = false;
private long produced = 0;
- public ExampleProducer(
- String broker,
- String topic,
- String clientId)
+ public static void main(String[] args) throws Exception
{
+ String broker = "localhost:9092";
+ String clientId = "DEV";
+ String topic = "test";
+
Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("client.id", clientId); // Nur zur Wiedererkennung
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
- this.id = clientId;
- this.topic = topic;
- producer = new KafkaProducer<>(props);
- }
+ Producer<String, String> producer = new KafkaProducer<>(props);
- public void run()
- {
+ String id = clientId;
long i = 0;
+ boolean running = true;
+ boolean done = false;
+ long produced = 0;
try
{
for (; running; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ topic, // Topic
+ Long.toString(i%10), // Key
+ Long.toString(i) // Value
+ );
+
+ producer.send(record);
Thread.sleep(500);
}
}
- catch (Exception e)
- {
- log.error("{} - Unexpected error!", id, e);
- }
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
- done = true;
}
}
-
- void send(String key, String value)
- {
- final long time = System.currentTimeMillis();
-
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- log.debug(
- "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
- id,
- key,
- value,
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "{} - ERROR for message {}={}, latency={}ms: {}",
- id,
- key,
- value,
- now - time,
- e.toString()
- );
- }
- });
-
- long now = System.currentTimeMillis();
- produced++;
- log.trace(
- "{} - Queued message {}={}, latency={}ms",
- id,
- key,
- value,
- now - time
- );
- }
-
-
- public static void main(String[] args) throws Exception
- {
- if (args.length != 3)
- {
- log.error("Three arguments required!");
- log.error("arg[0]: Broker-Address");
- log.error("arg[1]: Topic");
- log.error("arg[2]: Unique Client-ID");
- System.exit(1);
- return;
- }
-
- log.info(
- "Running ExampleProducer: broker={}, topic={}, client-id={}",
- args[0],
- args[1],
- args[2]);
-
- ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]);
-
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.running = false;
- while (!instance.done)
- {
- log.info("Waiting for main-thread...");
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {}
- }
- log.info("Shutdown completed.");
- }));
-
- instance.run();
- }
}