import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
-import java.util.List;
-
@Slf4j
@SpringBootApplication
public class Application implements ApplicationRunner
{
- public final static String ARG_NUM = "num";
-
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
-
- void send(String key, String value)
- {
- ListenableFuture<SendResult<String, String>> listenableFuture =
- kafkaTemplate.sendDefault(key, value);
-
- listenableFuture.addCallback(
- result -> log.debug(
- "Sent {}={} to partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset()),
- e -> log.error("ERROR sendig message", e));
- }
-
@Override
public void run(ApplicationArguments args)
{
- int num = 10;
-
- if (args.containsOption(ARG_NUM))
- {
- List<String> numArgs = args.getOptionValues(ARG_NUM);
- if (numArgs.size() > 1)
- {
- log.error(
- "Only one occurence of argument {} is allowed, but found: {}",
- ARG_NUM,
- numArgs.size());
- return;
- }
-
- try
- {
- num = Integer.parseInt(numArgs.get(0));
- }
- catch (NumberFormatException e)
- {
- log.error("{} is not a number: {}", numArgs.get(0), e.getMessage());
- }
- }
-
- for (int i = 0; i < num; i++)
+ for (int i = 0; i < 100; i++)
{
- send(Long.toString(i%10), Long.toString(i));
- try
- {
- Thread.sleep(500);
- }
- catch (InterruptedException e)
- {
- log.info("Interrupted after sending {} messages", i);
- return;
- }
+ // tag::callback[]
+ ListenableFuture<SendResult<String, String>> listenableFuture =
+ kafkaTemplate.send("test", Long.toString(i%10), Long.toString(i));
+
+ listenableFuture.addCallback(
+ result -> log.info(
+ "Sent {}={} to partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset()),
+ e -> log.error("ERROR sendig message", e));
+ // end::callback[]
}
}
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);