Switched from single-node (assign) to multi-instance (subscribe)
authorKai Moritz <kai@juplo.de>
Sun, 20 Jun 2021 17:51:13 +0000 (19:51 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 29 Jun 2021 18:29:41 +0000 (20:29 +0200)
* TransferConsumer uses subscribe() instead of assign().
* The subscription happens during the instanciation of TransferConsumer.
* The restorage-process of the state happens in onPartitionsAssigned().
* To be able to reset all data, that belongs to a specific partition
  after a rebalance, in order to avoid state errors because of handling
  events several times, InMemoryTransferRepository must know, which
  partition a transfer belongs to.
* To achieve this, the partitioning algorithm is made known explicitly.
* On each rebalance, a mapping from the partions to the currently assigned
  instances is maintained.
* TransferController uses the explicitly known partitioning algorithm, to
  look up the partion for a requested transfer and decide, if the data is
  available locally.
* If not, it looks up the assigned instance in the maintained mapping and
  redirects the request.

application.yml [new file with mode: 0644]
pom.xml
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java
src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java
src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java

diff --git a/application.yml b/application.yml
new file mode 100644 (file)
index 0000000..818a596
--- /dev/null
@@ -0,0 +1,3 @@
+juplo:
+  transfer:
+    group-instance-id: peter
diff --git a/pom.xml b/pom.xml
index 6cab464..6fb54df 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -20,8 +20,9 @@
   <properties>
     <java.version>11</java.version>
     <confluent.version>6.2.0</confluent.version>
-    <kafka.version>2.8.0</kafka.version>
+    <kafka.version>2.7.1</kafka.version>
     <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
+    <spring-kafka-test.version>2.7.2</spring-kafka-test.version>
   </properties>
 
   <dependencies>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <version>${spring-kafka-test.version}</version>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 
index 53ff0f4..a82f8b1 100644 (file)
@@ -2,15 +2,16 @@ package de.juplo.kafka.payment.transfer;
 
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
-import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
-import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
-import de.juplo.kafka.payment.transfer.adapter.TransferController;
+import de.juplo.kafka.payment.transfer.adapter.*;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import de.juplo.kafka.payment.transfer.ports.TransferService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -31,16 +32,31 @@ import java.util.Properties;
 @Slf4j
 public class TransferServiceApplication
 {
+  @Bean(destroyMethod = "close")
+  AdminClient adminClient(TransferServiceProperties properties)
+  {
+    Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+
+    Properties props = new Properties();
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+
+    return AdminClient.create(props);
+  }
+
   @Bean(destroyMethod = "close")
   KafkaProducer<String, String> producer(TransferServiceProperties properties)
   {
     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
     Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
+    Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
 
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
+    props.put(TransferPartitioner.TOPIC, properties.getTopic());
+    props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
 
     return new KafkaProducer<>(props);
   }
@@ -50,10 +66,13 @@ public class TransferServiceApplication
   {
     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
     Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+    Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
 
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
+    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@@ -66,6 +85,8 @@ public class TransferServiceApplication
   TransferConsumer transferConsumer(
       TransferServiceProperties properties,
       KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
       ObjectMapper mapper,
       TransferService productionTransferService,
       TransferService restoreTransferService)
@@ -73,7 +94,11 @@ public class TransferServiceApplication
     return
         new TransferConsumer(
             properties.getTopic(),
+            properties.getNumPartitions(),
+            properties.getInstanceIdUriMapping(),
             consumer,
+            adminClient,
+            repository,
             mapper,
             new TransferConsumer.ConsumerUseCases() {
               @Override
@@ -124,6 +149,14 @@ public class TransferServiceApplication
     return new KafkaMessagingService(producer, mapper, properties.getTopic());
   }
 
+  @Bean
+  InMemoryTransferRepository inMemoryTransferRepository(
+      TransferServiceProperties properties,
+      ObjectMapper mapper)
+  {
+    return new InMemoryTransferRepository(properties.getNumPartitions(), mapper);
+  }
+
   @Bean
   TransferService productionTransferService(
       TransferRepository repository,
@@ -143,9 +176,10 @@ public class TransferServiceApplication
   @Bean
   TransferController transferController(
       TransferService productionTransferService,
-      KafkaMessagingService kafkaMessagingService)
+      KafkaMessagingService kafkaMessagingService,
+      TransferConsumer transferConsumer)
   {
-    return new TransferController(productionTransferService, kafkaMessagingService);
+    return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer);
   }
 
 
index c18f525..e001748 100644 (file)
@@ -5,6 +5,9 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 @ConfigurationProperties("juplo.transfer")
 @Getter
@@ -13,5 +16,13 @@ public class TransferServiceProperties
 {
   private String bootstrapServers = "localhost:9092";
   private String topic = "transfers";
+  private Integer numPartitions = 5;
   private String groupId = "transfers";
+  private String groupInstanceId;
+  private Map<String, String> instanceIdUriMapping;
+
+  public Map<String, String> getInstanceIdUriMapping()
+  {
+    return instanceIdUriMapping == null ? new HashMap<>() : instanceIdUriMapping;
+  }
 }
index 1fd2689..501bfd0 100644 (file)
@@ -5,8 +5,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -19,8 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import java.time.Duration;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -29,20 +31,62 @@ import java.util.stream.Collectors;
 
 @RequestMapping("/consumer")
 @ResponseBody
-@RequiredArgsConstructor
 @Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
+  private final int numPartitions;
   private final KafkaConsumer<String, String> consumer;
+  private final AdminClient adminClient;
+  private final TransferRepository repository;
   private final ObjectMapper mapper;
   private final ConsumerUseCases productionUseCases, restoreUseCases;
 
-  private boolean restoring = true;
   private boolean running = false;
   private boolean shutdown = false;
   private Future<?> future = null;
 
+  private final String groupId;
+  private final String groupInstanceId;
+  private final Map<String, String> instanceIdUriMapping;
+  private final String[] instanceIdByPartition;
+
+  private volatile boolean partitionOwnershipUnknown = true;
+
+
+  public TransferConsumer(
+      String topic,
+      int numPartitions,
+      Map<String, String> instanceIdUriMapping,
+      KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
+      ObjectMapper mapper,
+      ConsumerUseCases productionUseCases,
+      ConsumerUseCases restoreUseCases)
+  {
+    this.topic = topic;
+    this.numPartitions = numPartitions;
+    this.groupId = consumer.groupMetadata().groupId();
+    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+    this.instanceIdByPartition = new String[numPartitions];
+    this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+    for (String instanceId : instanceIdUriMapping.keySet())
+    {
+      // Requests are not redirected for the instance itself
+      String uri = instanceId.equals(groupInstanceId)
+          ? null
+          : instanceIdUriMapping.get(instanceId);
+      this.instanceIdUriMapping.put(instanceId, uri);
+    }
+    this.consumer = consumer;
+    this.adminClient = adminClient;
+    this.repository = repository;
+    this.mapper = mapper;
+    this.productionUseCases = productionUseCases;
+    this.restoreUseCases = restoreUseCases;
+  }
+
 
   @Override
   public void run()
@@ -116,27 +160,98 @@ public class TransferConsumer implements Runnable
     }
   }
 
+
+  public Optional<String> uriForKey(String key)
+  {
+    synchronized (this)
+    {
+      while (partitionOwnershipUnknown)
+      {
+        try { wait(); } catch (InterruptedException e) {}
+      }
+
+      int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+      return
+          Optional
+              .ofNullable(instanceIdByPartition[partition])
+              .map(id -> instanceIdUriMapping.get(id));
+    }
+  }
+
   @EventListener
   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
   {
-    // Needed, because this method is called synchronously during the
-    // initialization pahse of Spring. If the restoring is processed
+    // "Needed", because this method is called synchronously during the
+    // initialization pahse of Spring. If the subscription happens
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = CompletableFuture.runAsync(() -> restore());
+    future = CompletableFuture.runAsync(() -> start());
+  }
+
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions revoked: {}", partitions);
   }
 
-  private void restore()
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("partitions assigned: {}", partitions);
+    fetchAssignmentsAsync();
+    if (partitions.size() > 0)
+      restore(partitions);
+  }
+
+  private void fetchAssignmentsAsync()
+  {
+    adminClient
+        .describeConsumerGroups(List.of(groupId))
+        .describedGroups()
+        .get(groupId)
+        .whenComplete((descriptions, e) ->
+        {
+          if (e != null)
+          {
+            log.error("could not fetch group data: {}", e.getMessage());
+          }
+          else
+          {
+            synchronized (this)
+            {
+              for (MemberDescription description : descriptions.members())
+              {
+                description
+                    .assignment()
+                    .topicPartitions()
+                    .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+              }
+              partitionOwnershipUnknown = false;
+              notifyAll();
+            }
+          }
+        });
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partiotions lost: {}", partitions);
+  }
+
+
+  private void restore(Collection<TopicPartition> partitions)
   {
     log.info("--> starting restore...");
 
-    List<TopicPartition> partitions =
-        consumer
-            .partitionsFor(topic)
-            .stream()
-            .map(info -> new TopicPartition(topic, info.partition()))
-            .collect(Collectors.toList());
+    partitions
+        .stream()
+        .map(topicPartition -> topicPartition.partition())
+        .forEach(partition -> repository.resetStorageForPartition(partition));
 
     Map<Integer, Long> lastSeen =
         consumer
@@ -155,11 +270,7 @@ public class TransferConsumer implements Runnable
                 partition -> partition,
                 partition -> 0l));
 
-    log.info("assigning {}}", partitions);
-    consumer.assign(partitions);
-
     while (
-        restoring &&
         positions
             .entrySet()
             .stream()
@@ -182,47 +293,46 @@ public class TransferConsumer implements Runnable
       catch(WakeupException e)
       {
         log.info("--> cleanly interrupted while restoring");
-        return;
       }
     }
 
     log.info("--> restore completed!");
-    restoring = false;
-
-    // We are intentionally _not_ unsubscribing here, since that would
-    // reset the offset to _earliest_, because we disabled offset-commits.
-
-    start();
   }
 
   @PostMapping("start")
   public synchronized String start()
   {
-    if (restoring)
+    if (running)
     {
-      log.error("cannot start while restoring");
-      return "Denied: Restoring!";
+      log.info("already running!");
+      return "Already running!";
     }
 
-    String result = "Started";
-
-    if (running)
+    int foundNumPartitions = consumer.partitionsFor(topic).size();
+    if (foundNumPartitions != numPartitions)
     {
-      stop();
-      result = "Restarted";
+      log.error(
+          "unexpected number of partitions for topic {}: expected={}, found={}",
+          topic,
+          numPartitions,
+          foundNumPartitions
+          );
+      return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
     }
 
+    consumer.subscribe(List.of(topic), this);
+
     running = true;
     future = CompletableFuture.runAsync(this);
 
     log.info("started");
-    return result;
+    return "Started";
   }
 
   @PostMapping("stop")
   public synchronized String stop()
   {
-    if (!(running || restoring))
+    if (!running)
     {
       log.info("not running!");
       return "Not running";
@@ -246,6 +356,7 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
+      consumer.unsubscribe();
     }
 
     log.info("stopped");
@@ -262,6 +373,7 @@ public class TransferConsumer implements Runnable
   }
 
 
+
   public interface ConsumerUseCases
       extends
         GetTransferUseCase,
index 5f30df6..d81c554 100644 (file)
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 
   private final GetTransferUseCase getTransferUseCase;
   private final MessagingService messagingService;
+  private final TransferConsumer consumer;
 
 
   @PostMapping(
@@ -91,10 +92,22 @@ import java.util.concurrent.CompletableFuture;
   public ResponseEntity<TransferDTO> get(@PathVariable Long id)
   {
     return
-        getTransferUseCase
-            .get(id)
-            .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
-            .orElse(ResponseEntity.notFound().build());
+        consumer
+            .uriForKey(Long.toString(id))
+            .map(uri ->
+            {
+              ResponseEntity<TransferDTO> response =
+                  ResponseEntity
+                      .status(HttpStatus.TEMPORARY_REDIRECT)
+                      .location(URI.create(uri + PATH + "/" + id))
+                      .build();
+              return response;
+            })
+            .orElseGet(() ->
+                getTransferUseCase
+                    .get(id)
+                    .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
+                    .orElse(ResponseEntity.notFound().build()));
   }
 
   @ResponseStatus(HttpStatus.BAD_REQUEST)
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java
new file mode 100644 (file)
index 0000000..0e8cda2
--- /dev/null
@@ -0,0 +1,105 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.utils.Utils;
+import org.springframework.util.Assert;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+
+/**
+ * This partitioner uses the same algorithm to compute the partion
+ * for a given record as hash of its key as the
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+ *
+ * The main reason, to use this partitioner instead of the default is, to
+ * make it more explicitly visible in the code of this example, that the
+ * same algorithm is used when calculating the partition for a record, that
+ * is about to be send, as when calculating the partition for a given
+ * {@link Transfer#getId() transfer-id} to look up the node, that can
+ * serve a get-request for that transfer.
+ */
+public class TransferPartitioner implements Partitioner
+{
+  public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8");
+
+  public final static String TOPIC = "topic";
+  public final static String NUM_PARTITIONS = "num-partitions";
+
+
+  /**
+   * Computes the partition as a hash of the given key.
+   *
+   * @param key The key
+   * @return An <code>int</code>, that represents the partition.
+   */
+  public static int computeHashForKey(String key, int numPartitions)
+  {
+    return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions);
+  }
+
+  /**
+   * Computes the partition as a hash of the given key.
+   *
+   * @param keyBytes The key as an <code>byte</code></code>-array.
+   * @return An <code>int</code>, that represents the partition.
+   */
+  public static int computeHashForKey(byte[] keyBytes, int numPartitions)
+  {
+    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+  }
+
+
+  private String topic;
+  private int numPartitions;
+
+  @Override
+  public void configure(Map<String, ?> configs)
+  {
+    Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null");
+    Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty");
+    Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null");
+    Assert.isAssignable(
+        configs.get(NUM_PARTITIONS).getClass(),
+        Integer.class,
+        NUM_PARTITIONS + " must by an int");
+
+    topic = configs.get(TOPIC).toString();
+    numPartitions = (Integer)configs.get(NUM_PARTITIONS);
+  }
+
+  /**
+   * Compute the partition for the given record.
+   *
+   * @param topic The topic name - Only used to check, if it equals the configured fix topic.
+   * @param key The key to partition on - Not used: the algorithm uses the argument <code>keyBytes</code>.
+   * @param keyBytes serialized key to partition on - Must not be null.
+   * @param value The value to partition on or null - Not used.
+   * @param valueBytes serialized value to partition on or null - Not used.
+   * @param cluster The current cluster metadata - Not used.
+   */
+  @Override
+  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
+  {
+    if (keyBytes == null)
+      throw new IllegalArgumentException("The argument \"keyBytes\" must not be null");
+
+    if (!topic.equals(this.topic))
+      throw new IllegalArgumentException(
+          "This partitioner can only be used for a fixe partition. Here: fixed=" +
+              this.topic +
+              ", requested=" +
+              topic);
+
+    // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition
+    return computeHashForKey(keyBytes, numPartitions);
+  }
+
+  @Override
+  public void close() {}
+  @Override
+  public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
+}
index ec293ad..2a4d734 100644 (file)
@@ -2,32 +2,41 @@ package de.juplo.kafka.payment.transfer.persistence;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 
-@Component
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
-  private final Map<Long, String> map = new HashMap<>();
+  private final int numPartitions;
+  private final Map<Long, String> mappings[];
   private final ObjectMapper mapper;
 
 
+  public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+  {
+    this.numPartitions = numPartitions;
+    this.mappings = new HashMap[numPartitions];
+    for (int i = 0; i < numPartitions; i++)
+      this.mappings[i] = new HashMap<>();
+    this.mapper = mapper;
+  }
+
+
   @Override
   public void store(Transfer transfer)
   {
     try
     {
-      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+      int partition = partitionForId(transfer.getId());
+      mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
@@ -40,7 +49,8 @@ public class InMemoryTransferRepository implements TransferRepository
   {
     return
         Optional
-            .ofNullable(map.get(id))
+            .ofNullable(this.mappings[partitionForId(id)])
+            .map(mapping -> mapping.get(id))
             .map(json -> {
               try
               {
@@ -56,6 +66,22 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public void remove(Long id)
   {
-    map.remove(id);
+    mappings[partitionForId(id)].remove(id);
+  }
+
+  @Override
+  public void resetStorageForPartition(int partition)
+  {
+    log.info(
+        "resetting storage for partition {}: dropping {} entries",
+        partition,
+        mappings[partition].size());
+    mappings[partition].clear();
+  }
+
+  private int partitionForId(long id)
+  {
+    String key = Long.toString(id);
+    return TransferPartitioner.computeHashForKey(key, numPartitions);
   }
 }
index e44a1d6..0629eab 100644 (file)
@@ -12,4 +12,6 @@ public interface TransferRepository
   Optional<Transfer> get(Long id);
 
   void remove(Long id);
+
+  void resetStorageForPartition(int partition);
 }
index 39fb248..12f5eba 100644 (file)
@@ -2,8 +2,11 @@ package de.juplo.kafka.payment.transfer;
 
 import org.junit.jupiter.api.Test;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
 
 @SpringBootTest
+@EmbeddedKafka
 class TransferServiceApplicationTests
 {
        @Test