Reorganisierten Code aus first-contact gemerged
authorKai Moritz <kai@juplo.de>
Fri, 25 Mar 2022 12:36:52 +0000 (13:36 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 25 Mar 2022 12:45:48 +0000 (13:45 +0100)
1  2 
src/main/java/de/juplo/kafka/EndlessProducer.java

@@@ -46,76 -27,23 +46,29 @@@ public class EndlessProducer implement
      props.put("key.serializer", StringSerializer.class.getName());
      props.put("value.serializer", StringSerializer.class.getName());
  
 -    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 -    // end::create[]
 -
 -    this.id = clientId;
 -    this.topic = topic;
 -    this.producer = producer;
 +    this.producer = new KafkaProducer<>(props);
    }
  
 +  @Override
    public void run()
    {
 -    long i = 0;
 -
      try
      {
 -      for (; i < 100 ; i++)
 +      for (; running; i++)
        {
-         final long time = System.currentTimeMillis();
-         final ProducerRecord<String, String> record = new ProducerRecord<>(
-             topic,                 // Topic
-             Long.toString(i % 10), // Key
-             Long.toString(i)       // Value
-         );
-         producer.send(record, (metadata, e) ->
-         {
-           long now = System.currentTimeMillis();
-           if (e == null)
-           {
-             // HANDLE SUCCESS
-             produced++;
-             log.debug(
-                 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-                 id,
-                 record.key(),
-                 record.value(),
-                 metadata.partition(),
-                 metadata.offset(),
-                 metadata.timestamp(),
-                 now - time
-             );
-           }
-           else
-           {
-             // HANDLE ERROR
-             log.error(
-                 "{} - ERROR key={} timestamp={} latency={}ms: {}",
-                 id,
-                 record.key(),
-                 metadata == null ? -1 : metadata.timestamp(),
-                 now - time,
-                 e.toString()
-             );
-           }
-         });
-         long now = System.currentTimeMillis();
-         log.trace(
-             "{} - Queued #{} key={} latency={}ms",
-             id,
-             i,
-             record.key(),
-             now - time
-         );
+         send(Long.toString(i%10), Long.toString(i));
 +
 +        if (throttleMs > 0)
 +        {
 +          try
 +          {
 +            Thread.sleep(throttleMs);
 +          }
 +          catch (InterruptedException e)
 +          {
 +            log.warn("{} - Interrupted while throttling!", e);
 +          }
 +        }
        }
  
        log.info("{} - Done", id);
      }
    }
  
+   void send(String key, String value)
+   {
+     final long time = System.currentTimeMillis();
+     final ProducerRecord<String, String> record = new ProducerRecord<>(
+         "test", // Topic
+         key,    // Key
+         value   // Value
+     );
+     producer.send(record, (metadata, e) ->
+     {
+       long now = System.currentTimeMillis();
+       if (e == null)
+       {
+         // HANDLE SUCCESS
+         produced++;
+         log.debug(
+             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+             id,
+             record.key(),
+             record.value(),
+             metadata.partition(),
+             metadata.offset(),
+             metadata.timestamp(),
+             now - time
+         );
+       }
+       else
+       {
+         // HANDLE ERROR
+         log.error(
+             "{} - ERROR key={} timestamp={} latency={}ms: {}",
+             id,
+             record.key(),
+             metadata == null ? -1 : metadata.timestamp(),
+             now - time,
+             e.toString()
+         );
+       }
+     });
+     long now = System.currentTimeMillis();
+     log.trace(
+         "{} - Queued #{} key={} latency={}ms",
+         id,
+         value,
+         record.key(),
+         now - time
+     );
+   }
 +  public synchronized void start()
 +  {
 +    if (running)
 +      throw new IllegalStateException("Producer instance " + id + " is already running!");
 +
 +    log.info("{} - Starting - produced {} messages before", id, produced);
 +    running = true;
 +    future = executor.submit(this);
 +  }
 +
 +  public synchronized void stop() throws ExecutionException, InterruptedException
 +  {
 +    if (!running)
 +      throw new IllegalStateException("Producer instance " + id + " is not running!");
 +
 +    log.info("{} - Stopping...", id);
 +    running = false;
 +    future.get();
 +    log.info("{} - Stopped - produced {} messages so far", id, produced);
 +  }
  
 -  public static void main(String[] args) throws Exception
 +  @PreDestroy
 +  public void destroy() throws ExecutionException, InterruptedException
    {
 -    SimpleProducer producer = new SimpleProducer("P", "test");
 -    producer.run();
 +    log.info("{} - Destroy!", id);
 +    try
 +    {
 +      stop();
 +    }
 +    catch (IllegalStateException e)
 +    {
 +      log.info("{} - Was already stopped", id);
 +    }
 +    finally
 +    {
 +      log.info("{} - Closing the KafkaProducer", id);
 +      producer.close();
 +      log.info("{}: Produced {} messages in total, exiting!", id, produced);
 +    }
    }
  }