import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.counter.commit-interval=0",
+ "juplo.wordcount.counter.cacheMaxBytes=0",
"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
{
return new Consumer(mapper);
}
+
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore("TEST-STORE");
+ }
}
}