1 package de.juplo.kafka.payment.transfer.adapter;
3 import de.juplo.kafka.payment.transfer.domain.Transfer;
4 import org.apache.kafka.clients.producer.Partitioner;
5 import org.apache.kafka.common.Cluster;
6 import org.apache.kafka.common.utils.Utils;
7 import org.springframework.util.Assert;
9 import java.nio.charset.Charset;
14 * This partitioner uses the same algorithm to compute the partion
15 * for a given record as hash of its key as the
16 * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
18 * The main reason, to use this partitioner instead of the default is, to
19 * make it more explicitly visible in the code of this example, that the
20 * same algorithm is used when calculating the partition for a record, that
21 * is about to be send, as when calculating the partition for a given
22 * {@link Transfer#getId() transfer-id} to look up the node, that can
23 * serve a get-request for that transfer.
25 public class TransferPartitioner implements Partitioner
27 public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8");
29 public final static String TOPIC = "topic";
30 public final static String NUM_PARTITIONS = "num-partitions";
34 * Computes the partition as a hash of the given key.
37 * @return An <code>int</code>, that represents the partition.
39 public static int computeHashForKey(String key, int numPartitions)
41 return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions);
45 * Computes the partition as a hash of the given key.
47 * @param keyBytes The key as an <code>byte</code></code>-array.
48 * @return An <code>int</code>, that represents the partition.
50 public static int computeHashForKey(byte[] keyBytes, int numPartitions)
52 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
57 private int numPartitions;
60 public void configure(Map<String, ?> configs)
62 Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null");
63 Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty");
64 Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null");
66 configs.get(NUM_PARTITIONS).getClass(),
68 NUM_PARTITIONS + " must by an int");
70 topic = configs.get(TOPIC).toString();
71 numPartitions = (Integer)configs.get(NUM_PARTITIONS);
75 * Compute the partition for the given record.
77 * @param topic The topic name - Only used to check, if it equals the configured fix topic.
78 * @param key The key to partition on - Not used: the algorithm uses the argument <code>keyBytes</code>.
79 * @param keyBytes serialized key to partition on - Must not be null.
80 * @param value The value to partition on or null - Not used.
81 * @param valueBytes serialized value to partition on or null - Not used.
82 * @param cluster The current cluster metadata - Not used.
85 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
88 throw new IllegalArgumentException("The argument \"keyBytes\" must not be null");
90 if (!topic.equals(this.topic))
91 throw new IllegalArgumentException(
92 "This partitioner can only be used for a fixe partition. Here: fixed=" +
97 // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition
98 return computeHashForKey(keyBytes, numPartitions);
102 public void close() {}
104 public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}