Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung consumer/spring-consumer--rebalance-listener consumer/spring-consumer--rebalance-listener--2024-11-13--si
authorKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 16:21:56 +0000 (17:21 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:31:44 +0000 (14:31 +0100)
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 07f7de4..bdefd2b 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer-1
+docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -28,8 +28,12 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 docker compose -f docker/docker-compose.yml up -d producer
 docker compose -f docker/docker-compose.yml up -d consumer-1
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
 
+docker compose -f docker/docker-compose.yml up -d consumer-2
 sleep 10
-docker compose -f docker/docker-compose.yml exec cli http consumer-1:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/
 
 docker compose -f docker/docker-compose.yml stop producer consumer-1
index db8abf6..d80fed4 100644 (file)
@@ -137,17 +137,11 @@ services:
           echo -n Bereits konfiguriert: 
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-          kafka-topics --bootstrap-server kafka:9092 --describe --topic state
         else
           kafka-topics --bootstrap-server kafka:9092 \
                        --delete \
                        --if-exists \
                        --topic test
-          kafka-topics --bootstrap-server kafka:9092 \
-                       --delete \
-                       --if-exists \
-                       --topic state \
-
           kafka-topics --bootstrap-server kafka:9092 \
                        --create \
                        --topic test \
@@ -156,20 +150,7 @@ services:
                        --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
-          && \
-          kafka-topics --bootstrap-server kafka:9092 \
-                       --create \
-                       --topic state \
-                       --partitions 2 \
-                       --replication-factor 3 \
-                       --config min.insync.replicas=2 \
-                       --config cleanup.policy=compact \
-                       --config segment.ms=3000 \
-                       --config max.compaction.lag.ms=5000 \
-          && echo Das Topic \'state\' wurde erfolgreich angelegt: \
-          && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \
-          && \
-          date > INITIALIZED
+          && date > INITIALIZED
         fi
     stop_grace_period: 0s
     depends_on:
@@ -218,7 +199,7 @@ services:
       juplo.producer.throttle-ms: 10
 
   consumer-1:
-    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer-1
@@ -227,7 +208,7 @@ services:
       logging.level.de.juplo: TRACE
 
   consumer-2:
-    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer-2
@@ -236,7 +217,7 @@ services:
       logging.level.de.juplo: TRACE
 
   consumer-3:
-    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer-3
diff --git a/pom.xml b/pom.xml
index 9136018..58da35e 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-log-compaction-SNAPSHOT</version>
+  <version>1.1-rebalance-listener-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index 49875a0..a4856a6 100644 (file)
@@ -2,11 +2,8 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.consumer.StickyAssignor;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -22,7 +19,6 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
-      Producer<String, String> kafkaProducer,
       ApplicationProperties properties,
       ConfigurableApplicationContext applicationContext)
   {
@@ -31,8 +27,6 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getConsumerProperties().getTopic(),
             kafkaConsumer,
-            properties.getProducerProperties().getTopic(),
-            kafkaProducer,
             () -> applicationContext.close());
   }
 
@@ -52,29 +46,10 @@ public class ApplicationConfiguration
       props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     }
     props.put("metadata.maxage.ms", 5000); //  5 Sekunden
-    props.put("partition.assignment.strategy", RangeAssignor.class.getName());
+    props.put("partition.assignment.strategy", StickyAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
     return new KafkaConsumer<>(props);
   }
-
-  @Bean
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
-  {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getProducerProperties().getAcks());
-    props.put("batch.size", properties.getProducerProperties().getBatchSize());
-    props.put("metadata.maxage.ms",   5000); //  5 Sekunden
-    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
-    props.put("request.timeout.ms",  10000); // 10 Sekunden
-    props.put("linger.ms", properties.getProducerProperties().getLingerMs());
-    props.put("compression.type", properties.getProducerProperties().getCompressionType());
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    return new KafkaProducer<>(props);
-  }
 }
index 0b43159..c8193c9 100644 (file)
@@ -25,8 +25,6 @@ public class ApplicationProperties
 
   @NotNull
   private ConsumerProperties consumer;
-  @NotNull
-  private ProducerProperties producer;
 
 
   public ConsumerProperties getConsumerProperties()
@@ -34,11 +32,6 @@ public class ApplicationProperties
     return consumer;
   }
 
-  public ProducerProperties getProducerProperties()
-  {
-    return producer;
-  }
-
 
   @Validated
   @Getter
@@ -56,24 +49,4 @@ public class ApplicationProperties
 
     enum OffsetReset { latest, earliest, none }
   }
-
-  @Validated
-  @Getter
-  @Setter
-  static class ProducerProperties
-  {
-    @NotNull
-    @NotEmpty
-    private String topic;
-    @NotNull
-    @NotEmpty
-    private String acks;
-    @NotNull
-    private Integer batchSize;
-    @NotNull
-    private Integer lingerMs;
-    @NotNull
-    @NotEmpty
-    private String compressionType;
-  }
 }
index fa2ff81..53abd4d 100644 (file)
@@ -5,14 +5,11 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.Phaser;
 
 
 @Slf4j
@@ -24,19 +21,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Thread workerThread;
   private final Runnable closeCallback;
 
-  private final String stateTopic;
-  private final Producer<String, String> producer;
-
   private volatile boolean running = false;
-  private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
-  private volatile State[] partitionStates;
-  private Map<String, Long>[] restoredState;
   private CounterState[] counterState;
-  private volatile long[] stateEndOffsets;
-  private volatile int[] seen;
-  private volatile int[] acked;
-  private volatile boolean[] done;
   private long consumed = 0;
 
 
@@ -44,15 +31,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String clientId,
     String topic,
     Consumer<String, String> consumer,
-    String stateTopic,
-    Producer<String, String> producer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
-    this.stateTopic = stateTopic;
-    this.producer = producer;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -69,20 +52,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
       int numPartitions = consumer.partitionsFor(topic).size();
       log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
-      partitionStates = new State[numPartitions];
-      for (int i=0; i<numPartitions; i++)
-      {
-        partitionStates[i] = State.UNASSIGNED;
-      }
-      restoredState = new Map[numPartitions];
       counterState = new CounterState[numPartitions];
-      stateEndOffsets = new long[numPartitions];
-      seen = new int[numPartitions];
-      acked = new int[numPartitions];
-      done = new boolean[numPartitions];
 
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic, stateTopic), this);
+      consumer.subscribe(Arrays.asList(topic), this);
       running = true;
 
       while (running)
@@ -90,48 +63,16 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
         ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
 
-        int phase = phaser.getPhase();
-
-        assignedPartitions
-          .forEach(partition ->
-          {
-            seen[partition.partition()] = 0;
-            acked[partition.partition()] = 0;
-            done[partition.partition()] = false;
-          });
-
-        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
-        records
-          .partitions()
-          .forEach(partition ->
-          {
-            for (ConsumerRecord<String, String> record : records.records(partition))
-            {
-              handleRecord(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.key(),
-                record.value());
-            }
-
-            checkRestoreProgress(partition);
-
-            done[partition.partition()] = true;
-          });
-
-        assignedPartitions
-          .forEach(partition ->
-          {
-            if (seen[partition.partition()] == 0)
-            {
-              int arrivedPhase = phaser.arrive();
-              log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
-            }
-          });
-
-        int arrivedPhase = phaser.arriveAndAwaitAdvance();
-        log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
+        log.info("{} - Received {} messages", id, records.count());
+        for (ConsumerRecord<String, String> record : records)
+        {
+          handleRecord(
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            record.key(),
+            record.value());
+        }
       }
     }
     catch(WakeupException e)
@@ -163,59 +104,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
 
-    if (topic.equals(this.topic))
-    {
-      handleMessage(partition, key);
-    }
-    else
-    {
-      handleState(partition, key, value);
-    }
-  }
-
-  private void checkRestoreProgress(TopicPartition topicPartition)
-  {
-    int partition = topicPartition.partition();
-
-    if (partitionStates[partition] == State.RESTORING)
-    {
-      long consumerPosition = consumer.position(topicPartition);
-
-      if (consumerPosition + 1 >= stateEndOffsets[partition])
-      {
-        log.info(
-          "{} - Position of consumer is {}. Restoring of state for partition {} done!",
-          id,
-          consumerPosition,
-          topicPartition);
-        stateAssigned(partition);
-      }
-      else
-      {
-        log.debug(
-          "{} - Restored state up to offset {}, end-offset: {}",
-          id,
-          consumerPosition,
-          stateEndOffsets[partition]);
-      }
-    }
-  }
-
-  private synchronized void handleState(
-    int partition,
-    String key,
-    String value)
-  {
-    restoredState[partition].put(key, Long.parseLong(value));
-  }
-
-  private void handleMessage(
-    Integer partition,
-    String key)
-  {
     Long counter = computeCount(partition, key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
-    sendCounterState(partition, key, counter);
   }
 
   private synchronized Long computeCount(int partition, String key)
@@ -230,82 +120,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     return result;
   }
 
-  void sendCounterState(int partition, String key, Long counter)
-  {
-    seen[partition]++;
-
-    final long time = System.currentTimeMillis();
-
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
-        stateTopic,        // Topic
-        key,               // Key
-        counter.toString() // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        log.debug(
-            "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        log.error(
-            "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
-            id,
-            record.key(),
-            record.value(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-
-      acked[partition]++;
-      if (done[partition] && !(acked[partition] < seen[partition]))
-      {
-        int arrivedPhase = phaser.arrive();
-        log.debug(
-            "{} - Arrived at phase {} for partition {}, seen={}, acked={}",
-            id,
-            arrivedPhase,
-            partition,
-            seen[partition],
-            acked[partition]);
-      }
-      else
-      {
-        log.debug(
-            "{} - Still in phase {} for partition {}, seen={}, acked={}",
-            id,
-            phaser.getPhase(),
-            partition,
-            seen[partition],
-            acked[partition]);
-      }
-    });
-
-    long now = System.currentTimeMillis();
-    log.trace(
-        "{} - Queued message {}={}, latency={}ms",
-        id,
-        record.key(),
-        record.value(),
-        now - time
-    );
-  }
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -313,7 +127,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition -> restoreAndAssign(partition.partition()));
+      .forEach(partition ->
+      {
+        assignedPartitions.add(partition);
+        counterState[partition.partition()] = new CounterState(new HashMap<>());
+      });
   }
 
   @Override
@@ -322,132 +140,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition -> revoke(partition.partition()));
-  }
-
-  private void restoreAndAssign(int partition)
-  {
-    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-
-    long stateEndOffset = consumer
-      .endOffsets(List.of(statePartition))
-      .get(statePartition)
-      .longValue();
-
-    long stateBeginningOffset = consumer
-      .beginningOffsets(List.of(statePartition))
-      .get(statePartition);
-
-    log.info(
-      "{} - Found beginning-offset {} and end-offset {} for state partition {}",
-      id,
-      stateBeginningOffset,
-      stateEndOffset,
-      partition);
-
-    if (stateBeginningOffset < stateEndOffset)
-    {
-      stateRestoring(partition, stateBeginningOffset, stateEndOffset);
-    }
-    else
-    {
-      log.info("{} - No state available for partition {}", id, partition);
-      restoredState[partition] = new HashMap<>();
-      stateAssigned(partition);
-    }
-  }
-
-  private void revoke(int partition)
-  {
-    State partitionState = partitionStates[partition];
-    switch (partitionState)
-    {
-      case RESTORING, ASSIGNED -> stateUnassigned(partition);
-      case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
-    }
-  }
-
-  private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset)
-  {
-    log.info(
-      "{} - Changing partition-state for {}: {} -> RESTORING",
-      id,
-      partition,
-      partitionStates[partition]);
-    partitionStates[partition] = State.RESTORING;
-
-    TopicPartition messagePartition = new TopicPartition(this.topic, partition);
-    log.info("{} - Pausing message partition {}", id, messagePartition);
-    consumer.pause(List.of(messagePartition));
-
-    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-    log.info(
-      "{} - Seeking to first offset {} for state partition {}",
-      id,
-      stateBeginningOffset,
-      statePartition);
-    consumer.seek(statePartition, stateBeginningOffset);
-    stateEndOffsets[partition] = stateEndOffset;
-    restoredState[partition] = new HashMap<>();
-    log.info("{} - Resuming state partition {}", id, statePartition);
-    consumer.resume(List.of(statePartition));
-  }
-
-  private void stateAssigned(int partition)
-  {
-    log.info(
-      "{} - State-change for partition {}: {} -> ASSIGNED",
-      id,
-      partition,
-      partitionStates[partition]);
-
-    partitionStates[partition] = State.ASSIGNED;
-
-    TopicPartition statePartition = new TopicPartition(stateTopic, partition);
-    log.info("{} - Pausing state partition {}...", id, statePartition);
-    consumer.pause(List.of(statePartition));
-    counterState[partition] = new CounterState(restoredState[partition]);
-    restoredState[partition] = null;
-
-    TopicPartition messagePartition = new TopicPartition(topic, partition);
-    log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);
-    assignedPartitions.add(messagePartition);
-    phaser.register();
-    log.info(
-      "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
-      id,
-      messagePartition,
-      phaser.getRegisteredParties());
-    log.info("{} - Resuming message partition {}...", id, messagePartition);
-    consumer.resume(List.of(messagePartition));
-  }
-
-  private void stateUnassigned(int partition)
-  {
-    State oldPartitionState = partitionStates[partition];
-
-    log.info(
-      "{} - State-change for partition {}: {} -> UNASSIGNED",
-      id,
-      partition,
-      oldPartitionState);
-
-    partitionStates[partition] = State.UNASSIGNED;
-
-    if (oldPartitionState == State.ASSIGNED)
-    {
-      TopicPartition messagePartition = new TopicPartition(topic, partition);
-      log.info("{} - Revoking partition {}", id, messagePartition);
-      assignedPartitions.remove(messagePartition);
-      counterState[partition] = null;
-
-      phaser.arriveAndDeregister();
-      log.info(
-        "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
-        id,
-        messagePartition,
-        phaser.getRegisteredParties());
-    }
+      .forEach(partition ->
+      {
+        assignedPartitions.remove(partition);
+        counterState[partition.partition()] = null;
+      });
   }
 
 
@@ -458,11 +155,4 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     consumer.wakeup();
     workerThread.join();
   }
-
-  enum State
-  {
-    UNASSIGNED,
-    RESTORING,
-    ASSIGNED
-  }
 }
index d9e7066..7a06731 100644 (file)
@@ -6,12 +6,6 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
-  producer:
-    topic: state
-    acks: -1
-    batch-size: 16384
-    linger-ms: 0
-    compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -34,12 +28,6 @@ info:
       topic: ${juplo.consumer.topic}
       auto-offset-reset: ${juplo.consumer.auto-offset-reset}
       auto-commit-interval: ${juplo.consumer.auto-commit-interval}
-    producer:
-      topic: ${juplo.producer.topic}
-      acks: ${juplo.producer.acks}
-      batch-size: ${juplo.producer.batch-size}
-      linger-ms: ${juplo.producer.linger-ms}
-      compression-type: ${juplo.producer.compression-type}
 logging:
   level:
     root: INFO
index 22bb613..e4b97a4 100644 (file)
@@ -9,7 +9,8 @@ import org.springframework.test.web.servlet.MockMvc;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.ApplicationTests.*;
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
@@ -20,13 +21,12 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.consumer.auto-offset-reset=earliest",
-        "juplo.consumer.topic=" + TOPIC_IN})
+        "juplo.consumer.topic=" + TOPIC })
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 public class ApplicationTests
 {
-  static final String TOPIC_IN  = "FOO";
-  static final String TOPIC_OUT = "BAR";
+  static final String TOPIC = "FOO";
   static final int PARTITIONS = 10;
 
   @Autowired