* TransferConsumer uses subscribe() instead of assign().
* The subscription happens when the TransferConsumer is started, which is
  triggered asynchronously on application startup.
* The restoring of the state happens in onPartitionsAssigned().
* To be able to reset all data that belongs to a specific partition after a
  rebalance (avoiding state errors caused by events being handled several
  times), InMemoryTransferRepository must know which partition a transfer
  belongs to.
* To achieve this, the partitioning algorithm is made explicitly known to the
  application.
* On each rebalance, a mapping from the partitions to the currently assigned
  instances is maintained.
* TransferController uses the explicitly known partitioning algorithm to look
  up the partition for a requested transfer and to decide whether the data is
  available locally.
* If it is not, it looks up the assigned instance in the maintained mapping
  and redirects the request, as sketched below.
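
Boiled down, the lookup works as follows. This is only a condensed sketch of the mechanics that the changes below implement; serveLocally() and redirectTo() are hypothetical placeholders for the actual controller code:

// Compute the partition for the requested transfer with the same
// murmur2-based algorithm that the producer uses to partition the topic...
int partition = TransferPartitioner.computeHashForKey(Long.toString(id), numPartitions);
// ...and look up the instance that is currently assigned to that partition.
String instanceId = instanceIdByPartition[partition]; // maintained on each rebalance
String uri = instanceIdUriMapping.get(instanceId);    // null for the local instance
if (uri == null)
  return serveLocally(id);    // the transfer can be answered from local state
else
  return redirectTo(uri, id); // 307-redirect to the instance that owns the partition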
--- /dev/null
+juplo:
+ transfer:
+ group-instance-id: peter
<properties>
<java.version>11</java.version>
<confluent.version>6.2.0</confluent.version>
- <kafka.version>2.8.0</kafka.version>
+ <kafka.version>2.7.1</kafka.version>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
+ <spring-kafka-test.version>2.7.2</spring-kafka-test.version>
</properties>
<dependencies>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <version>${spring-kafka-test.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
-import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
-import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
-import de.juplo.kafka.payment.transfer.adapter.TransferController;
+import de.juplo.kafka.payment.transfer.adapter.*;
import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import de.juplo.kafka.payment.transfer.ports.TransferService;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@Slf4j
public class TransferServiceApplication
{
+ @Bean(destroyMethod = "close")
+ AdminClient adminClient(TransferServiceProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+
+ return AdminClient.create(props);
+ }
+
@Bean(destroyMethod = "close")
KafkaProducer<String, String> producer(TransferServiceProperties properties)
{
Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
+ Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
+ props.put(TransferPartitioner.TOPIC, properties.getTopic());
+ props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
return new KafkaProducer<>(props);
}
{
Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+ Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
TransferConsumer transferConsumer(
TransferServiceProperties properties,
KafkaConsumer<String, String> consumer,
+ AdminClient adminClient,
+ TransferRepository repository,
ObjectMapper mapper,
TransferService productionTransferService,
TransferService restoreTransferService)
return
new TransferConsumer(
properties.getTopic(),
+ properties.getNumPartitions(),
+ properties.getInstanceIdUriMapping(),
consumer,
+ adminClient,
+ repository,
mapper,
new TransferConsumer.ConsumerUseCases() {
@Override
return new KafkaMessagingService(producer, mapper, properties.getTopic());
}
+ @Bean
+ InMemoryTransferRepository inMemoryTransferRepository(
+ TransferServiceProperties properties,
+ ObjectMapper mapper)
+ {
+ return new InMemoryTransferRepository(properties.getNumPartitions(), mapper);
+ }
+
@Bean
TransferService productionTransferService(
TransferRepository repository,
@Bean
TransferController transferController(
TransferService productionTransferService,
- KafkaMessagingService kafkaMessagingService)
+ KafkaMessagingService kafkaMessagingService,
+ TransferConsumer transferConsumer)
{
- return new TransferController(productionTransferService, kafkaMessagingService);
+ return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer);
}
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.util.HashMap;
+import java.util.Map;
+
@ConfigurationProperties("juplo.transfer")
@Getter
{
private String bootstrapServers = "localhost:9092";
private String topic = "transfers";
+ private Integer numPartitions = 5;
private String groupId = "transfers";
+ private String groupInstanceId;
+ private Map<String, String> instanceIdUriMapping;
+
+ public Map<String, String> getInstanceIdUriMapping()
+ {
+ return instanceIdUriMapping == null ? new HashMap<>() : instanceIdUriMapping;
+ }
}
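
Put together, a complete application.yml for one instance might look like the following. The host names and the second instance-id are made-up examples; the entry for the local instance is ignored, because requests are never redirected to the instance itself:

juplo:
  transfer:
    bootstrap-servers: kafka:9092
    topic: transfers
    num-partitions: 5
    group-id: transfers
    group-instance-id: peter
    instance-id-uri-mapping:
      peter: http://peter:8080
      beate: http://beate:8080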
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.web.bind.annotation.ResponseBody;
import java.time.Duration;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@RequestMapping("/consumer")
@ResponseBody
-@RequiredArgsConstructor
@Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
{
private final String topic;
+ private final int numPartitions;
private final KafkaConsumer<String, String> consumer;
+ private final AdminClient adminClient;
+ private final TransferRepository repository;
private final ObjectMapper mapper;
private final ConsumerUseCases productionUseCases, restoreUseCases;
- private boolean restoring = true;
private boolean running = false;
private boolean shutdown = false;
private Future<?> future = null;
+ private final String groupId;
+ private final String groupInstanceId;
+ private final Map<String, String> instanceIdUriMapping;
+ private final String[] instanceIdByPartition;
+
+ private volatile boolean partitionOwnershipUnknown = true;
+
+
+ public TransferConsumer(
+ String topic,
+ int numPartitions,
+ Map<String, String> instanceIdUriMapping,
+ KafkaConsumer<String, String> consumer,
+ AdminClient adminClient,
+ TransferRepository repository,
+ ObjectMapper mapper,
+ ConsumerUseCases productionUseCases,
+ ConsumerUseCases restoreUseCases)
+ {
+ this.topic = topic;
+ this.numPartitions = numPartitions;
+ this.groupId = consumer.groupMetadata().groupId();
+ this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+ this.instanceIdByPartition = new String[numPartitions];
+ this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+ for (String instanceId : instanceIdUriMapping.keySet())
+ {
+ // Requests are not redirected for the instance itself
+ String uri = instanceId.equals(groupInstanceId)
+ ? null
+ : instanceIdUriMapping.get(instanceId);
+ this.instanceIdUriMapping.put(instanceId, uri);
+ }
+ this.consumer = consumer;
+ this.adminClient = adminClient;
+ this.repository = repository;
+ this.mapper = mapper;
+ this.productionUseCases = productionUseCases;
+ this.restoreUseCases = restoreUseCases;
+ }
+
@Override
public void run()
}
}
+
+ public Optional<String> uriForKey(String key)
+ {
+ synchronized (this)
+ {
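+ // Guarded block: while a rebalance is in progress, wait until the
+ // new assignment has been fetched from the AdminClient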
+ while (partitionOwnershipUnknown)
+ {
+ try { wait(); } catch (InterruptedException e) {}
+ }
+
+ int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+ return
+ Optional
+ .ofNullable(instanceIdByPartition[partition])
+ .map(id -> instanceIdUriMapping.get(id));
+ }
+ }
+
@EventListener
public synchronized void onApplicationEvent(ContextRefreshedEvent event)
{
- // Needed, because this method is called synchronously during the
- // initialization pahse of Spring. If the restoring is processed
+ // "Needed", because this method is called synchronously during the
+ // initialization phase of Spring. If the subscription happens
// in the same thread, it would block the completion of the initialization.
// Hence, the app would not react to any signal (CTRL-C, for example) except
// a KILL until the restoring is finished.
- future = CompletableFuture.runAsync(() -> restore());
+ future = CompletableFuture.runAsync(() -> start());
+ }
+
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitionOwnershipUnknown = true;
+ log.info("partiotions revoked: {}", partitions);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("partiotions assigned: {}", partitions);
+ fetchAssignmentsAsync();
+ if (partitions.size() > 0)
+ restore(partitions);
}
- private void restore()
+ private void fetchAssignmentsAsync()
+ {
+ adminClient
+ .describeConsumerGroups(List.of(groupId))
+ .describedGroups()
+ .get(groupId)
+ .whenComplete((descriptions, e) ->
+ {
+ if (e != null)
+ {
+ log.error("could not fetch group data: {}", e.getMessage());
+ }
+ else
+ {
+ synchronized (this)
+ {
+ for (MemberDescription description : descriptions.members())
+ {
+ description
+ .assignment()
+ .topicPartitions()
+ .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+ }
+ partitionOwnershipUnknown = false;
+ notifyAll();
+ }
+ }
+ });
+ }
+
+ private int partitionForId(long id)
+ {
+ String key = Long.toString(id);
+ return TransferPartitioner.computeHashForKey(key, numPartitions);
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ partitionOwnershipUnknown = true;
+ log.info("partiotions lost: {}", partitions);
+ }
+
+
+ private void restore(Collection<TopicPartition> partitions)
{
log.info("--> starting restore...");
- List<TopicPartition> partitions =
- consumer
- .partitionsFor(topic)
- .stream()
- .map(info -> new TopicPartition(topic, info.partition()))
- .collect(Collectors.toList());
+ partitions
+ .stream()
+ .map(topicPartition -> topicPartition.partition())
+ .forEach(partition -> repository.resetStorageForPartition(partition));
Map<Integer, Long> lastSeen =
consumer
partition -> partition,
partition -> 0l));
- log.info("assigning {}}", partitions);
- consumer.assign(partitions);
-
while (
- restoring &&
positions
.entrySet()
.stream()
catch(WakeupException e)
{
log.info("--> cleanly interrupted while restoring");
- return;
}
}
log.info("--> restore completed!");
- restoring = false;
-
- // We are intentionally _not_ unsubscribing here, since that would
- // reset the offset to _earliest_, because we disabled offset-commits.
-
- start();
}
@PostMapping("start")
public synchronized String start()
{
- if (restoring)
+ if (running)
{
- log.error("cannot start while restoring");
- return "Denied: Restoring!";
+ log.info("already running!");
+ return "Already running!";
}
- String result = "Started";
-
- if (running)
+ int foundNumPartitions = consumer.partitionsFor(topic).size();
+ if (foundNumPartitions != numPartitions)
{
- stop();
- result = "Restarted";
+ log.error(
+ "unexpected number of partitions for topic {}: expected={}, found={}",
+ topic,
+ numPartitions,
+ foundNumPartitions
+ );
+ return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
}
+ consumer.subscribe(List.of(topic), this);
+
running = true;
future = CompletableFuture.runAsync(this);
log.info("started");
- return result;
+ return "Started";
}
@PostMapping("stop")
public synchronized String stop()
{
- if (!(running || restoring))
+ if (!running)
{
log.info("not running!");
return "Not running";
finally
{
future = null;
+ consumer.unsubscribe();
}
log.info("stopped");
}
+
public interface ConsumerUseCases
extends
GetTransferUseCase,
private final GetTransferUseCase getTransferUseCase;
private final MessagingService messagingService;
+ private final TransferConsumer consumer;
@PostMapping(
public ResponseEntity<TransferDTO> get(@PathVariable Long id)
{
return
- getTransferUseCase
- .get(id)
- .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
- .orElse(ResponseEntity.notFound().build());
+ consumer
+ .uriForKey(Long.toString(id))
+ .map(uri ->
+ {
+ ResponseEntity<TransferDTO> response =
+ ResponseEntity
+ .status(HttpStatus.TEMPORARY_REDIRECT)
+ .location(URI.create(uri + PATH + "/" + id))
+ .build();
+ return response;
+ })
+ .orElseGet(() ->
+ getTransferUseCase
+ .get(id)
+ .map(transfer -> ResponseEntity.ok(TransferDTO.of(transfer)))
+ .orElse(ResponseEntity.notFound().build()));
}
@ResponseStatus(HttpStatus.BAD_REQUEST)
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.utils.Utils;
+import org.springframework.util.Assert;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+
+/**
+ * This partitioner uses the same algorithm as the
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}
+ * to compute the partition for a given record as a hash of its key.
+ *
+ * The main reason to use this partitioner instead of the default is to
+ * make it explicitly visible in the code of this example that the same
+ * algorithm is used when calculating the partition for a record that is
+ * about to be sent as when calculating the partition for a given
+ * {@link Transfer#getId() transfer-id} to look up the node that can
+ * serve a get-request for that transfer.
+ */
+public class TransferPartitioner implements Partitioner
+{
+ public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8");
+
+ public final static String TOPIC = "topic";
+ public final static String NUM_PARTITIONS = "num-partitions";
+
+
+ /**
+ * Computes the partition as a hash of the given key.
+ *
+ * @param key The key
+ * @return An <code>int</code>, that represents the partition.
+ */
+ public static int computeHashForKey(String key, int numPartitions)
+ {
+ return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions);
+ }
+
+ /**
+ * Computes the partition as a hash of the given key.
+ *
+ * @param keyBytes The key as a <code>byte</code>-array.
+ * @return An <code>int</code>, that represents the partition.
+ */
+ public static int computeHashForKey(byte[] keyBytes, int numPartitions)
+ {
+ return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ }
+
+
+ private String topic;
+ private int numPartitions;
+
+ @Override
+ public void configure(Map<String, ?> configs)
+ {
+ Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null");
+ Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty");
+ Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null");
+ Assert.isAssignable(
+ configs.get(NUM_PARTITIONS).getClass(),
+ Integer.class,
+ NUM_PARTITIONS + " must be an int");
+
+ topic = configs.get(TOPIC).toString();
+ numPartitions = (Integer)configs.get(NUM_PARTITIONS);
+ }
+
+ /**
+ * Compute the partition for the given record.
+ *
+ * @param topic The topic name - Only used to check that it equals the configured fixed topic.
+ * @param key The key to partition on - Not used: the algorithm uses the argument <code>keyBytes</code>.
+ * @param keyBytes serialized key to partition on - Must not be null.
+ * @param value The value to partition on or null - Not used.
+ * @param valueBytes serialized value to partition on or null - Not used.
+ * @param cluster The current cluster metadata - Not used.
+ */
+ @Override
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
+ {
+ if (keyBytes == null)
+ throw new IllegalArgumentException("The argument \"keyBytes\" must not be null");
+
+ if (!topic.equals(this.topic))
+ throw new IllegalArgumentException(
+ "This partitioner can only be used for a fixe partition. Here: fixed=" +
+ this.topic +
+ ", requested=" +
+ topic);
+
+ // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition
+ return computeHashForKey(keyBytes, numPartitions);
+ }
+
+ @Override
+ public void close() {}
+ @Override
+ public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
+}
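
To see that producer and lookup agree, the partitioner just has to be plugged into a producer together with its two configuration entries, exactly as done in TransferServiceApplication above. A minimal, self-contained sketch; the bootstrap server and the JSON payload are assumptions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class TransferPartitionerExample
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
    props.put(TransferPartitioner.TOPIC, "transfers");
    props.put(TransferPartitioner.NUM_PARTITIONS, 5);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props))
    {
      // The record for transfer 42 is sent to the partition that
      // computeHashForKey("42", 5) yields - hence, any instance can
      // compute later on which partition holds the state for id 42.
      producer.send(new ProducerRecord<>("transfers", "42", "{\"id\":42}"));
      int partition = TransferPartitioner.computeHashForKey("42", 5);
      System.out.println("transfer 42 lives in partition " + partition);
    }
  }
}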
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.RequiredArgsConstructor;
import java.util.Optional;
-@Component
-@RequiredArgsConstructor
@Slf4j
public class InMemoryTransferRepository implements TransferRepository
{
- private final Map<Long, String> map = new HashMap<>();
+ private final int numPartitions;
+ private final Map<Long, String> map[];
private final ObjectMapper mapper;
+ public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+ {
+ this.numPartitions = numPartitions;
+ this.map = new HashMap[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ this.map[i] = new HashMap<>();
+ this.mapper = mapper;
+ }
+
+
@Override
public void store(Transfer transfer)
{
try
{
- map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+ int partition = partitionForId(transfer.getId());
+ map[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
}
catch (JsonProcessingException e)
{
{
return
Optional
- .ofNullable(map.get(id))
+ .ofNullable(map[partitionForId(id)].get(id))
.map(json -> {
try
{
@Override
public void remove(Long id)
{
- map.remove(id);
+ map[partitionForId(id)].remove(id);
+ }
+
+ @Override
+ public void resetStorageForPartition(int partition)
+ {
+ log.info(
+ "reseting storage for partition {}: dropping {} entries",
+ partition,
+ map[partition].size());
+ map[partition].clear();
+ }
+
+ private int partitionForId(long id)
+ {
+ String key = Long.toString(id);
+ return TransferPartitioner.computeHashForKey(key, numPartitions);
}
}
Optional<Transfer> get(Long id);
void remove(Long id);
+
+ void resetStorageForPartition(int partition);
}
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
@SpringBootTest
+@EmbeddedKafka
class TransferServiceApplicationTests
{
@Test