#!/bin/bash
-IMAGE=juplo/spring-producer:2.0-SNAPSHOT
+IMAGE=juplo/spring-producer:2.0-long-SNAPSHOT
if [ "$1" = "cleanup" ]
then
- kafka-3
producer:
- image: juplo/spring-producer:2.0-SNAPSHOT
+ image: juplo/spring-producer:2.0-long-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: producer
juplo.producer.topic: test
consumer:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer
+ image: juplo/spring-consumer:1.1-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: consumer
peter:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group peter
+ image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: peter
ute:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group ute
+ image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: ute
volumes:
zookeeper-data:
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>2.0-SNAPSHOT</version>
+ <version>2.0-long-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
public ExampleProducer exampleProducer(
@Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
- Producer<String, String> kafkaProducer,
+ Producer<String, Long> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, String> producer;
+ private final Producer<String, Long> producer;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, String> producer,
+ Producer<String, Long> producer,
Runnable closeCallback)
{
this.id = id;
{
for (; running; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ send(Long.toString(i%10), i);
if (throttle.isPositive())
{
}
}
- void send(String key, String value)
+ void send(String key, long value)
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<String, Long> record = new ProducerRecord<>(
topic, // Topic
key, // Key
value // Value
buffer-memory: 33554432
batch-size: 16384
compression-type: gzip
+ value-serializer: org.apache.kafka.common.serialization.LongSerializer
properties:
metadata.max.age.ms: 5000
request.timeout.ms: 5000