@Bean
public ExampleProducer exampleProducer(
ApplicationProperties properties,
+ KeyGenerator<String> keyGenerator,
+ ValueGenerator<String> valueGenerator,
Producer<String, String> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
: properties.getProducerProperties().getThrottle(),
+ keyGenerator,
+ valueGenerator,
kafkaProducer,
() -> applicationContext.close());
}
+ @Bean
+ KeyGenerator<String> keyGenerator()
+ {
+ return i -> Long.toString(i%10);
+ }
+
+ @Bean
+ ValueGenerator<String> messageGenerator()
+ {
+ return i -> Long.toString(i);
+ }
+
@Bean(destroyMethod = "")
public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
{
@Slf4j
-public class ExampleProducer implements Runnable
+public class ExampleProducer<K, V> implements Runnable
{
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, String> producer;
+ private final KeyGenerator<K> keyGenerator;
+ private final ValueGenerator<V> valueGenerator;
+ private final Producer<K, V> producer;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, String> producer,
+ KeyGenerator<K> keyGenerator,
+ ValueGenerator<V> valueGenerator,
+ Producer<K, V> producer,
Runnable closeCallback)
{
this.id = id;
this.topic = topic;
this.throttle = throttle;
+ this.keyGenerator = keyGenerator;
+ this.valueGenerator = valueGenerator;
this.producer = producer;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
{
for (; running; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ send(keyGenerator.generateKeyFor(i), valueGenerator.generateValueFor(i));
if (throttle.isPositive())
{
}
}
- void send(String key, String value)
+ void send(K key, V value)
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<K, V> record = new ProducerRecord<>(
topic, // Topic
key, // Key
value // Value