#!/bin/bash
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/sumup-consumer:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
"$1" = "build"
]]
then
- mvn install || exit
+ mvn install -D skipTests || exit
else
echo "Using image existing images:"
docker image ls $IMAGE
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d
+docker-compose up -d producer consumer-1
while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
-while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
-sleep 10
+echo -n START | http -v :8080/peter
+echo -n 1 | http -v :8080/peter
+echo -n 2 | http -v :8080/peter
+echo -n 3 | http -v :8080/peter
+echo -n 4 | http -v :8080/peter
+echo -n 5 | http -v :8080/peter
+echo -n 6 | http -v :8080/peter
+http -v :8081/state
+http -v :8081/state/peter
+echo -n END | http -v :8080/peter
+http -v :8081/state
+http -v :8081/state/peter
+docker-compose logs consumer-1
+docker-compose stop consumer-1
-docker-compose stop bart nerd riddler kraut poet linux
-
-http -v :8081/seen
-http -v :8081/seen/bart
-http -v :8082/seen
-http -v :8082/seen/bart
-
-docker-compose stop consumer-1 consumer-2
image: juplo/toolbox
command: sleep infinity
- bart:
- image: juplo/wordcount--fortune:1.0.0
- command: bash -c "
- while [ true ];
- do
- /usr/games/fortune chalkboard
- | head -1
- | http -v producer:8080/bart;
- echo;
- sleep 1;
- done"
-
- nerd:
- image: juplo/wordcount--fortune:1.0.0
- command: bash -c "
- while [ true ];
- do
- /usr/games/fortune computers
- | grep -v '^[[:space:]]*--'
- | http -v producer:8080/nerd;
- echo;
- sleep 1;
- done"
-
- riddler:
- image: juplo/wordcount--fortune:1.0.0
- command: bash -c "
- while [ true ];
- do
- /usr/games/fortune riddles
- | awk -F':' '/^Q/ { print $$2 }'
- | http -v producer:8080/riddler;
- echo;
- sleep 1;
- sleep 1;
- done"
-
- kraut:
- image: juplo/wordcount--fortune:1.0.0
- command: bash -c "
- while [ true ];
- do
- /usr/games/fortune de
- | http -v producer:8080/kraut;
- echo;
- sleep 1;
- done"
-
- poet:
- image: juplo/wordcount--fortune:1.0.0
- command: bash -c "
- while [ true ];
- do
- /usr/games/fortune songs-poems
- | http -v producer:8080/poet;
- echo;
- sleep 1;
- done"
-
- linux:
- image: juplo/wordcount--fortune:1.0.0
- command: bash -c "
- while [ true ];
- do
- /usr/games/fortune linux
- | http -v producer:8080/linux;
- echo;
- sleep 1;
- done"
-
producer:
image: juplo/rest-producer:1.0-SNAPSHOT
ports:
producer.topic: test
consumer-1:
- image: juplo/wordcount:1.0-SNAPSHOT
+ image: juplo/sumup-consumer:1.0-SNAPSHOT
ports:
- 8081:8080
environment:
spring.data.mongodb.database: juplo
consumer-2:
- image: juplo/wordcount:1.0-SNAPSHOT
+ image: juplo/sumup-consumer:1.0-SNAPSHOT
ports:
- 8082:8080
environment:
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>sum</artifactId>
+ <artifactId>sumup-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
- <name>Sum</name>
+ <name>Summing Up Consumer</name>
<description>Calculates the sum of all natuarl numbers up to the given natural number</description>
<dependencies>
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
@RestController
}
- @GetMapping("seen")
- public Map<Integer, Map<String, List<Long>>> seen()
+ @GetMapping("state")
+ public Map<Integer, Map<String, Long>> state()
{
- return sumRecordHandler.getSeen();
+ return
+ sumRecordHandler
+ .getState()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey(),
+ entry -> entry.getValue().getState()));
}
- @GetMapping("seen/{user}")
- public ResponseEntity<List<Long>> seen(@PathVariable String user)
+ @GetMapping("state/{user}")
+ public ResponseEntity<Long> seen(@PathVariable String user)
{
- for (Map<String, List<Long>> users : sumRecordHandler.getSeen().values())
+ for (SumBusinessLogic sumBusinessLogic : sumRecordHandler.getState().values())
{
- List<Long> results = users.get(user);
- if (results != null)
- return ResponseEntity.ok(results);
+ Optional<Long> sum = sumBusinessLogic.getSum(user);
+ if (sum.isPresent())
+ return ResponseEntity.ok(sum.get());
}
return ResponseEntity.notFound().build();
public String id;
public long offset = -1l;
public Map<String, Long> state;
- public Map<String, List<Long>> seen;
public StateDocument()
{
{
this.id = Integer.toString(partition);
this.state = new HashMap<>();
- this.seen = new HashMap<>();
}
public StateDocument(
Integer partition,
Map<String, Long> state,
- Map<String, List<Long>> seen)
+ long offset)
{
this.id = Integer.toString(partition);
this.state = state;
- this.seen = seen;
+ this.offset = offset;
}
}
// Otherwise: Use initial offset, generated by Kafka
consumer.seek(tp, document.offset);
}
- handler.addPartition(partition, document);
+ handler.addPartition(partition, document.state);
});
}
id,
partition,
newOffset);
- repository.save(handler.removePartition(partition));
+ repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
});
}
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
log.debug("Storing data and offsets, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
+ handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
new StateDocument(
partiton,
- statistics,
+ sumBusinessLogic.getState(),
consumer.position(new TopicPartition(topic, partiton)))));
lastCommit = clock.instant();
}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
@Slf4j
public class SumRecordHandler implements RecordHandler<String, String>
{
- private final Map<Integer, Map<String, List<Long>>> seen = new HashMap<>();
private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
{
case "START":
state.get(partition).startSum(user);
- return;
+ break;
case "END":
Long result = state.get(partition).endSum(user);
log.info("New result for {}: {}", user, result);
- return;
+ break;
default:
state.get(partition).addToSum(user, Integer.parseInt(message));
- return;
+ break;
}
}
- protected void addPartition(Integer partition, StateDocument document)
+ protected void addPartition(Integer partition, Map<String, Long> state)
{
- this.seen.put(partition, document.seen);
- this.state.put(partition, new SumBusinessLogic(document.state));
+ this.state.put(partition, new SumBusinessLogic(state));
}
- protected StateDocument removePartition(Integer partition)
+ protected Map<String, Long> removePartition(Integer partition)
{
- return new StateDocument(
- partition,
- this.state.remove(partition).getState(),
- this.seen.remove(partition));
+ return this.state.remove(partition).getState();
}
- public Map<Integer, Map<String, List<Long>>> getSeen()
+ public Map<Integer, SumBusinessLogic> getState()
{
- return seen;
+ return state;
}
}