X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferPartitioner.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferPartitioner.java;h=0e8cda2c15f83dc41d48ecac87978a3924574ed0;hp=0000000000000000000000000000000000000000;hb=43ea59755f9673864a3ef95250009f091e99a760;hpb=cbfe4b796266ff7b9689fb69c5a8efee8ebb130a diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java new file mode 100644 index 0000000..0e8cda2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java @@ -0,0 +1,105 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.utils.Utils; +import org.springframework.util.Assert; + +import java.nio.charset.Charset; +import java.util.Map; + + +/** + * This partitioner uses the same algorithm to compute the partion + * for a given record as hash of its key as the + * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. + * + * The main reason, to use this partitioner instead of the default is, to + * make it more explicitly visible in the code of this example, that the + * same algorithm is used when calculating the partition for a record, that + * is about to be send, as when calculating the partition for a given + * {@link Transfer#getId() transfer-id} to look up the node, that can + * serve a get-request for that transfer. + */ +public class TransferPartitioner implements Partitioner +{ + public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8"); + + public final static String TOPIC = "topic"; + public final static String NUM_PARTITIONS = "num-partitions"; + + + /** + * Computes the partition as a hash of the given key. + * + * @param key The key + * @return An int, that represents the partition. + */ + public static int computeHashForKey(String key, int numPartitions) + { + return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions); + } + + /** + * Computes the partition as a hash of the given key. + * + * @param keyBytes The key as an byte-array. + * @return An int, that represents the partition. + */ + public static int computeHashForKey(byte[] keyBytes, int numPartitions) + { + return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; + } + + + private String topic; + private int numPartitions; + + @Override + public void configure(Map configs) + { + Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null"); + Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty"); + Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null"); + Assert.isAssignable( + configs.get(NUM_PARTITIONS).getClass(), + Integer.class, + NUM_PARTITIONS + " must by an int"); + + topic = configs.get(TOPIC).toString(); + numPartitions = (Integer)configs.get(NUM_PARTITIONS); + } + + /** + * Compute the partition for the given record. + * + * @param topic The topic name - Only used to check, if it equals the configured fix topic. + * @param key The key to partition on - Not used: the algorithm uses the argument keyBytes. + * @param keyBytes serialized key to partition on - Must not be null. + * @param value The value to partition on or null - Not used. + * @param valueBytes serialized value to partition on or null - Not used. + * @param cluster The current cluster metadata - Not used. + */ + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) + { + if (keyBytes == null) + throw new IllegalArgumentException("The argument \"keyBytes\" must not be null"); + + if (!topic.equals(this.topic)) + throw new IllegalArgumentException( + "This partitioner can only be used for a fixe partition. Here: fixed=" + + this.topic + + ", requested=" + + topic); + + // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition + return computeHashForKey(keyBytes, numPartitions); + } + + @Override + public void close() {} + @Override + public void onNewBatch(String topic, Cluster cluster, int prevPartition) {} +}