Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferPartitioner.java
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferPartitioner.java
new file mode 100644 (file)
index 0000000..0e8cda2
--- /dev/null
@@ -0,0 +1,105 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.utils.Utils;
+import org.springframework.util.Assert;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+
+/**
+ * This partitioner uses the same algorithm to compute the partion
+ * for a given record as hash of its key as the
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+ *
+ * The main reason, to use this partitioner instead of the default is, to
+ * make it more explicitly visible in the code of this example, that the
+ * same algorithm is used when calculating the partition for a record, that
+ * is about to be send, as when calculating the partition for a given
+ * {@link Transfer#getId() transfer-id} to look up the node, that can
+ * serve a get-request for that transfer.
+ */
+public class TransferPartitioner implements Partitioner
+{
+  public final static Charset CONVERSION_CHARSET = Charset.forName("UTF-8");
+
+  public final static String TOPIC = "topic";
+  public final static String NUM_PARTITIONS = "num-partitions";
+
+
+  /**
+   * Computes the partition as a hash of the given key.
+   *
+   * @param key The key
+   * @return An <code>int</code>, that represents the partition.
+   */
+  public static int computeHashForKey(String key, int numPartitions)
+  {
+    return TransferPartitioner.computeHashForKey(key.getBytes(CONVERSION_CHARSET), numPartitions);
+  }
+
+  /**
+   * Computes the partition as a hash of the given key.
+   *
+   * @param keyBytes The key as an <code>byte</code></code>-array.
+   * @return An <code>int</code>, that represents the partition.
+   */
+  public static int computeHashForKey(byte[] keyBytes, int numPartitions)
+  {
+    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+  }
+
+
+  private String topic;
+  private int numPartitions;
+
+  @Override
+  public void configure(Map<String, ?> configs)
+  {
+    Assert.notNull(configs.get(TOPIC), TOPIC + " must not be null");
+    Assert.hasText(configs.get(TOPIC).toString(), TOPIC + " must not be empty");
+    Assert.notNull(configs.get(NUM_PARTITIONS), NUM_PARTITIONS + " must not be null");
+    Assert.isAssignable(
+        configs.get(NUM_PARTITIONS).getClass(),
+        Integer.class,
+        NUM_PARTITIONS + " must by an int");
+
+    topic = configs.get(TOPIC).toString();
+    numPartitions = (Integer)configs.get(NUM_PARTITIONS);
+  }
+
+  /**
+   * Compute the partition for the given record.
+   *
+   * @param topic The topic name - Only used to check, if it equals the configured fix topic.
+   * @param key The key to partition on - Not used: the algorithm uses the argument <code>keyBytes</code>.
+   * @param keyBytes serialized key to partition on - Must not be null.
+   * @param value The value to partition on or null - Not used.
+   * @param valueBytes serialized value to partition on or null - Not used.
+   * @param cluster The current cluster metadata - Not used.
+   */
+  @Override
+  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
+  {
+    if (keyBytes == null)
+      throw new IllegalArgumentException("The argument \"keyBytes\" must not be null");
+
+    if (!topic.equals(this.topic))
+      throw new IllegalArgumentException(
+          "This partitioner can only be used for a fixe partition. Here: fixed=" +
+              this.topic +
+              ", requested=" +
+              topic);
+
+    // "Stolen" from the DefaultPartitioner: hash the keyBytes to choose a partition
+    return computeHashForKey(keyBytes, numPartitions);
+  }
+
+  @Override
+  public void close() {}
+  @Override
+  public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
+}