From: Kai Moritz <kai@juplo.de>
Date: Tue, 26 Jul 2022 10:52:23 +0000 (+0200)
Subject: Refaktorisierungen des Testfalls gemerged (Branch 'deserialization')
X-Git-Tag: sumup-adder---lvm-2-tage~9^2~7^2~3
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=536c9c5320e52101bda5e1931bc84c5e0e0137f2;p=demos%2Fkafka%2Ftraining

Refaktorisierungen des Testfalls gemerged (Branch 'deserialization')
---

536c9c5320e52101bda5e1931bc84c5e0e0137f2
diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java
index 6c25bcd,05eebd0..62906b3
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@@ -78,7 -77,11 +78,11 @@@ class ApplicationTest
  	@Order(1) // << The poison pill is not skipped. Hence, this test must run first
  	void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
  	{
- 		send100Messages((key, counter) -> serialize(key, counter));
+ 		send100Messages((partition, key, counter) ->
+ 		{
 -			Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
++			Bytes value = serialize(key, counter);
+ 			return new ProducerRecord<>(TOPIC, partition, key, value);
+ 		});
  
  		await("100 records received")
  				.atMost(Duration.ofSeconds(30))
@@@ -101,10 -104,13 +105,13 @@@
  	@Order(2)
  	void commitsOffsetOfErrorForReprocessingOnError()
  	{
- 		send100Messages((key, counter) ->
- 				counter == 77
- 						? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
- 						: serialize(key, counter));
+ 		send100Messages((partition, key, counter) ->
+ 		{
+ 			Bytes value = counter == 77
+ 					? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
 -					: new Bytes(valueSerializer.serialize(TOPIC, counter));
++					: serialize(key, counter);
+ 			return new ProducerRecord<>(TOPIC, partition, key, value);
+ 		});
  
  		await("Consumer failed")
  				.atMost(Duration.ofSeconds(30))
@@@ -194,16 -205,9 +206,10 @@@
  		{
  			for (int key = 0; key < 10; key++)
  			{
- 				Bytes value = messageGenerator.apply(key, ++i);
- 
  				ProducerRecord<String, Bytes> record =
- 						new ProducerRecord<>(
- 								TOPIC,
- 								partition,
- 								Integer.toString(key%2),
- 								value);
+ 						recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);
  
 +				record.headers().add("__TypeId__", "message".getBytes());
  				kafkaProducer.send(record, (metadata, e) ->
  				{
  					if (metadata != null)
@@@ -228,14 -232,6 +234,14 @@@
  		}
  	}
  
- 	Bytes serialize(Integer key, Long value)
++	Bytes serialize(String key, Long value)
 +	{
 +		ClientMessage message = new ClientMessage();
- 		message.setClient(key.toString());
++		message.setClient(key);
 +		message.setMessage(value.toString());
 +		return new Bytes(valueSerializer.serialize(TOPIC, message));
 +	}
 +
  
  	@BeforeEach
  	public void init()