version: '3.2'
services:
zookeeper:
- image: confluentinc/cp-zookeeper:7.0.2
+ image: confluentinc/cp-zookeeper:7.1.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 2181:2181
kafka:
- image: confluentinc/cp-kafka:7.0.2
+ image: confluentinc/cp-kafka:7.1.3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
- kafka-ui:
- image: provectuslabs/kafka-ui:0.3.3
- ports:
- - 8080:8080
- environment:
- KAFKA_CLUSTERS_0_NAME: local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+ setup:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+ "
cli:
image: juplo/toolbox
producer:
image: juplo/endless-producer:1.0-SNAPSHOT
ports:
- - 8080:8880
+ - 8080:8080
environment:
+ server.port: 8080
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.throttle-ms: 10
+ producer.throttle-ms: 200
consumer:
image: juplo/endless-consumer:1.0-SNAPSHOT
ports:
- - 8081:8881
+ - 8081:8080
environment:
+ server.port: 8080
consumer.bootstrap-server: kafka:9092
consumer.client-id: my-group
consumer.client-id: consumer
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.5</version>
+ <version>2.7.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
consumer.stop();
}
+
@GetMapping("seen")
- public Map<Integer, Map<String, Integer>> seen()
+ public Map<Integer, Map<String, Long>> seen()
{
return consumer.getSeen();
}
+
@ExceptionHandler
@ResponseStatus(HttpStatus.BAD_REQUEST)
public ErrorResponse illegalStateException(IllegalStateException e)