import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
+    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractDataMessageTo> record =
               null,
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
         case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CommandCreateChatRoomToData) record.value(),
+              (CommandCreateChatRoomTo) record.value(),
               record.partition());
           break;
 
               chatRoomId,
               timestamp,
               record.offset(),
-              (EventChatDataMessageReceivedTo) record.value(),
+              (EventChatMessageReceivedTo) record.value(),
               record.partition());
           break;
 
 
   private void createChatRoom(
       UUID chatRoomId,
-      CommandCreateChatRoomToData createChatRoomRequestTo,
+      CommandCreateChatRoomTo createChatRoomRequestTo,
       Integer partition)
   {
     log.info(
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      EventChatDataMessageReceivedTo chatMessageTo,
+      EventChatMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
 
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 @Slf4j
 public class InfoChannel implements Runnable
 {
-  private final DataChannel dataChannel;
   private final String infoTopic;
   private final String dataTopic;
   private final Producer<String, AbstractDataMessageTo> producer;
   private final Consumer<String, AbstractDataMessageTo> consumer;
-  private final ZoneId zoneId;
-  private final int bufferSize;
-  private final Clock clock;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
 
   private boolean running;
 
 
   public InfoChannel(
-    DataChannel dataChannel,
     String infoTopic,
     String dataTopic,
     Producer<String, AbstractDataMessageTo> producer,
     Consumer<String, AbstractDataMessageTo> consumer,
-    ZoneId zoneId,
-    int bufferSize,
-    Clock clock)
+    ZoneId zoneId)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
         infoTopic);
-    this.dataChannel = dataChannel;
     this.infoTopic = infoTopic;
     this.dataTopic = dataTopic;
     this.consumer = consumer;
     this.producer = producer;
-    this.zoneId = zoneId;
-    this.bufferSize = bufferSize;
-    this.clock = clock;
     this.chatRoomInfo = new HashMap<>();
   }
 
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
+    CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractDataMessageTo> record =
           new ProducerRecord<>(
               dataTopic,
               chatRoomId.toString(),
-              createChatRoomRequestTo);
+              to);
 
       producer.send(record, ((metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          log.info("Successfully send chreate-request for chat room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
           createChatRoom(chatRoomInfo);
           sink.success(chatRoomInfo);
     });
   }
 
-  Mono<Message> sendChatMessage(
-      UUID chatRoomId,
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractDataMessageTo> record =
-          new ProducerRecord<>(
-              infoTopic,
-              null,
-              zdt.toEpochSecond(),
-              chatRoomId.toString(),
-              EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = new Message(key, metadata.offset(), timestamp, text);
-          log.info("Successfully send message {}", message);
-          sink.success(message);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
-              chatRoomId,
-              key,
-              timestamp,
-              text,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
-
-    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] =  true;
-      this.currentOffset[partition] = currentOffset;
-
-      log.info(
-          "Partition assigned: {} - loading messages: next={} -> current={}",
-          partition,
-          nextOffset[partition],
-          currentOffset);
-
-      consumer.seek(topicPartition, nextOffset[partition]);
-    });
-
-    consumer.resume(partitions);
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(topicPartition ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] = false;
-      log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
-    });
-  }
-
-  @Override
-  public void onPartitionsLost(Collection<TopicPartition> partitions)
-  {
-    log.warn("Lost partitions: {}, partitions");
-    // TODO: Muss auf den Verlust anders reagiert werden?
-    onPartitionsRevoked(partitions);
-  }
 
   @Override
   public void run()
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractInfoMessageTo> records)
   {
-    for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
+    for (ConsumerRecord<String, AbstractInfoMessageTo> record : records)
     {
       UUID chatRoomId = UUID.fromString(record.key());
 
         case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CommandCreateChatRoomToData) record.value(),
+              (CommandCreateChatRoomTo) record.value(),
               record.partition());
           break;
 
               chatRoomId,
               timestamp,
               record.offset(),
-              (EventChatDataMessageReceivedTo) record.value(),
+              (EventChatMessageReceivedTo) record.value(),
               record.partition());
           break;
 
 
   private void createChatRoom(
       UUID chatRoomId,
-      CommandCreateChatRoomToData createChatRoomRequestTo,
+      CommandCreateChatRoomTo createChatRoomRequestTo,
       Integer partition)
   {
     log.info(
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      EventChatDataMessageReceivedTo chatMessageTo,
+      EventChatMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());