import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@Bean
public ExampleConsumer exampleConsumer(
Consumer<String, String> kafkaConsumer,
- Producer<String, Long> kafkaProducer,
+ Producer<String, String> kafkaProducer,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
}
@Bean
- public KafkaProducer<String, Long> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getProducerProperties().getLingerMs());
props.put("compression.type", properties.getProducerProperties().getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", LongSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
private final Map<String, Long> counterState = new HashMap<>();
private final String stateTopic;
- private final Producer<String, Long> producer;
+ private final Producer<String, String> producer;
private volatile boolean running = false;
private final Phaser phaser = new Phaser(1);
String topic,
Consumer<String, String> consumer,
String stateTopic,
- Producer<String, Long> producer,
+ Producer<String, String> producer,
Runnable closeCallback)
{
this.id = clientId;
void sendCounterState(int partition, String key, Long counter)
{
seen[partition]++;
- ProducerRecord<String, Long> record = new ProducerRecord<>(stateTopic, key, counter);
+ ProducerRecord<String, String> record = new ProducerRecord<>(stateTopic, key, counter.toString());
producer.send(record, ((metadata, exception) ->
{
if (exception == null)