exit
fi
-docker-compose up -d zookeeper kafka cli mongo express
+docker-compose up -d zookeeper kafka cli
if [[
$(docker image ls -q $IMAGE) == "" ||
depends_on:
- zookeeper
- mongo:
- image: mongo:4.4.13
- ports:
- - 27017:27017
- environment:
- MONGO_INITDB_ROOT_USERNAME: juplo
- MONGO_INITDB_ROOT_PASSWORD: training
-
- express:
- image: mongo-express
- ports:
- - 8090:8081
- environment:
- ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
- ME_CONFIG_MONGODB_ADMINPASSWORD: training
- ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
- depends_on:
- - mongo
-
kafka-ui:
image: provectuslabs/kafka-ui:0.3.3
ports:
server.port: 8080
consumer.bootstrap-server: kafka:9092
consumer.client-id: consumer
- spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
- spring.data.mongodb.database: juplo
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-mongodb</artifactId>
- </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>de.flapdoodle.embed</groupId>
- <artifactId>de.flapdoodle.embed.mongo</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import java.time.Clock;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Bean
public KeyCountingRebalanceListener keyCountingRebalanceListener(
KeyCountingRecordHandler keyCountingRecordHandler,
- PartitionStatisticsRepository repository,
ApplicationProperties properties)
{
return new KeyCountingRebalanceListener(
keyCountingRecordHandler,
- repository,
- properties.getClientId(),
- Clock.systemDefaultZone(),
- properties.getCommitInterval());
+ properties.getClientId());
}
@Bean
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
+ private final ConsumerRebalanceListener consumerRebalanceListener;
private final RecordHandler<K, V> handler;
private final Lock lock = new ReentrantLock();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+ consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener);
while (true)
{
consumed++;
}
-
- pollIntervalAwareRebalanceListener.beforeNextPoll();
}
}
catch(WakeupException e)
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
{
private final KeyCountingRecordHandler handler;
- private final PartitionStatisticsRepository repository;
private final String id;
- private final Clock clock;
- private final Duration commitInterval;
-
- private Instant lastCommit = Instant.EPOCH;
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
Integer partition = tp.partition();
log.info("{} - adding partition: {}", id, partition);
- StatisticsDocument document =
- repository
- .findById(Integer.toString(partition))
- .orElse(new StatisticsDocument(partition));
- handler.addPartition(partition, document.statistics);
+ handler.addPartition(partition, new HashMap<>());
});
}
partition,
key);
}
- repository.save(new StatisticsDocument(partition, removed));
});
}
-
-
- @Override
- public void beforeNextPoll()
- {
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
- partiton,
- statistics)));
- lastCommit = clock.instant();
- }
- }
}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
-{
- public Optional<StatisticsDocument> findById(String partition);
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-
-
-public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
-{
- default void beforeNextPoll() {}
-}
public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
{
- default void beforeNextPoll() {}
}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
- @Id
- public String id;
- public Map<String, Long> statistics;
-
- public StatisticsDocument()
- {
- }
-
- public StatisticsDocument(Integer partition)
- {
- this.id = Integer.toString(partition);
- this.statistics = new HashMap<>();
- }
-
- public StatisticsDocument(Integer partition, Map<String, Long> statistics)
- {
- this.id = Integer.toString(partition);
- this.statistics = statistics;
- }
-}
group-id: ${consumer.group-id}
topic: ${consumer.topic}
auto-offset-reset: ${consumer.auto-offset-reset}
-spring:
- data:
- mongodb:
- uri: mongodb://juplo:training@localhost:27017
- database: juplo
logging:
level:
root: INFO
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;
"consumer.topic=" + TOPIC,
"spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC)
-@AutoConfigureDataMongo
public class ApplicationIT
{
public static final String TOPIC = "FOO";
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
properties = {
"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
"consumer.topic=" + TOPIC,
- "consumer.commit-interval=1s",
- "spring.mongodb.embedded.version=4.4.13" })
+ "consumer.commit-interval=1s" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration
-@AutoConfigureDataMongo
@Slf4j
class ApplicationTests
{
this.onNewRecord(record);
handler.accept(record);
}
- @Override
-
- public void beforeNextPoll()
- {
- handler.beforeNextPoll();
- }
}