package de.juplo.kafka;
import de.juplo.test.MessageAdd;
+import de.juplo.test.MessageCalc;
+import de.juplo.test.MessageInvalid;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
send(key, MessageAdd.builder().key(key).next(value).build());
}
+ @Test
+ public void testMessageCalc()
+ {
+ String key = "foo";
+ send(key, MessageCalc.builder().key(key).build());
+ }
+
+ @Test
+ public void testMessageInvalid()
+ {
+ String key = "foo";
+ send(key, MessageInvalid.builder().key(key).build());
+ await("Application is healthy")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> mockMvc
+ .perform(get("/actuator/health"))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("status").value("UP")));
+ }
+
final long[] currentOffsets = new long[PARTITIONS];
@Autowired