import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@Autowired
ConfigurableApplicationContext context;
- ListenableFuture<Integer> consumerJob;
+ CompletableFuture<Void> consumerJob;
@Override
public void run(ApplicationArguments args)
{
log.info("Starting ExampleProducer");
- consumerJob = taskExecutor.submitListenable(exampleProducer);
- consumerJob.addCallback(
- exitStatus ->
- {
- log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
- SpringApplication.exit(context, () -> exitStatus);
- },
- t ->
- {
- log.error("ExampleProducer exited abnormally!", t);
- SpringApplication.exit(context, () -> 2);
- });
+ consumerJob = taskExecutor.submitCompletable(exampleProducer);
+ consumerJob.thenAccept(none -> SpringApplication.exit(context, () -> 0));
+ consumerJob.exceptionally(t ->
+ {
+ log.error("ExampleProducer exited abnormally!", t);
+ SpringApplication.exit(context, () -> 1);
+ return null;
+ });
}
@Slf4j
@RequiredArgsConstructor
-public class ExampleProducer implements Callable<Integer>
+public class ExampleProducer implements Runnable
{
private final String id;
private final String topic;
@Override
- public Integer call()
+ public void run()
{
long i = 0;
catch (Exception e)
{
log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
- return 1;
+ throw new RuntimeException(e);
}
finally
{
producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
}
-
- return 0;
}
void send(String key, String value)