Korrekturen & Verbesserungen aus producer-supersimple übernommen
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.boot.ApplicationArguments;
6 import org.springframework.boot.ApplicationRunner;
7 import org.springframework.boot.SpringApplication;
8 import org.springframework.boot.autoconfigure.SpringBootApplication;
9 import org.springframework.kafka.core.KafkaTemplate;
10 import org.springframework.kafka.support.SendResult;
11 import org.springframework.util.concurrent.ListenableFuture;
12
13
14 @Slf4j
15 @SpringBootApplication
16 public class Application implements ApplicationRunner
17 {
18   @Autowired
19   KafkaTemplate<String, String> kafkaTemplate;
20
21   @Override
22   public void run(ApplicationArguments args)
23   {
24     for (int i = 0; true; i++)
25     {
26       ListenableFuture<SendResult<String, String>> listenableFuture =
27           kafkaTemplate.send("test", Long.toString(i%10), Long.toString(i));
28
29       listenableFuture.addCallback(
30           result -> log.info(
31               "Sent {}={} to partition={}, offset={}",
32               result.getProducerRecord().key(),
33               result.getProducerRecord().value(),
34               result.getRecordMetadata().partition(),
35               result.getRecordMetadata().offset()),
36           e -> log.error("ERROR sendig message", e));
37
38       try
39       {
40         Thread.sleep(500);
41       }
42       catch (InterruptedException e)
43       {
44         return;
45       }
46     }
47   }
48
49   public static void main(String[] args)
50   {
51     SpringApplication.run(Application.class, args);
52   }
53 }