public class AdderBusinessLogic
{
- private final Map<String, Long> state;
+ private final Map<String, AdderResult> state;
public AdderBusinessLogic()
this(new HashMap<>());
}
- public AdderBusinessLogic(Map<String, Long> state)
+ public AdderBusinessLogic(Map<String, AdderResult> state)
{
this.state = state;
}
- public synchronized void startSum(String user)
- {
- if (state.containsKey(user))
- throw new IllegalStateException("Sumation for " + user + " already in progress, state: " + state.get(user));
-
- state.put(user, 0l);
- }
-
public synchronized Optional<Long> getSum(String user)
{
- return Optional.ofNullable(state.get(user));
+ return Optional.ofNullable(state.get(user)).map(result -> result.sum);
}
public synchronized void addToSum(String user, Integer value)
{
- if (!state.containsKey(user))
- throw new IllegalStateException("No sumation for " + user + " in progress");
if (value == null || value < 1)
throw new IllegalArgumentException("Not a positive number: " + value);
- long result = state.get(user) + value;
- state.put(user, result);
+ long sum =
+ Optional
+ .ofNullable(state.get(user))
+ .map(result -> result.sum)
+ .orElse(0l);
+ state.put(user, new AdderResult(value, sum + value));
}
- public synchronized Long endSum(String user)
+ public synchronized AdderResult calculate(String user)
{
if (!state.containsKey(user))
throw new IllegalStateException("No sumation for " + user + " in progress");
return state.remove(user);
}
- protected Map<String, Long> getState()
+ protected Map<String, AdderResult> getState()
{
return state;
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+public class AdderResult
+{
+ final int number;
+ final long sum;
+
+ @Override
+ public String toString()
+ {
+ return "sum(" + number + ") = " + sum;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class AdderResults
+{
+ private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
+
+
+ public void addResults(Integer partition, String user, AdderResult result)
+ {
+ Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
+
+ List<AdderResult> results = resultsByUser.get(user);
+ if (results == null)
+ {
+ results = new LinkedList<>();
+ resultsByUser.put(user, results);
+ }
+
+ results.add(result);
+ }
+
+ protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
+ {
+ this.results.put(partition, results);
+ }
+
+ protected Map<String, List<AdderResult>> removePartition(Integer partition)
+ {
+ return this.results.remove(partition);
+ }
+
+ public Map<Integer, Map<String, List<AdderResult>>> getState()
+ {
+ return results;
+ }
+
+ public Map<String, List<AdderResult>> getState(Integer partition)
+ {
+ return results.get(partition);
+ }
+}
public class ApplicationConfiguration
{
@Bean
- public ApplicationRecordHandler recordHandler()
+ public ApplicationRecordHandler recordHandler(AdderResults adderResults)
{
- return new ApplicationRecordHandler();
+ return new ApplicationRecordHandler(adderResults);
+ }
+
+ @Bean
+ public AdderResults adderResults()
+ {
+ return new AdderResults();
}
@Bean
public ApplicationRebalanceListener rebalanceListener(
ApplicationRecordHandler recordHandler,
+ AdderResults adderResults,
StateRepository stateRepository,
ApplicationProperties properties)
{
return new ApplicationRebalanceListener(
recordHandler,
+ adderResults,
stateRepository,
properties.getClientId(),
Clock.systemDefaultZone(),
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
+import java.util.*;
@RequiredArgsConstructor
public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
{
private final ApplicationRecordHandler recordHandler;
+ private final AdderResults adderResults;
private final StateRepository stateRepository;
private final String id;
private final Clock clock;
private final Duration commitInterval;
+ private final Set<Integer> partitions = new HashSet<>();
+
private Instant lastCommit = Instant.EPOCH;
@Override
{
Integer partition = tp.partition();
log.info("{} - adding partition: {}", id, partition);
+ this.partitions.add(partition);
StateDocument document =
stateRepository
.findById(Integer.toString(partition))
.orElse(new StateDocument(partition));
recordHandler.addPartition(partition, document.state);
+ adderResults.addPartition(partition, document.results);
});
}
{
Integer partition = tp.partition();
log.info("{} - removing partition: {}", id, partition);
- Map<String, Long> removed = recordHandler.removePartition(partition);
- for (String key : removed.keySet())
+ this.partitions.remove(partition);
+ Map<String, AdderResult> state = recordHandler.removePartition(partition);
+ for (String key : state.keySet())
{
log.info(
"{} - Seen {} messages for partition={}|key={}",
id,
- removed.get(key),
+ state.get(key),
partition,
key);
}
- stateRepository.save(new StateDocument(partition, removed));
+ Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+ stateRepository.save(new StateDocument(partition, state, results));
});
}
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
log.debug("Storing data, last commit: {}", lastCommit);
- recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+ partitions.forEach(partition -> stateRepository.save(
new StateDocument(
- partiton,
- adder.getState())));
+ partition,
+ recordHandler.getState(partition).getState(),
+ adderResults.getState(partition))));
lastCommit = clock.instant();
}
}
package de.juplo.kafka;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Map;
+@RequiredArgsConstructor
@Slf4j
public class ApplicationRecordHandler implements RecordHandler<String, String>
{
+ private final AdderResults results;
+
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
Integer partition = record.partition();
String user = record.key();
String message = record.value();
- switch (message)
+
+ if (message.equals("CALCULATE"))
{
- case "START":
- state.get(partition).startSum(user);
- break;
-
- case "END":
- Long result = state.get(partition).endSum(user);
- log.info("New result for {}: {}", user, result);
- break;
-
- default:
- state.get(partition).addToSum(user, Integer.parseInt(message));
- break;
+ AdderResult result = state.get(partition).calculate(user);
+ log.info("New result for {}: {}", user, result);
+ results.addResults(partition, user, result);
+ return;
}
+
+ state.get(partition).addToSum(user, Integer.parseInt(message));
}
- protected void addPartition(Integer partition, Map<String, Long> state)
+ protected void addPartition(Integer partition, Map<String, AdderResult> state)
{
this.state.put(partition, new AdderBusinessLogic(state));
}
- protected Map<String, Long> removePartition(Integer partition)
+ protected Map<String, AdderResult> removePartition(Integer partition)
{
return this.state.remove(partition).getState();
}
{
return state;
}
+
+ public AdderBusinessLogic getState(Integer partition)
+ {
+ return state.get(partition);
+ }
}
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
{
private final EndlessConsumer consumer;
private final ApplicationRecordHandler recordHandler;
+ private final AdderResults results;
@PostMapping("start")
@GetMapping("state")
- public Map<Integer, Map<String, Long>> state()
+ public Map<Integer, Map<String, AdderResult>> state()
{
return
recordHandler
}
@GetMapping("state/{user}")
- public ResponseEntity<Long> seen(@PathVariable String user)
+ public ResponseEntity<Long> state(@PathVariable String user)
{
for (AdderBusinessLogic adder : recordHandler.getState().values())
{
return ResponseEntity.notFound().build();
}
+ @GetMapping("results")
+ public Map<Integer, Map<String, List<AdderResult>>> results()
+ {
+ return results.getState();
+ }
+
+ @GetMapping("results/{user}")
+ public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+ {
+ for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+ {
+ List<AdderResult> results = resultsByUser.get(user);
+ if (results != null)
+ return ResponseEntity.ok(results);
+ }
+
+ return ResponseEntity.notFound().build();
+ }
+
@ExceptionHandler
@ResponseStatus(HttpStatus.BAD_REQUEST)
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
{
@Id
public String id;
- public Map<String, Long> state;
+ public Map<String, AdderResult> state;
+ public Map<String, List<AdderResult>> results;
public StateDocument()
{
{
this.id = Integer.toString(partition);
this.state = new HashMap<>();
+ this.results = new HashMap<>();
}
public StateDocument(
Integer partition,
- Map<String, Long> state)
+ Map<String, AdderResult> state,
+ Map<String, List<AdderResult>> results)
{
this.id = Integer.toString(partition);
this.state = state;
+ this.results = results;
}
}
public class AdderBusinessLogicTest
{
- @Test
- @DisplayName("A new sum can be started, if it does not exist")
- public void testStartSumPossibleIfNotExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatNoException().isThrownBy(() -> adder.startSum("foo"));
- }
-
- @Test
- @DisplayName("Starting an already existing sum again, causes an IllegalStateException")
- public void testStartSumCausesExceptionIfExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatIllegalStateException().isThrownBy(() -> adder.startSum("foo"));
- }
-
@Test
@DisplayName("An empty Optional should be returned, for a non-existing sum")
public void testGetSumReturnsEmptyOptionalForNonExistingSum()
public void testGetSumReturnsNonEmptyOptionalForExistingSum()
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
+ adder.addToSum("foo", 6);
assertThat(adder.getSum("foo")).isNotEmpty();
}
@Test
- @DisplayName("A sum can be ended, if it does exist")
- public void testEndSumPossibleIfSumExists()
+ @DisplayName("A sum can be calculated, if it does exist")
+ public void testCalculatePossibleIfSumExists()
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatNoException().isThrownBy(() -> adder.endSum("foo"));
+ adder.addToSum("foo", 6);
+ assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
}
@Test
@DisplayName("An existing sum is removed, if ended")
- public void testEndSumRemovesSumIfSumExists()
+ public void testCalculateRemovesSumIfSumExists()
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- adder.endSum("foo");
+ adder.addToSum("foo", 6);
+ adder.calculate("foo");
assertThat(adder.getSum("foo")).isEmpty();
}
@Test
- @DisplayName("An existing Sum returns a non-null value, if ended")
- public void testEndSumReturnsNonNullValueIfSumExists()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThat(adder.endSum("foo")).isNotNull();
- }
-
- @Test
- @DisplayName("An existing Sum returns a non-negative value, if ended")
- public void testEndSumReturnsNonNegativeValueIfSumExists()
+ @DisplayName("An existing sum returns a non-null value, if calculated")
+ public void testCalculateReturnsNonNullValueIfSumExists()
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThat(adder.endSum("foo")).isNotNegative();
+ adder.addToSum("foo", 6);
+ assertThat(adder.calculate("foo")).isNotNull();
}
@Test
@DisplayName("Ending a non-existing sum, causes an IllegalStateException")
- public void testEndSumCausesExceptionIfNotExists()
+ public void testCalculateCausesExceptionIfNotExists()
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalStateException().isThrownBy(() -> adder.endSum("foo"));
+ assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
}
@Test
- @DisplayName("Adding to a non-existent sum causes an IllegalStateException")
- public void testAddToSumCausesExceptionIfNotExists()
+ @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
+ public void testAddToSumWithNullValueCausesException()
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- assertThatIllegalStateException().isThrownBy(() -> adder.addToSum("foo", 1));
- }
-
- @Test
- @DisplayName("Adding a null-value to an existing sum causes an IllegalArgumentException")
- public void testAddSumWithNullValueToExistingSumCausesException()
- {
- AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
}
@ParameterizedTest(name = "{index}: Adding {0}")
- @DisplayName("Adding a non-positive value to an existing sum causes an IllegalArgumentException")
+ @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
@ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
- public void testAddSumWithNonPositiveValueToExistingSumCausesException(int value)
+ public void testAddToSumWithNonPositiveValueCausesException(int value)
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
}
- @Test
- @DisplayName("Can add a positive value to an existing sum")
- public void testAddSumWithPositiveValuePossibleIfSumExists()
+ @ParameterizedTest(name = "{index}: Adding {0}")
+ @DisplayName("Can add a positive value to a sum")
+ @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
+ public void testAddToSumWithPositiveValuePossible(int value)
{
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
- assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", -1));
+ assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
}
@ParameterizedTest(name = "{index}: Summing up {0}")
@DisplayName("Adds up numbers correctly")
@MethodSource("numbersProvider")
- public void testAddSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
+ public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
{
long expectedResult = Arrays.stream(numbers).sum();
AdderBusinessLogic adder = new AdderBusinessLogic();
- adder.startSum("foo");
Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
- assertThat(adder.endSum("foo")).isEqualTo(expectedResult);
+ AdderResult result = adder.calculate("foo");
+ assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
+ assertThat(result.sum).isEqualTo(expectedResult);
}
static Stream<Arguments> numbersProvider() {
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.springframework.beans.factory.annotation.Autowired;
+import java.util.*;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.assertj.core.api.Assertions.assertThat;
+
public class ApplicationTests extends GenericApplicationTests<String, String>
{
+ @Autowired
+ AdderResults results;
+
+
public ApplicationTests()
{
- super(
- new RecordGenerator()
+ super(new ApplicationTestRecrodGenerator());
+ ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
+ }
+
+
+ static class ApplicationTestRecrodGenerator implements RecordGenerator
+ {
+ ApplicationTests tests;
+
+ final int[] numbers = {1, 7, 3, 2, 33, 6, 11};
+ final String[] dieWilden13 =
+ IntStream
+ .range(1, 14)
+ .mapToObj(i -> "seeräuber-" + i)
+ .toArray(i -> new String[i]);
+ final StringSerializer stringSerializer = new StringSerializer();
+ final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+
+ int counter = 0;
+
+ Map<String, List<AdderResult>> state;
+
+ @Override
+ public int generate(
+ boolean poisonPills,
+ boolean logicErrors,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
+ counter = 0;
+ state =
+ Arrays
+ .stream(dieWilden13)
+ .collect(Collectors.toMap(
+ seeräuber -> seeräuber,
+ seeräuber -> new LinkedList()));
+
+ for (int i = 0; i < 33; i++)
+ {
+ String seeräuber = dieWilden13[i % 13];
+ int number = numbers[i % 7];
+
+ Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
+
+ for (int message = 1; message <= number; message++)
{
- final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
- final String[] dieWilden13 =
- IntStream
- .range(1,14)
- .mapToObj(i -> "seeräuber-" + i)
- .toArray(i -> new String[i]);
- final StringSerializer stringSerializer = new StringSerializer();
- final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
- final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
-
- int counter = 0;
-
-
- @Override
- public int generate(
- boolean poisonPills,
- boolean logicErrors,
- Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
- {
- counter = 0;
-
- for (int i = 0; i < 33; i++)
- {
- String seeräuber = dieWilden13[i%13];
- int number = numbers[i%7];
-
- Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
- send(key, startMessage, logicErrors, messageSender);
- for (int message = 1; message <= number; message++)
- {
- Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
- send(key, value, logicErrors, messageSender);
- }
- send(key, endMessage, logicErrors, messageSender);
- }
-
- return counter;
- }
-
- void send(
- Bytes key,
- Bytes value,
- boolean logicErrors,
- Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
- {
- counter++;
+ Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
+ send(key, value, logicErrors, messageSender);
+ }
+ send(key, calculateMessage, logicErrors, messageSender);
+
+ state.get(seeräuber).add(new AdderResult(number, (number + 1) * number / 2));
+ }
- if (counter == 77)
- {
- if (logicErrors)
- {
- value = value.equals(startMessage) ? endMessage : startMessage;
- }
- }
+ return counter;
+ }
- messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
- }
+ void send(
+ Bytes key,
+ Bytes value,
+ boolean logicErrors,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
+ counter++;
- @Override
- public boolean canGeneratePoisonPill()
+ if (counter == 77)
+ {
+ if (logicErrors)
+ {
+ value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+ }
+ }
+
+ messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
+ }
+
+ @Override
+ public boolean canGeneratePoisonPill()
+ {
+ return false;
+ }
+
+ @Override
+ public void assertBusinessLogic()
+ {
+ tests.results
+ .getState()
+ .values()
+ .stream()
+ .flatMap(map -> map.entrySet().stream())
+ .forEach(entry ->
{
- return false;
- }
- });
+ String user = entry.getKey();
+ List<AdderResult> resultsForUser = entry.getValue();
+
+ assertThat(state.get(user))
+ .describedAs("Unexpected results for user {}", user)
+ .containsExactlyElementsOf(resultsForUser);
+ });
+ }
}
}