X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=e01fdd1baf25b1caed04dda5be26cadb9ee75383;hb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;hp=bd9f449e0fa69cb00ee8fa43a4a369b059160b09;hpb=caed9441a9303af071a572405ae4a665d60faae7;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index bd9f449..e01fdd1 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -41,17 +41,22 @@ public class ApplicationTests extends GenericApplicationTests final StringSerializer stringSerializer = new StringSerializer(); final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}")); - int counter = 0; + int counterMessages; + int counterPoisonPills; + int counterLogicErrors; Map> state; @Override - public int generate( + public void generate( boolean poisonPills, boolean logicErrors, Consumer> messageSender) { - counter = 0; + counterMessages = 0; + counterPoisonPills = 0; + counterLogicErrors = 0; + state = Arrays .stream(dieWilden13) @@ -76,8 +81,8 @@ public class ApplicationTests extends GenericApplicationTests key, calculateMessage, Message.Type.CALC, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), + poisonPill(poisonPills, pass, counterMessages), + logicError(logicErrors, pass, counterMessages), messageSender); state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); // Pick next number to calculate @@ -90,13 +95,29 @@ public class ApplicationTests extends GenericApplicationTests key, new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")), Message.Type.ADD, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), + poisonPill(poisonPills, pass, counterMessages), + logicError(logicErrors, pass, counterMessages), messageSender); } } + } + + @Override + public int getNumberOfMessages() + { + return counterMessages; + } - return counter; + @Override + public int getNumberOfPoisonPills() + { + return counterPoisonPills; + } + + @Override + public int getNumberOfLogicErrors() + { + return counterLogicErrors; } boolean poisonPill (boolean poisonPills, int pass, int counter) @@ -117,15 +138,17 @@ public class ApplicationTests extends GenericApplicationTests boolean logicError, Consumer> messageSender) { - counter++; + counterMessages++; if (logicError) { value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}")); + counterLogicErrors++; } if (poisonPill) { value = new Bytes("BOOM!".getBytes()); + counterPoisonPills++; } ProducerRecord record = new ProducerRecord<>(TOPIC, key, value);