echo -n Already configured:
cat INITIALIZED
kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic state
else
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--if-exists \
--topic test
+ kafka-topics --bootstrap-server kafka:9092 \
+ --delete \
+ --if-exists \
+ --topic state \
+
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic test \
--partitions 2 \
--replication-factor 3 \
--config min.insync.replicas=2 \
&& echo The topic \'test\' was successfully created: \
&& kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
- && date > INITIALIZED
+ && \
+ kafka-topics --bootstrap-server kafka:9092 \
+ --create \
+ --topic state \
+ --partitions 2 \
+ --replication-factor 3 \
+ --config min.insync.replicas=2 \
+ --config cleanup.policy=compact \
+ && echo The topic \'state\' was successfully created: \
+ && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \
+ && \
+ date > INITIALIZED
fi
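# The state topic is compacted: for every key only the latest counter value is
# retained. The INITIALIZED file acts as a sentinel: it is written only after
# all topics were created successfully, so a restarted container skips the
# setup and just describes the existing topics.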
stop_grace_period: 0s
depends_on:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Bean
public ExampleConsumer exampleConsumer(
Consumer<String, String> kafkaConsumer,
+ Producer<String, Long> kafkaProducer,
ApplicationProperties properties)
{
return
new ExampleConsumer(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
- kafkaConsumer);
+ kafkaConsumer,
+ properties.getProducerProperties().getTopic(),
+ kafkaProducer);
}
@Bean
return new KafkaConsumer<>(props);
}
+
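+ // Counterpart to the consumer: this producer writes the computed counter
+ // values as (key, counter) records into the compacted state topic.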
+ @Bean
+ public KafkaProducer<String, Long> kafkaProducer(ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("client.id", properties.getClientId());
+ props.put("acks", properties.getProducerProperties().getAcks());
+ props.put("batch.size", properties.getProducerProperties().getBatchSize());
+ props.put("metadata.maxage.ms", 5000); // 5 Sekunden
+ props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+ props.put("request.timeout.ms", 10000); // 10 Sekunden
+ props.put("linger.ms", properties.getProducerProperties().getLingerMs());
+ props.put("compression.type", properties.getProducerProperties().getCompressionType());
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", LongSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
}
@NotNull
private ConsumerProperties consumer;
+ @NotNull
+ private ProducerProperties producer;
public ConsumerProperties getConsumerProperties()
{
return consumer;
}
+ public ProducerProperties getProducerProperties()
+ {
+ return producer;
+ }
+
@Validated
@Getter
enum OffsetReset { latest, earliest, none }
}
+
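+ // Maps the juplo.producer.* settings from application.yml.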
+ @Validated
+ @Getter
+ @Setter
+ static class ProducerProperties
+ {
+ @NotNull
+ @NotEmpty
+ private String topic;
+ @NotNull
+ @NotEmpty
+ private String acks;
+ @NotNull
+ private Integer batchSize;
+ @NotNull
+ private Integer lingerMs;
+ @NotNull
+ @NotEmpty
+ private String compressionType;
+ }
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Phaser;
@Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
{
private final String id;
private final String topic;
private final Thread workerThread;
private final Map<String, Long> counterState = new HashMap<>();
+ private final String stateTopic;
+ Producer<String, Long> producer;
private volatile boolean running = false;
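+ // The phaser starts with one registered party: the worker-thread, which
+ // arrives at the end of each poll-iteration. One additional party is
+ // registered per assigned partition (see onPartitionsAssigned()).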
+ private final Phaser phaser = new Phaser(1);
+ private volatile int[] seen;
+ private volatile int[] acked;
+ private volatile boolean[] done;
private long consumed = 0;
public ExampleConsumer(
String clientId,
String topic,
- Consumer<String, String> consumer)
+ Consumer<String, String> consumer,
+ String stateTopic,
+ Producer<String, Long> producer)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
+ this.stateTopic = stateTopic;
+ this.producer = producer;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
{
try
{
+ log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ seen = new int[numPartitions];
+ acked = new int[numPartitions];
+ done = new boolean[numPartitions];
+
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
running = true;
while (true)
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, String> record : records)
- {
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
- }
+ records
+ .partitions()
+ .forEach(partition ->
+ {
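+ // Reset the bookkeeping for this partition before processing its batch: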
+ seen[partition.partition()] = 0;
+ acked[partition.partition()] = 0;
+ done[partition.partition()] = false;
+
+ for (ConsumerRecord<String, String> record : records.records(partition))
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+
+ done[partition.partition()] = true;
+ });
+
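+ // Block until the send-callbacks of all assigned partitions have
+ // confirmed every state update from this batch: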
+ int phase = phaser.arriveAndAwaitAdvance();
+ log.info("{} - Phase {} is done!", id, phase);
}
}
catch(WakeupException e)
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
Long counter = computeCount(key);
log.info("{} - current value for counter {}: {}", id, key, counter);
+ sendCounterState(partition, key, counter);
}
private synchronized Long computeCount(String key)
{
return counterState.merge(key, 1L, Long::sum);
}
synchronized Map<String, Long> getCounterState()
{
return counterState;
}
+ void sendCounterState(int partition, String key, Long counter)
+ {
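+ // Count the update as sent; the send-callback below counts it as
+ // acknowledged once the broker has confirmed it.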
+ seen[partition]++;
+ ProducerRecord<String, Long> record = new ProducerRecord<>(stateTopic, key, counter);
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (exception == null)
+ {
+ acked[partition]++;
+ if (done[partition] && acked[partition] >= seen[partition])
+ {
+ phaser.arrive();
+ }
+ }
+ else
+ {
+ // Errors are ignored (for now):
+ // The next occurrence of the key will issue a new update of the counter state
+ log.error("{} - {}", id, exception.toString());
+ }
+ }));
+ }
+
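+ // ConsumerRebalanceListener: keeps the number of registered
+ // phaser-parties in sync with the currently assigned partitions.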
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ phaser.bulkRegister(partitions.size());
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(partition -> phaser.arriveAndDeregister());
+ }
+
public void shutdown() throws InterruptedException
{
log.info("{} joining the worker-thread...", id);
topic: test
auto-offset-reset: earliest
auto-commit-interval: 5s
+ producer:
+ topic: state
+ acks: -1
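+ # -1 is equivalent to "all"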
+ batch-size: 16384
+ linger-ms: 0
+ compression-type: gzip
management:
endpoint:
shutdown:
topic: ${juplo.consumer.topic}
auto-offset-reset: ${juplo.consumer.auto-offset-reset}
auto-commit-interval: ${juplo.consumer.auto-commit-interval}
+ producer:
+ topic: ${juplo.producer.topic}
+ acks: ${juplo.producer.acks}
+ batch-size: ${juplo.producer.batch-size}
+ linger-ms: ${juplo.producer.linger-ms}
+ compression-type: ${juplo.producer.compression-type}
logging:
level:
root: INFO
import java.time.Duration;
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest",
- "juplo.consumer.topic=" + TOPIC })
+ "juplo.consumer.topic=" + TOPIC_IN})
@AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
public class ApplicationTests
{
- static final String TOPIC = "FOO";
+ static final String TOPIC_IN = "FOO";
+ static final String TOPIC_OUT = "BAR";
static final int PARTITIONS = 10;
@Autowired