</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>endless-producer</artifactId>
- <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+ <artifactId>endless-long-producer</artifactId>
+ <name>Endless Long-Producer</name>
+ <description>A Simple Producer that endlessly writes numbers into a topic (as long's!)</description>
<version>1.0-SNAPSHOT</version>
<dependencies>
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import javax.annotation.PreDestroy;
private final String id;
private final String topic;
private final int throttleMs;
- private final KafkaProducer<String, String> producer;
+ private final KafkaProducer<String, Long> producer;
private boolean running = false;
private long i = 0;
props.put("client.id", clientId);
props.put("acks", acks);
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", LongSerializer.class.getName());
this.producer = new KafkaProducer<>(props);
}
{
for (; running; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ send(Long.toString(i%10), i);
if (throttleMs > 0)
{
}
}
- void send(String key, String value)
+ void send(String key, Long value)
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<String, Long> record = new ProducerRecord<>(
topic, // Topic
key, // Key
value // Value