Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
1 package de.juplo.kafka.payment.transfer.persistence;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.springframework.stereotype.Component;
11
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.Optional;
15
16
17 @Slf4j
18 public class InMemoryTransferRepository implements TransferRepository
19 {
20   private final int numPartitions;
21   private final Map<Long, String> map[];
22   private final ObjectMapper mapper;
23
24
25   public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
26   {
27     this.numPartitions = numPartitions;
28     this.map = new HashMap[numPartitions];
29     for (int i = 0; i < numPartitions; i++)
30       this.map[i] = new HashMap<>();
31     this.mapper = mapper;
32   }
33
34
35   @Override
36   public void store(Transfer transfer)
37   {
38     try
39     {
40       int partition = partitionForId(transfer.getId());
41       map[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
42     }
43     catch (JsonProcessingException e)
44     {
45       throw new RuntimeException(e);
46     }
47   }
48
49   @Override
50   public Optional<Transfer> get(Long id)
51   {
52     return
53         Optional
54             .ofNullable(map[partitionForId(id)].get(id))
55             .map(json -> {
56               try
57               {
58                 return mapper.readValue(json, Transfer.class);
59               }
60               catch (JsonProcessingException e)
61               {
62                 throw new RuntimeException("Could not convert JSON: " + json, e);
63               }
64             });
65   }
66
67   @Override
68   public void remove(Long id)
69   {
70     map[partitionForId(id)].remove(id);
71   }
72
73   @Override
74   public void resetStorageForPartition(int partition)
75   {
76     log.info(
77         "reseting storage for partition {}: dropping {} entries",
78         partition,
79         map[partition].size());
80     map[partition].clear();
81   }
82
83   private int partitionForId(long id)
84   {
85     String key = Long.toString(id);
86     return TransferPartitioner.computeHashForKey(key, numPartitions);
87   }
88 }