Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferPartitioner.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import de.juplo.kafka.payment.transfer.domain.Transfer;
4 import org.apache.kafka.clients.producer.Partitioner;
5 import org.apache.kafka.common.Cluster;
6 import org.apache.kafka.common.utils.Utils;
7 import org.springframework.util.Assert;
8
9 import java.nio.charset.Charset;
10 import java.util.Map;
11
12
13 /**
14  * This partitioner uses the same algorithm to compute the partion
15  * for a given record as hash of its key as the
16  * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
17  *
18  * The main reason, to use this partitioner instead of the default is, to
19  * make it more explicitly visible in the code of this example, that the
20  * same algorithm is used when calculating the partition for a record, that
21  * is about to be send, as when calculating the partition for a given
22  * {@link Transfer#getId() transfer-id} to look up the node, that can
23  * serve a get-request for that transfer.
24  */
25 public class TransferPartitioner implements Partitioner
26 {
27   public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8");
28
29   public final static String TOPIC = "topic";
30   public final static String NUM_PARTITIONS = "num-partitions";
31
32
33   /**
34    * Computes the partition as a hash of the given key.
35    *
36    * @param key The key
37    * @return An <code>int</code>, that represents the partition.
38    */
39   public static int computeHashForKey(String key, int numPartitions)
40   {
41     return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions);
42   }
43
44   /**
45    * Computes the partition as a hash of the given key.
46    *
47    * @param keyBytes The key as an <code>byte</code></code>-array.
48    * @return An <code>int</code>, that represents the partition.
49    */
50   public static int computeHashForKey(byte[] keyBytes, int numPartitions)
51   {
52     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
53   }
54
55
56   private String topic;
57   private int numPartitions;
58
59   @Override
60   public void configure(Map<String, ?> configs)
61   {
62     Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null");
63     Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty");
64     Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null");
65     Assert.isAssignable(
66         configs.get(NUM_PARTITIONS).getClass(),
67         Integer.class,
68         NUM_PARTITIONS + " must by an int");
69
70     topic = configs.get(TOPIC).toString();
71     numPartitions = (Integer)configs.get(NUM_PARTITIONS);
72   }
73
74   /**
75    * Compute the partition for the given record.
76    *
77    * @param topic The topic name - Only used to check, if it equals the configured fix topic.
78    * @param key The key to partition on - Not used: the algorithm uses the argument <code>keyBytes</code>.
79    * @param keyBytes serialized key to partition on - Must not be null.
80    * @param value The value to partition on or null - Not used.
81    * @param valueBytes serialized value to partition on or null - Not used.
82    * @param cluster The current cluster metadata - Not used.
83    */
84   @Override
85   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
86   {
87     if (keyBytes == null)
88       throw new IllegalArgumentException("The argument \"keyBytes\" must not be null");
89
90     if (!topic.equals(this.topic))
91       throw new IllegalArgumentException(
92           "This partitioner can only be used for a fixe partition. Here: fixed=" +
93               this.topic +
94               ", requested=" +
95               topic);
96
97     // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition
98     return computeHashForKey(keyBytes, numPartitions);
99   }
100
101   @Override
102   public void close() {}
103   @Override
104   public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
105 }