Fehlerkorrektur aus first-contact gemerged
authorKai Moritz <kai@juplo.de>
Fri, 25 Mar 2022 14:28:21 +0000 (15:28 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 25 Mar 2022 14:28:21 +0000 (15:28 +0100)
1  2 
src/main/java/de/juplo/kafka/EndlessProducer.java

index 33a3815,0000000..7a5b324
mode 100644,000000..100644
--- /dev/null
@@@ -1,178 -1,0 +1,178 @@@
-         "test", // Topic
 +package de.juplo.kafka;
 +
 +import lombok.extern.slf4j.Slf4j;
 +import org.apache.kafka.clients.producer.KafkaProducer;
 +import org.apache.kafka.clients.producer.ProducerRecord;
 +import org.apache.kafka.common.serialization.StringSerializer;
 +
 +import javax.annotation.PreDestroy;
 +import java.util.Properties;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +
 +
 +@Slf4j
 +public class EndlessProducer implements Runnable
 +{
 +  private final ExecutorService executor;
 +  private final String id;
 +  private final String topic;
 +  private final int throttleMs;
 +  private final KafkaProducer<String, String> producer;
 +
 +  private boolean running = false;
 +  private long i = 0;
 +  private long produced = 0;
 +
 +  public EndlessProducer(
 +      ExecutorService executor,
 +      String bootstrapServer,
 +      String clientId,
 +      String topic,
 +      String acks,
 +      int throttleMs)
 +  {
 +    this.executor = executor;
 +    this.id = clientId;
 +    this.topic = topic;
 +    this.throttleMs = throttleMs;
 +
 +    Properties props = new Properties();
 +    props.put("bootstrap.servers", bootstrapServer);
 +    props.put("client.id", clientId);
 +    props.put("acks", acks);
 +    props.put("key.serializer", StringSerializer.class.getName());
 +    props.put("value.serializer", StringSerializer.class.getName());
 +
 +    this.producer = new KafkaProducer<>(props);
 +  }
 +
 +  @Override
 +  public void run()
 +  {
 +    try
 +    {
 +      for (; running; i++)
 +      {
 +        send(Long.toString(i%10), Long.toString(i));
 +
 +        if (throttleMs > 0)
 +        {
 +          try
 +          {
 +            Thread.sleep(throttleMs);
 +          }
 +          catch (InterruptedException e)
 +          {
 +            log.warn("{} - Interrupted while throttling!", e);
 +          }
 +        }
 +      }
 +
 +      log.info("{} - Done", id);
 +    }
 +    catch (Exception e)
 +    {
 +      log.error("{} - Unexpected Exception:", id, e);
 +    }
 +    finally
 +    {
 +      synchronized (this)
 +      {
 +        running = false;
 +        log.info("{} - Stopped - produced {} messages so far", id, produced);
 +      }
 +    }
 +  }
 +
 +  void send(String key, String value)
 +  {
 +    final long time = System.currentTimeMillis();
 +
 +    final ProducerRecord<String, String> record = new ProducerRecord<>(
++        topic,  // Topic
 +        key,    // Key
 +        value   // Value
 +    );
 +
 +    producer.send(record, (metadata, e) ->
 +    {
 +      long now = System.currentTimeMillis();
 +      if (e == null)
 +      {
 +        // HANDLE SUCCESS
 +        produced++;
 +        log.debug(
 +            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
 +            id,
 +            record.key(),
 +            record.value(),
 +            metadata.partition(),
 +            metadata.offset(),
 +            metadata.timestamp(),
 +            now - time
 +        );
 +      }
 +      else
 +      {
 +        // HANDLE ERROR
 +        log.error(
 +            "{} - ERROR key={} timestamp={} latency={}ms: {}",
 +            id,
 +            record.key(),
 +            metadata == null ? -1 : metadata.timestamp(),
 +            now - time,
 +            e.toString()
 +        );
 +      }
 +    });
 +
 +    long now = System.currentTimeMillis();
 +    log.trace(
 +        "{} - Queued #{} key={} latency={}ms",
 +        id,
 +        value,
 +        record.key(),
 +        now - time
 +    );
 +  }
 +
 +  public synchronized void start()
 +  {
 +    if (running)
 +      throw new IllegalStateException("Producer instance " + id + " is already running!");
 +
 +    log.info("{} - Starting - produced {} messages before", id, produced);
 +    running = true;
 +    executor.submit(this);
 +  }
 +
 +  public synchronized void stop() throws ExecutionException, InterruptedException
 +  {
 +    if (!running)
 +      throw new IllegalStateException("Producer instance " + id + " is not running!");
 +
 +    log.info("{} - Stopping...", id);
 +    running = false;
 +  }
 +
 +  @PreDestroy
 +  public void destroy() throws ExecutionException, InterruptedException
 +  {
 +    log.info("{} - Destroy!", id);
 +    try
 +    {
 +      stop();
 +    }
 +    catch (IllegalStateException e)
 +    {
 +      log.info("{} - Was already stopped", id);
 +    }
 +    finally
 +    {
 +      log.info("{} - Closing the KafkaProducer", id);
 +      producer.close();
 +      log.info("{}: Produced {} messages in total, exiting!", id, produced);
 +    }
 +  }
 +}