projects
/
demos
/
kafka
/
training
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
057035f
)
Der Producer versendet direkt in der for-Schleife
author
Kai Moritz
<kai@juplo.de>
Sat, 12 Nov 2022 11:32:11 +0000
(12:32 +0100)
committer
Kai Moritz
<kai@juplo.de>
Sat, 12 Nov 2022 11:32:11 +0000
(12:32 +0100)
src/main/java/de/juplo/kafka/Application.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/Application.java
b/src/main/java/de/juplo/kafka/Application.java
index
9781a7a
..
13ccd8f
100644
(file)
--- a/
src/main/java/de/juplo/kafka/Application.java
+++ b/
src/main/java/de/juplo/kafka/Application.java
@@
-11,7
+11,6
@@
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFuture;
-
@Slf4j
@SpringBootApplication
public class Application implements ApplicationRunner
@Slf4j
@SpringBootApplication
public class Application implements ApplicationRunner
@@
-19,32
+18,25
@@
public class Application implements ApplicationRunner
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
-
- void send(String key, String value)
- {
- ListenableFuture<SendResult<String, String>> listenableFuture =
- kafkaTemplate.send("test", key, value);
-
- listenableFuture.addCallback(
- result -> log.info(
- "Sent {}={} to partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset()),
- e -> log.error("ERROR sendig message", e));
- }
-
@Override
public void run(ApplicationArguments args)
{
for (int i = 0; i < 100; i++)
{
@Override
public void run(ApplicationArguments args)
{
for (int i = 0; i < 100; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ ListenableFuture<SendResult<String, String>> listenableFuture =
+ kafkaTemplate.send("test", Long.toString(i%10), Long.toString(i));
+
+ listenableFuture.addCallback(
+ result -> log.info(
+ "Sent {}={} to partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset()),
+ e -> log.error("ERROR sendig message", e));
}
}
}
}
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);