--- /dev/null
+root = true
+
+[*]
+indent_style = space
+indent_size = tab
+tab_width = 2
+charset = utf-8
+end_of_line = lf
+trim_trailing_whitespace = true
+insert_final_newline = false
+
+[*.properties]
+charset = latin1
\ No newline at end of file
</execution>
</executions>
</plugin>
+ <plugin>
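+ <!-- Runs the *IT integration tests in the verify phase; the goal bindings are assumed to be inherited from the Spring Boot parent POM -->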
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
</plugins>
</build>
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Consumer;
@Configuration
public class ApplicationConfiguration
{
@Bean
- public Consumer<ConsumerRecord<String, Long>> consumer()
+ public RecordHandler<String, Long> recordHandler()
{
return (record) ->
{
public EndlessConsumer<String, Long> endlessConsumer(
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- Consumer<ConsumerRecord<String, Long>> handler,
+ RecordHandler<String, Long> recordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- handler);
+ recordHandler);
}
@Bean
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
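+ // Callback that is invoked for every record fetched from the subscribed topic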
+ private final RecordHandler<K, V> handler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.function.Consumer;
+
+
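+/**
+ * Dedicated type for the record-processing callback of the EndlessConsumer:
+ * semantically just a {@code Consumer<ConsumerRecord<K, V>>}, but as a named
+ * interface it can be injected and decorated (see {@code TestRecordHandler})
+ * without clashing with other {@code Consumer} beans.
+ */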
+@FunctionalInterface
+public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>>
+{
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import static de.juplo.kafka.ApplicationIT.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
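+/**
+ * Smoke test: boots the application against an embedded Kafka broker and
+ * checks that the actuator health-endpoint reports status UP.
+ */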
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC)
+public class ApplicationIT
+{
+ public static final String TOPIC = "FOO";
+
+ @LocalServerPort
+ private int port;
+
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+
+ @Test
+ public void testApplicationStartup()
+ {
+ String health =
+ restTemplate.getForObject(
+ "http://localhost:" + port + "/actuator/health",
+ String.class);
+
+ assertThat(health)
+ .describedAs("Health-endpoint should report status UP")
+ .contains("UP");
+ }
+}
package de.juplo.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration;
-import java.util.Set;
import java.util.function.Consumer;
@TestConfiguration
public static class Configuration
{
- @Primary
@Bean
- public Consumer<ConsumerRecord<String, Long>> consumer()
+ public RecordHandler<String, Long> recordHandler()
{
return (record) ->
{
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@Autowired
- KafkaConsumer<K, V> kafkaConsumer;
+ org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
- @Autowired
- Consumer<ConsumerRecord<K, V>> consumer;
@Autowired
- ApplicationProperties properties;
+ ApplicationProperties applicationProperties;
@Autowired
ExecutorService executor;
+ @Autowired
+ ConsumerRebalanceListener rebalanceListener;
+ @Autowired
+ RecordHandler<K, V> recordHandler;
KafkaProducer<Bytes, Bytes> testRecordProducer;
KafkaConsumer<Bytes, Bytes> offsetConsumer;
/** Test methods */
@Test
- void commitsCurrentOffsetsOnSuccess()
+ void commitsCurrentOffsetsOnSuccess() throws Exception
{
int numberOfGeneratedMessages =
recordGenerator.generate(false, false, messageSender);
.isThrownBy(() -> endlessConsumer.exitStatus())
.describedAs("Consumer should still be running");
+ endlessConsumer.stop();
recordGenerator.assertBusinessLogic();
}
Long expected = offsetsToCheck.get(tp) + 1;
log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
assertThat(offset)
- .describedAs("Committed offset corresponds to the offset of the consumer")
+ .describedAs("Committed offset must be at most equal to the offset of the consumer")
.isLessThanOrEqualTo(expected);
isOffsetBehindSeen.add(offset < expected);
});
{
Properties props;
props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
props.put("linger.ms", 100);
props.put("key.serializer", BytesSerializer.class.getName());
props.put("value.serializer", BytesSerializer.class.getName());
testRecordProducer = new KafkaProducer<>(props);
props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
props.put("client.id", "OFFSET-CONSUMER");
- props.put("group.id", properties.getGroupId());
+ props.put("group.id", applicationProperties.getGroupId());
props.put("key.deserializer", BytesDeserializer.class.getName());
props.put("value.deserializer", BytesDeserializer.class.getName());
offsetConsumer = new KafkaConsumer<>(props);
seenOffsets.put(tp, offset - 1);
});
- Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
- record ->
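+ // Wrap the production handler so that the test can record seen offsets
+ // and received records before the actual processing happens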
+ TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<K, V>(recordHandler)
{
- seenOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
- consumer.accept(record);
+ @Override
+ public void onNewRecord(ConsumerRecord<K, V> record)
+ {
+ seenOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ }
};
endlessConsumer =
new EndlessConsumer<>(
executor,
- properties.getClientId(),
- properties.getTopic(),
+ applicationProperties.getClientId(),
+ applicationProperties.getTopic(),
kafkaConsumer,
captureOffsetAndExecuteTestHandler);
{
try
{
- endlessConsumer.stop();
testRecordProducer.close();
offsetConsumer.close();
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+
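+/**
+ * Decorator for a {@link RecordHandler} that gives tests a hook
+ * ({@link #onNewRecord(ConsumerRecord)}) which sees every record before it
+ * is handed on to the wrapped production handler.
+ */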
+@RequiredArgsConstructor
+public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+{
+ private final RecordHandler<K, V> handler;
+
+
+ public abstract void onNewRecord(ConsumerRecord<K, V> record);
+
+
+ @Override
+ public void accept(ConsumerRecord<K, V> record)
+ {
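+ // Invoke the test-hook first, then delegate to the wrapped handler,
+ // so that the production logic still runs for every record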
+ this.onNewRecord(record);
+ handler.accept(record);
+ }
+}