From 225fdb5fe597b10ea29ee56895aa2b0df98e0604 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Jun 2021 19:51:13 +0200 Subject: [PATCH] Switched from single-node (assign) to multi-instance (subscribe) * TransferConsumer uses subscribe() instead of assign(). * The subscription happens during the instanciation of TransferConsumer. * The restorage-process of the state happens in onPartitionsAssigned(). * To be able to reset all data, that belongs to a specific partition after a rebalance, in order to avoid state errors because of handling events several times, InMemoryTransferRepository must know, which partition a transfer belongs to. * To achieve this, the partitioning algorithm is made known explicitly. * On each rebalance, a mapping from the partions to the currently assigned instances is maintained. * TransferController uses the explicitly known partitioning algorithm, to look up the partion for a requested transfer and decide, if the data is available locally. * If not, it looks up the assigned instance in the maintained mapping and redirects the request. --- application.yml | 3 + pom.xml | 9 +- .../transfer/TransferServiceApplication.java | 46 ++++- .../transfer/TransferServiceProperties.java | 11 + .../transfer/adapter/TransferConsumer.java | 192 ++++++++++++++---- .../transfer/adapter/TransferController.java | 21 +- .../transfer/adapter/TransferPartitioner.java | 105 ++++++++++ .../InMemoryTransferRepository.java | 39 +++- .../transfer/ports/TransferRepository.java | 2 + .../TransferServiceApplicationTests.java | 3 + 10 files changed, 377 insertions(+), 54 deletions(-) create mode 100644 application.yml create mode 100644 src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java diff --git a/application.yml b/application.yml new file mode 100644 index 0000000..818a596 --- /dev/null +++ b/application.yml @@ -0,0 +1,3 @@ +juplo: + transfer: + group-instance-id: peter diff --git a/pom.xml b/pom.xml index 6cab464..6fb54df 100644 --- a/pom.xml +++ b/pom.xml @@ -20,8 +20,9 @@ 11 6.2.0 - 2.8.0 + 2.7.1 0.33.0 + 2.7.2 @@ -65,6 +66,12 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + ${spring-kafka-test.version} + test + diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 53ff0f4..a82f8b1 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -2,15 +2,16 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; -import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService; -import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; -import de.juplo.kafka.payment.transfer.adapter.TransferController; +import de.juplo.kafka.payment.transfer.adapter.*; import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferService; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -31,16 +32,31 @@ import java.util.Properties; @Slf4j public class TransferServiceApplication { + @Bean(destroyMethod = "close") + AdminClient adminClient(TransferServiceProperties properties) + { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + + return AdminClient.create(props); + } + @Bean(destroyMethod = "close") KafkaProducer producer(TransferServiceProperties properties) { Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set"); + Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class); + props.put(TransferPartitioner.TOPIC, properties.getTopic()); + props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions()); return new KafkaProducer<>(props); } @@ -50,10 +66,13 @@ public class TransferServiceApplication { Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set"); + Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set"); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); + props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId()); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -66,6 +85,8 @@ public class TransferServiceApplication TransferConsumer transferConsumer( TransferServiceProperties properties, KafkaConsumer consumer, + AdminClient adminClient, + TransferRepository repository, ObjectMapper mapper, TransferService productionTransferService, TransferService restoreTransferService) @@ -73,7 +94,11 @@ public class TransferServiceApplication return new TransferConsumer( properties.getTopic(), + properties.getNumPartitions(), + properties.getInstanceIdUriMapping(), consumer, + adminClient, + repository, mapper, new TransferConsumer.ConsumerUseCases() { @Override @@ -124,6 +149,14 @@ public class TransferServiceApplication return new KafkaMessagingService(producer, mapper, properties.getTopic()); } + @Bean + InMemoryTransferRepository inMemoryTransferRepository( + TransferServiceProperties properties, + ObjectMapper mapper) + { + return new InMemoryTransferRepository(properties.getNumPartitions(), mapper); + } + @Bean TransferService productionTransferService( TransferRepository repository, @@ -143,9 +176,10 @@ public class TransferServiceApplication @Bean TransferController transferController( TransferService productionTransferService, - KafkaMessagingService kafkaMessagingService) + KafkaMessagingService kafkaMessagingService, + TransferConsumer transferConsumer) { - return new TransferController(productionTransferService, kafkaMessagingService); + return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java index c18f525..e001748 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -5,6 +5,9 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.HashMap; +import java.util.Map; + @ConfigurationProperties("juplo.transfer") @Getter @@ -13,5 +16,13 @@ public class TransferServiceProperties { private String bootstrapServers = "localhost:9092"; private String topic = "transfers"; + private Integer numPartitions = 5; private String groupId = "transfers"; + private String groupInstanceId; + private Map instanceIdUriMapping; + + public Map getInstanceIdUriMapping() + { + return instanceIdUriMapping == null ? new HashMap<>() : instanceIdUriMapping; + } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 1fd2689..94fccf7 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -5,8 +5,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -19,8 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.time.Duration; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -29,20 +31,62 @@ import java.util.stream.Collectors; @RequestMapping("/consumer") @ResponseBody -@RequiredArgsConstructor @Slf4j -public class TransferConsumer implements Runnable +public class TransferConsumer implements Runnable, ConsumerRebalanceListener { private final String topic; + private final int numPartitions; private final KafkaConsumer consumer; + private final AdminClient adminClient; + private final TransferRepository repository; private final ObjectMapper mapper; private final ConsumerUseCases productionUseCases, restoreUseCases; - private boolean restoring = true; private boolean running = false; private boolean shutdown = false; private Future future = null; + private final String groupId; + private final String groupInstanceId; + private final Map instanceIdUriMapping; + private final String[] instanceIdByPartition; + + private volatile boolean partitionOwnershipUnknown = true; + + + public TransferConsumer( + String topic, + int numPartitions, + Map instanceIdUriMapping, + KafkaConsumer consumer, + AdminClient adminClient, + TransferRepository repository, + ObjectMapper mapper, + ConsumerUseCases productionUseCases, + ConsumerUseCases restoreUseCases) + { + this.topic = topic; + this.numPartitions = numPartitions; + this.groupId = consumer.groupMetadata().groupId(); + this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get(); + this.instanceIdByPartition = new String[numPartitions]; + this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size()); + for (String instanceId : instanceIdUriMapping.keySet()) + { + // Requests are not redirected for the instance itself + String uri = instanceId.equals(groupInstanceId) + ? null + : instanceIdUriMapping.get(instanceId); + this.instanceIdUriMapping.put(instanceId, uri); + } + this.consumer = consumer; + this.adminClient = adminClient; + this.repository = repository; + this.mapper = mapper; + this.productionUseCases = productionUseCases; + this.restoreUseCases = restoreUseCases; + } + @Override public void run() @@ -116,27 +160,104 @@ public class TransferConsumer implements Runnable } } + + public Optional uriForKey(String key) + { + synchronized (this) + { + while (partitionOwnershipUnknown) + { + try { wait(); } catch (InterruptedException e) {} + } + + int partition = TransferPartitioner.computeHashForKey(key, numPartitions); + return + Optional + .ofNullable(instanceIdByPartition[partition]) + .map(id -> instanceIdUriMapping.get(id)); + } + } + @EventListener public synchronized void onApplicationEvent(ContextRefreshedEvent event) { - // Needed, because this method is called synchronously during the - // initialization pahse of Spring. If the restoring is processed + // "Needed", because this method is called synchronously during the + // initialization pahse of Spring. If the subscription happens // in the same thread, it would block the completion of the initialization. // Hence, the app would not react to any signal (CTRL-C, for example) except // a KILL until the restoring is finished. - future = CompletableFuture.runAsync(() -> restore()); + future = CompletableFuture.runAsync(() -> start()); + } + + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partiotions revoked: {}", partitions); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("partiotions assigned: {}", partitions); + fetchAssignmentsAsync(); + if (partitions.size() > 0) + restore(partitions); } - private void restore() + private void fetchAssignmentsAsync() + { + adminClient + .describeConsumerGroups(List.of(groupId)) + .describedGroups() + .get(groupId) + .whenComplete((descriptions, e) -> + { + if (e != null) + { + log.error("could not fetch group data: {}", e.getMessage()); + } + else + { + synchronized (this) + { + for (MemberDescription description : descriptions.members()) + { + description + .assignment() + .topicPartitions() + .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get()); + } + partitionOwnershipUnknown = false; + notifyAll(); + } + } + }); + } + + private int partitionForId(long id) + { + String key = Long.toString(id); + return TransferPartitioner.computeHashForKey(key, numPartitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + partitionOwnershipUnknown = true; + log.info("partiotions lost: {}", partitions); + } + + + private void restore(Collection partitions) { log.info("--> starting restore..."); - List partitions = - consumer - .partitionsFor(topic) - .stream() - .map(info -> new TopicPartition(topic, info.partition())) - .collect(Collectors.toList()); + partitions + .stream() + .map(topicPartition -> topicPartition.partition()) + .forEach(partition -> repository.resetStorageForPartition(partition)); Map lastSeen = consumer @@ -155,11 +276,7 @@ public class TransferConsumer implements Runnable partition -> partition, partition -> 0l)); - log.info("assigning {}}", partitions); - consumer.assign(partitions); - while ( - restoring && positions .entrySet() .stream() @@ -182,47 +299,46 @@ public class TransferConsumer implements Runnable catch(WakeupException e) { log.info("--> cleanly interrupted while restoring"); - return; } } log.info("--> restore completed!"); - restoring = false; - - // We are intentionally _not_ unsubscribing here, since that would - // reset the offset to _earliest_, because we disabled offset-commits. - - start(); } @PostMapping("start") public synchronized String start() { - if (restoring) + if (running) { - log.error("cannot start while restoring"); - return "Denied: Restoring!"; + log.info("already running!"); + return "Already running!"; } - String result = "Started"; - - if (running) + int foundNumPartitions = consumer.partitionsFor(topic).size(); + if (foundNumPartitions != numPartitions) { - stop(); - result = "Restarted"; + log.error( + "unexpected number of partitions for topic {}: expected={}, found={}", + topic, + numPartitions, + foundNumPartitions + ); + return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions; } + consumer.subscribe(List.of(topic), this); + running = true; future = CompletableFuture.runAsync(this); log.info("started"); - return result; + return "Started"; } @PostMapping("stop") public synchronized String stop() { - if (!(running || restoring)) + if (!running) { log.info("not running!"); return "Not running"; @@ -246,6 +362,7 @@ public class TransferConsumer implements Runnable finally { future = null; + consumer.unsubscribe(); } log.info("stopped"); @@ -262,6 +379,7 @@ public class TransferConsumer implements Runnable } + public interface ConsumerUseCases extends GetTransferUseCase, diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java index 5f30df6..d81c554 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture; private final GetTransferUseCase getTransferUseCase; private final MessagingService messagingService; + private final TransferConsumer consumer; @PostMapping( @@ -91,10 +92,22 @@ import java.util.concurrent.CompletableFuture; public ResponseEntity get(@PathVariable Long id) { return - getTransferUseCase - .get(id) - .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer))) - .orElse(ResponseEntity.notFound().build()); + consumer + .uriForKey(Long.toString(id)) + .map(uri -> + { + ResponseEntity response = + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(URI.create(uri + PATH + "/" + id)) + .build(); + return response; + }) + .orElseGet(() -> + getTransferUseCase + .get(id) + .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer))) + .orElse(ResponseEntity.notFound().build())); } @ResponseStatus(HttpStatus.BAD_REQUEST) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java new file mode 100644 index 0000000..0e8cda2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java @@ -0,0 +1,105 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.utils.Utils; +import org.springframework.util.Assert; + +import java.nio.charset.Charset; +import java.util.Map; + + +/** + * This partitioner uses the same algorithm to compute the partion + * for a given record as hash of its key as the + * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. + * + * The main reason, to use this partitioner instead of the default is, to + * make it more explicitly visible in the code of this example, that the + * same algorithm is used when calculating the partition for a record, that + * is about to be send, as when calculating the partition for a given + * {@link Transfer#getId() transfer-id} to look up the node, that can + * serve a get-request for that transfer. + */ +public class TransferPartitioner implements Partitioner +{ + public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8"); + + public final static String TOPIC = "topic"; + public final static String NUM_PARTITIONS = "num-partitions"; + + + /** + * Computes the partition as a hash of the given key. + * + * @param key The key + * @return An int, that represents the partition. + */ + public static int computeHashForKey(String key, int numPartitions) + { + return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions); + } + + /** + * Computes the partition as a hash of the given key. + * + * @param keyBytes The key as an byte-array. + * @return An int, that represents the partition. + */ + public static int computeHashForKey(byte[] keyBytes, int numPartitions) + { + return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; + } + + + private String topic; + private int numPartitions; + + @Override + public void configure(Map configs) + { + Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null"); + Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty"); + Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null"); + Assert.isAssignable( + configs.get(NUM_PARTITIONS).getClass(), + Integer.class, + NUM_PARTITIONS + " must by an int"); + + topic = configs.get(TOPIC).toString(); + numPartitions = (Integer)configs.get(NUM_PARTITIONS); + } + + /** + * Compute the partition for the given record. + * + * @param topic The topic name - Only used to check, if it equals the configured fix topic. + * @param key The key to partition on - Not used: the algorithm uses the argument keyBytes. + * @param keyBytes serialized key to partition on - Must not be null. + * @param value The value to partition on or null - Not used. + * @param valueBytes serialized value to partition on or null - Not used. + * @param cluster The current cluster metadata - Not used. + */ + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) + { + if (keyBytes == null) + throw new IllegalArgumentException("The argument \"keyBytes\" must not be null"); + + if (!topic.equals(this.topic)) + throw new IllegalArgumentException( + "This partitioner can only be used for a fixe partition. Here: fixed=" + + this.topic + + ", requested=" + + topic); + + // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition + return computeHashForKey(keyBytes, numPartitions); + } + + @Override + public void close() {} + @Override + public void onNewBatch(String topic, Cluster cluster, int prevPartition) {} +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index ec293ad..eaad3ee 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.persistence; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.RequiredArgsConstructor; @@ -13,21 +14,31 @@ import java.util.Map; import java.util.Optional; -@Component -@RequiredArgsConstructor @Slf4j public class InMemoryTransferRepository implements TransferRepository { - private final Map map = new HashMap<>(); + private final int numPartitions; + private final Map map[]; private final ObjectMapper mapper; + public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper) + { + this.numPartitions = numPartitions; + this.map = new HashMap[numPartitions]; + for (int i = 0; i < numPartitions; i++) + this.map[i] = new HashMap<>(); + this.mapper = mapper; + } + + @Override public void store(Transfer transfer) { try { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + int partition = partitionForId(transfer.getId()); + map[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); } catch (JsonProcessingException e) { @@ -40,7 +51,7 @@ public class InMemoryTransferRepository implements TransferRepository { return Optional - .ofNullable(map.get(id)) + .ofNullable(map[partitionForId(id)].get(id)) .map(json -> { try { @@ -56,6 +67,22 @@ public class InMemoryTransferRepository implements TransferRepository @Override public void remove(Long id) { - map.remove(id); + map[partitionForId(id)].remove(id); + } + + @Override + public void resetStorageForPartition(int partition) + { + log.info( + "reseting storage for partition {}: dropping {} entries", + partition, + map[partition].size()); + map[partition].clear(); + } + + private int partitionForId(long id) + { + String key = Long.toString(id); + return TransferPartitioner.computeHashForKey(key, numPartitions); } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java index e44a1d6..0629eab 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java @@ -12,4 +12,6 @@ public interface TransferRepository Optional get(Long id); void remove(Long id); + + void resetStorageForPartition(int partition); } diff --git a/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java index 39fb248..12f5eba 100644 --- a/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java +++ b/src/test/java/de/juplo/kafka/payment/transfer/TransferServiceApplicationTests.java @@ -2,8 +2,11 @@ package de.juplo.kafka.payment.transfer; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.context.EmbeddedKafka; + @SpringBootTest +@EmbeddedKafka class TransferServiceApplicationTests { @Test -- 2.20.1