- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:1.0-long-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: producer
command: kafka:9092 test my-group consumer
peter:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group peter
+ image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: peter
+ juplo.consumer.topic: test
ute:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group ute
+ image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: ute
+ juplo.consumer.topic: test
volumes:
zookeeper-data:
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>1.0-SNAPSHOT</version>
+ <version>1.0-long-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
public ExampleProducer exampleProducer(
ApplicationProperties properties,
KeyGenerator<String> keyGenerator,
- ValueGenerator<String> valueGenerator,
- Producer<String, String> kafkaProducer,
+ ValueGenerator<Long> valueGenerator,
+ Producer<String, Long> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
}
@Bean
- ValueGenerator<String> messageGenerator()
+ ValueGenerator<Long> messageGenerator()
{
- return i -> Long.toString(i);
+ return i -> i;
}
@Bean(destroyMethod = "")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Long> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis());
props.put("compression.type", properties.getProducerProperties().getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", LongSerializer.class.getName());
return new KafkaProducer<>(props);
}