#!/bin/bash
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/sumup-requests:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
exit
fi
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
if [[
$(docker image ls -q $IMAGE) == "" ||
"$1" = "build"
]]
then
- mvn install || exit
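+ # remove a possibly existing requests-container first, so that a freshly built image is actually used on the next docker-compose up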
+ docker-compose rm -svf requests
+ mvn clean install || exit
else
echo "Using image existing images:"
docker image ls $IMAGE
echo "Waiting for the Kafka-Cluster to become ready..."
-docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
+docker-compose exec cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d producer consumer
-sleep 15
-docker-compose stop producer consumer
-docker-compose logs consumer
+docker-compose up -d
+
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests..."; sleep 1; done
+
+
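+# send the number 66 as a calculation request for the key "foo" to the gateway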
+echo 66 | http -v :8080/foo
+
+sleep 5
+
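+# read the topic "out" from the beginning; for the request above the requests-service should have
+# written START, 1..66 and END with key=foo (partition and offset values may differ)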
+kafkacat -b :9092 -t out -o 0 -e -f 'p=%p|o=%o|k=%k|v=%s\n'
ports:
- 2181:2181
- kafka:
+ kafka-1:
image: confluentinc/cp-kafka:7.1.3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
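+ # DOCKER is the listener advertised to clients inside the compose network (kafka-1:9092),
+ # LOCALHOST the one advertised to clients connecting from the host through the mapped port 9081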
+ KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ ports:
+ - 9081:9081
+ depends_on:
+ - zookeeper
+
+ kafka-2:
+ image: confluentinc/cp-kafka:7.1.3
+ environment:
+ KAFKA_BROKER_ID: 2
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
- KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
ports:
- 9092:9082
- 9082:9082
+ networks:
+ default:
+ aliases:
+ - kafka
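+ # the alias keeps the bootstrap address kafka:9092, used by setup, gateway and requests, resolvable; inside the compose network it now points to kafka-2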
+ depends_on:
+ - zookeeper
+
+ kafka-3:
+ image: confluentinc/cp-kafka:7.1.3
+ environment:
+ KAFKA_BROKER_ID: 3
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+ KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ ports:
+ - 9083:9083
depends_on:
- zookeeper
image: juplo/toolbox
command: >
bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
- kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
+ kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 1 --replication-factor 1
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic in
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic out
"
cli:
image: juplo/toolbox
command: sleep infinity
- producer:
- image: juplo/endless-long-producer:1.0-SNAPSHOT
+ gateway:
+ image: juplo/sumup-gateway:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer
- producer.topic: test
- producer.throttle-ms: 200
-
+ sumup.gateway.bootstrap-server: kafka:9092
+ sumup.gateway.client-id: gateway
+ sumup.gateway.topic: in
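+ # the gateway accepts requests on port 8080 and writes them to the topic "in"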
- consumer:
- image: juplo/endless-consumer:1.0-SNAPSHOT
+ requests:
+ image: juplo/sumup-requests:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
server.port: 8080
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: consumer
+ sumup.requests.bootstrap-server: kafka:9092
+ sumup.requests.client-id: requests
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>endless-consumer</artifactId>
+ <artifactId>sumup-requests</artifactId>
+ <name>SumUp-Requests</name>
+ <description>A service that reads computation requests from an incoming topic and generates the corresponding messages for the SumUp-Consumer on an outgoing topic</description>
<version>1.0-SNAPSHOT</version>
- <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>
<dependencies>
<dependency>
package de.juplo.kafka;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class ApplicationConfiguration
{
@Bean
- public RecordHandler noopRecordHandler()
+ public SumUpRecordHandler sumUpRecordHandler(
+ KafkaProducer<String, String> kafkaProducer,
+ ApplicationProperties properties)
{
- return record -> {};
+ return new SumUpRecordHandler(
+ kafkaProducer,
+ properties.getClientId(),
+ properties.getTopicOut());
}
@Bean
- public EndlessConsumer<String, Long> endlessConsumer(
- KafkaConsumer<String, Long> kafkaConsumer,
+ public EndlessConsumer<String, Integer> endlessConsumer(
+ KafkaConsumer<String, Integer> kafkaConsumer,
ExecutorService executor,
- RecordHandler noopRecordHandler,
+ SumUpRecordHandler sumUpRecordHandler,
ApplicationProperties properties)
{
return
new EndlessConsumer<>(
executor,
properties.getClientId(),
- properties.getTopic(),
+ properties.getTopicIn(),
kafkaConsumer,
- noopRecordHandler);
+ sumUpRecordHandler);
}
@Bean
}
@Bean(destroyMethod = "close")
- public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+ public KafkaConsumer<String, Integer> kafkaConsumer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
props.put("metadata.max.age.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", LongDeserializer.class.getName());
+ props.put("value.deserializer", IntegerDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
+
+ @Bean(destroyMethod = "close")
+ public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("client.id", properties.getClientId());
+ props.put("acks", properties.getAcks());
+ props.put("batch.size", properties.getBatchSize());
+ props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+ props.put("request.timeout.ms", 10000); // 10 Sekunden
+ props.put("linger.ms", properties.getLingerMs());
+ props.put("compression.type", properties.getCompressionType());
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
}
@RequiredArgsConstructor
public class ApplicationHealthIndicator implements HealthIndicator
{
- private final EndlessConsumer<String, Long> consumer;
+ private final EndlessConsumer<String, Integer> consumer;
@Override
import java.time.Duration;
-@ConfigurationProperties(prefix = "consumer")
+@ConfigurationProperties(prefix = "sumup.requests")
@Validated
@Getter
@Setter
private String clientId;
@NotNull
@NotEmpty
- private String topic;
+ private String topicIn;
@NotNull
@NotEmpty
private String autoOffsetReset;
@NotNull
private Duration commitInterval;
+ @NotNull
+ @NotEmpty
+ private String topicOut;
+ @NotNull
+ @NotEmpty
+ private String acks;
+ @NotNull
+ private Integer batchSize;
+ @NotNull
+ private Integer lingerMs;
+ @NotNull
+ @NotEmpty
+ private String compressionType;
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class SumUpRecordHandler implements RecordHandler<String, Integer>
+{
+ private final Producer<String, String> producer;
+ private final String id;
+ private final String topic;
+
+
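+ // For a request with value n, emit START, 1..n and END to the outgoing topic, reusing the
+ // request's key, so that all messages of one calculation land in the same partition
+ // (assuming the default key-based partitioner)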
+ @Override
+ public void accept(ConsumerRecord<String, Integer> record)
+ {
+ String key = record.key();
+ int number = record.value();
+
+ send(key, "START");
+ for (int i = 1; i <= number; i++)
+ {
+ send(key, Integer.toString(i));
+ }
+ send(key, "END");
+ }
+
+ private void send(String key, String value)
+ {
+ final long time = System.currentTimeMillis();
+
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ topic, // Topic
+ key, // Key
+ value // Value
+ );
+
+ producer.send(record, (metadata, e) ->
+ {
+ long now = System.currentTimeMillis();
+ if (e == null)
+ {
+ // HANDLE SUCCESS
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ }
+ else
+ {
+ // HANDLE ERROR
+ log.error(
+ "{} - ERROR key={} timestamp={} latency={}ms: {}",
+ id,
+ record.key(),
+ metadata == null ? -1 : metadata.timestamp(),
+ now - time,
+ e.toString()
+ );
+ }
+ });
+
+ long now = System.currentTimeMillis();
+ log.trace(
+ "{} - Queued message with key={} latency={}ms",
+ id,
+ record.key(),
+ now - time
+ );
+ }
+}
-consumer:
- bootstrap-server: :9092
- group-id: my-group
- client-id: DEV
- topic: test
- auto-offset-reset: earliest
- commit-interval: 5s
+sumup:
+ requests:
+ bootstrap-server: :9092
+ group-id: my-group
+ client-id: DEV
+ topic-in: in
+ topic-out: out
+ auto-offset-reset: earliest
+ commit-interval: 5s
+ acks: -1
+ batch-size: 16384
+ linger-ms: 0
+ compression-type: gzip
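+ # batch-size is given in bytes; linger-ms: 0 means batches are sent as soon as the sender thread is ready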
management:
endpoint:
shutdown:
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.kafka.test.context.EmbeddedKafka;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.INPUT_TOPIC;
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
- "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "consumer.topic=" + TOPIC,
+ "sumup.requests.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "sumup.requests.topic=" + INPUT_TOPIC,
"spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC)
+@EmbeddedKafka(topics = INPUT_TOPIC)
public class ApplicationIT
{
public static final String TOPIC = "FOO";
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
properties = {
- "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "consumer.topic=" + TOPIC,
- "consumer.commit-interval=1s" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+ "sumup.requests.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "sumup.requests.topic-in=" + INPUT_TOPIC,
+ "sumup.requests.commit-interval=1s" })
+@EmbeddedKafka(topics = { INPUT_TOPIC, OUTPUT_TOPIC }, partitions = PARTITIONS)
@EnableAutoConfiguration
@Slf4j
class ApplicationTests
{
- public static final String TOPIC = "FOO";
+ public static final String INPUT_TOPIC = "FOO";
+ public static final String OUTPUT_TOPIC = "BAR";
public static final int PARTITIONS = 10;
@Autowired
Serializer valueSerializer;
@Autowired
- KafkaProducer<String, Bytes> kafkaProducer;
+ KafkaProducer<String, Bytes> testProducer;
@Autowired
- KafkaConsumer<String, Long> kafkaConsumer;
+ KafkaConsumer<String, Integer> kafkaConsumer;
@Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
@Autowired
RecordHandler noopRecordHandler;
- EndlessConsumer<String, Long> endlessConsumer;
+ EndlessConsumer<String, Integer> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
- Set<ConsumerRecord<String, Long>> receivedRecords;
+ Set<ConsumerRecord<String, Integer>> receivedRecords;
/** Tests methods */
{
send100Messages((partition, key, counter) ->
{
- Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
- return new ProducerRecord<>(TOPIC, partition, key, value);
+ Bytes value = new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter));
+ return new ProducerRecord<>(INPUT_TOPIC, partition, key, value);
});
await("100 records received")
send100Messages((partition, key, counter) ->
{
Bytes value = counter == 77
- ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
- : new Bytes(valueSerializer.serialize(TOPIC, counter));
- return new ProducerRecord<>(TOPIC, partition, key, value);
+ ? new Bytes(stringSerializer.serialize(INPUT_TOPIC, "BOOM!"))
+ : new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter));
+ return new ProducerRecord<>(INPUT_TOPIC, partition, key, value);
});
await("Consumer failed")
return
IntStream
.range(0, PARTITIONS)
- .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+ .mapToObj(partition -> new TopicPartition(INPUT_TOPIC, partition))
.collect(Collectors.toList());
}
public interface RecordGenerator<K, V>
{
- public ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
+ public ProducerRecord<String, Bytes> generate(int partition, String key, int counter);
}
void send100Messages(RecordGenerator recordGenerator)
{
- long i = 0;
+ int i = 0;
for (int partition = 0; partition < 10; partition++)
{
ProducerRecord<String, Bytes> record =
recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);
- kafkaProducer.send(record, (metadata, e) ->
+ testProducer.send(record, (metadata, e) ->
{
if (metadata != null)
{
newOffsets.put(tp, offset - 1);
});
- TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, Long>(noopRecordHandler) {
+ TestRecordHandler<String, Integer> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<String, Integer>(noopRecordHandler) {
@Override
- public void onNewRecord(ConsumerRecord<String, Long> record)
+ public void onNewRecord(ConsumerRecord<String, Integer> record)
{
newOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new EndlessConsumer<>(
executor,
properties.getClientId(),
- properties.getTopic(),
+ properties.getTopicIn(),
kafkaConsumer,
captureOffsetAndExecuteTestHandler);
public static class Configuration
{
@Bean
- Serializer<Long> serializer()
+ Serializer<Integer> valueSerializer()
{
- return new LongSerializer();
+ return new IntegerSerializer();
}
@Bean
- KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+ KafkaProducer<String, Bytes> testProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
return new KafkaConsumer<>(props);
}
+
+ @Bean
+ KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
}
}