- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServer);
- props.put("client.id", clientId);
- props.put("acks", acks);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- this.producer = new KafkaProducer<>(props);
- }
-
- @Override
- public void run()
- {
- try
- {
- for (; running; i++)
- {
- send(Long.toString(i%10), Long.toString(i));
-
- if (throttleMs > 0)
- {
- try
- {
- Thread.sleep(throttleMs);
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Interrupted while throttling!", e);
- }
- }
- }
-
- log.info("{} - Done", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected Exception:", id, e);
- }
- finally