projects
/
demos
/
kafka
/
training
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training]
/
src
/
test
/
java
/
de
/
juplo
/
kafka
/
ApplicationTests.java
diff --git
a/src/test/java/de/juplo/kafka/ApplicationTests.java
b/src/test/java/de/juplo/kafka/ApplicationTests.java
index
bd9f449
..
e01fdd1
100644
(file)
--- a/
src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/
src/test/java/de/juplo/kafka/ApplicationTests.java
@@
-41,17
+41,22
@@
public class ApplicationTests extends GenericApplicationTests<String, Message>
final StringSerializer stringSerializer = new StringSerializer();
final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
final StringSerializer stringSerializer = new StringSerializer();
final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
- int counter = 0;
+ int counterMessages;
+ int counterPoisonPills;
+ int counterLogicErrors;
Map<String, List<AdderResult>> state;
@Override
Map<String, List<AdderResult>> state;
@Override
- public
int
generate(
+ public
void
generate(
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter = 0;
+ counterMessages = 0;
+ counterPoisonPills = 0;
+ counterLogicErrors = 0;
+
state =
Arrays
.stream(dieWilden13)
state =
Arrays
.stream(dieWilden13)
@@
-76,8
+81,8
@@
public class ApplicationTests extends GenericApplicationTests<String, Message>
key,
calculateMessage,
Message.Type.CALC,
key,
calculateMessage,
Message.Type.CALC,
- poisonPill(poisonPills, pass, counter),
- logicError(logicErrors, pass, counter),
+ poisonPill(poisonPills, pass, counter
Messages
),
+ logicError(logicErrors, pass, counter
Messages
),
messageSender);
state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
// Pick next number to calculate
messageSender);
state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
// Pick next number to calculate
@@
-90,13
+95,29
@@
public class ApplicationTests extends GenericApplicationTests<String, Message>
key,
new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
Message.Type.ADD,
key,
new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
Message.Type.ADD,
- poisonPill(poisonPills, pass, counter),
- logicError(logicErrors, pass, counter),
+ poisonPill(poisonPills, pass, counter
Messages
),
+ logicError(logicErrors, pass, counter
Messages
),
messageSender);
}
}
messageSender);
}
}
+ }
+
+ @Override
+ public int getNumberOfMessages()
+ {
+ return counterMessages;
+ }
- return counter;
+ @Override
+ public int getNumberOfPoisonPills()
+ {
+ return counterPoisonPills;
+ }
+
+ @Override
+ public int getNumberOfLogicErrors()
+ {
+ return counterLogicErrors;
}
boolean poisonPill (boolean poisonPills, int pass, int counter)
}
boolean poisonPill (boolean poisonPills, int pass, int counter)
@@
-117,15
+138,17
@@
public class ApplicationTests extends GenericApplicationTests<String, Message>
boolean logicError,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
boolean logicError,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter++;
+ counter
Messages
++;
if (logicError)
{
value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
if (logicError)
{
value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+ counterLogicErrors++;
}
if (poisonPill)
{
value = new Bytes("BOOM!".getBytes());
}
if (poisonPill)
{
value = new Bytes("BOOM!".getBytes());
+ counterPoisonPills++;
}
ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
}
ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);