import {
  FIREBASE_LISTEN_REQUESTED,
  FIREBASE_UNLISTEN_REQUESTED,
  eventTypes,
  metaTypes
} from "../constants";
import {
  setSessionMessagesInitial,
  setSessionMessagesUpdate
} from "./sessions/sessionMessagesSagas";
import {
  setSessionTypingEventsInitial,
  setSessionTypingEventsUpdate
} from "./sessions/sessionTypingEventsSagas";
import { eventChannel, buffers } from "redux-saga";
import { take, call, fork, flush, cancel } from "redux-saga/effects";

/**
 * Builds a redux-saga event channel that relays child events from `ref`.
 *
 * Every emitted message carries `id`, an `eventType` taken from `eventTypes`
 * and the snapshot `key`. Added and changed messages also carry `value`
 * (the result of `snap.val()`); changed messages additionally expose the raw
 * `snap`.
 *
 * @param {*} id - Identifier copied verbatim into every emitted message.
 * @param {Object} ref - Object exposing `on`/`off` (presumably a Firebase
 *   Realtime Database reference; confirm against callers).
 * @returns {Object} The channel returned by `eventChannel`, created with
 *   `buffers.expanding(1)`.
 */
export function createEventChannel(id, ref) {
  return eventChannel(emit => {
    const onChildAdded = snap => {
      emit({
        id,
        eventType: eventTypes.CHILD_ADDED,
        key: snap.key,
        value: snap.val()
      });
    };

    const onChildChanged = snap => {
      emit({
        id,
        snap,
        eventType: eventTypes.CHILD_CHANGED,
        key: snap.key,
        value: snap.val()
      });
    };

    const onChildRemoved = snap => {
      emit({
        id,
        eventType: eventTypes.CHILD_REMOVED,
        key: snap.key
      });
    };

    ref.on("child_added", onChildAdded);
    ref.on("child_changed", onChildChanged);
    ref.on("child_removed", onChildRemoved);

    // Detach only the handlers registered above. A bare `ref.off()` would
    // also remove callbacks that other code attached to the same location.
    return () => {
      ref.off("child_added", onChildAdded);
      ref.off("child_changed", onChildChanged);
      ref.off("child_removed", onChildRemoved);
    };
  }, buffers.expanding(1));
}

/**
 * Saga that reads the current value at `ref`, hands it to the "initial"
 * handler for `metaType`, and then forwards every message taken from the
 * ref's event channel to the matching "update" handler.
 *
 * A failure during the initial load is logged and does not stop the saga
 * from listening for updates. The saga returns as soon as it meets a
 * `metaType` it has no handler for. The channel is always closed when the
 * saga completes, throws or is cancelled.
 *
 * @param {Object} ref - Object with a `once` method, also passed to
 *   `createEventChannel` (presumably a Firebase Realtime Database reference;
 *   confirm against callers).
 * @param {*} id - Listener identifier passed through to the channel and to
 *   the initial handler.
 * @param {*} metaType - One of `metaTypes.sessionMessages` or
 *   `metaTypes.sessionTypingEvents`; selects which handlers are called.
 */
export function* getDataAndListenToChannel(ref, id, metaType) {
  // The channel is opened before the initial read, so events arriving in the
  // meantime are buffered rather than lost.
  const chan = yield call(createEventChannel, id, ref);
  try {
    try {
      const snap = yield call([ref, ref.once], "value");
      // Drop whatever was buffered so far; presumably those events are
      // already reflected in the snapshot just read (confirm).
      yield flush(chan);
      // Normalize a falsy value (e.g. null) to an empty object.
      const value = snap.val() || {};
      switch (metaType) {
        case metaTypes.sessionMessages:
          yield call(setSessionMessagesInitial, value, id, snap.key, metaType);
          break;
        case metaTypes.sessionTypingEvents:
          yield call(
            setSessionTypingEventsInitial,
            value,
            id,
            snap.key,
            metaType
          );
          break;
        default:
          return;
      }
    } catch (error) {
      // Best effort: keep listening for live updates, but do not hide the
      // failure of the initial load.
      console.error(
        "Failed to load initial data for Firebase listener",
        id,
        metaType,
        error
      );
    }
    while (true) {
      const data = yield take(chan);
      switch (metaType) {
        case metaTypes.sessionMessages:
          yield call(setSessionMessagesUpdate, data, metaType);
          break;
        case metaTypes.sessionTypingEvents:
          yield call(setSessionTypingEventsUpdate, data, metaType);
          break;
        default:
          return;
      }
    }
  } finally {
    chan.close();
  }
}

/**
 * Saga that starts `getDataAndListenToChannel` for one listener and cancels
 * it when a matching FIREBASE_UNLISTEN_REQUESTED action is taken.
 *
 * An unlisten action matches when its `meta.type` equals `metaType` and its
 * `id` is defined and equals `id`. Actions without `meta` are ignored. Once
 * the listener has been cancelled this saga returns, so it does not stay
 * alive waiting for further unlisten actions.
 *
 * @param {Object} ref - Passed through to `getDataAndListenToChannel`.
 * @param {*} id - Identifier of the listener this saga controls.
 * @param {*} metaType - Meta type of the listener this saga controls.
 */
export function* addListener(ref, id, metaType) {
  const task = yield fork(getDataAndListenToChannel, ref, id, metaType);
  while (true) {
    const action = yield take(FIREBASE_UNLISTEN_REQUESTED);
    const targetsThisListener =
      Boolean(action.meta) &&
      action.meta.type === metaType &&
      action.id !== undefined &&
      action.id === id;
    if (targetsThisListener) {
      yield cancel(task);
      // Nothing left to control: end the saga instead of looping forever.
      return;
    }
  }
}

/**
 * Watcher saga for a single `metaType`.
 *
 * Takes FIREBASE_LISTEN_REQUESTED actions in an endless loop. For each one
 * whose `meta.type` equals `metaType`, it forks `addListener` with the
 * action's `payload.ref`, `payload.id` and `metaType`; all other listen
 * requests are skipped.
 *
 * @param {*} metaType - Meta type this watcher is responsible for.
 */
export function* firebaseWatchListener(metaType) {
  for (;;) {
    const { meta, payload } = yield take(FIREBASE_LISTEN_REQUESTED);
    if (meta.type !== metaType) {
      // Request belongs to a different watcher.
      continue;
    }
    yield fork(addListener, payload.ref, payload.id, metaType);
  }
}
