import { ofType } from '@ngrx/effects';
import { Action } from '@zerops/fe/core';
import { Observable, Subscription, BehaviorSubject } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { ActionTypes, Message } from './websockets.action';

/**
 * RxJS operator factory that narrows an action stream to websocket messages of
 * one kind and unwraps their data.
 *
 * The returned operator keeps only actions of `actionType` (falling back to
 * `ActionTypes.Message` when it is omitted or otherwise falsy) whose
 * `payload.type` equals `type`, and emits each matching action's `payload.data`.
 *
 * @param type value that `payload.type` has to equal
 * @param actionType optional action type to listen for instead of `ActionTypes.Message`
 * @returns an operator turning an action stream into an `Observable<R>`; the
 * element type is asserted by a cast, not checked at runtime
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const ofWsType = <R = any>(type: string, actionType?: string) => {
  return <T extends Action>(source: Observable<T>) => {
    // Resolved per application of the operator, exactly where the fallback is needed.
    const listenedActionType = actionType || ActionTypes.Message;

    const matchingData$ = source.pipe(
      ofType<Message>(listenedActionType),
      filter(({ payload }) => payload.type === type),
      map(({ payload }) => payload.data)
    );

    return matchingData$ as Observable<R>;
  };
};

/**
 * Pair of streams describing a websocket connection, as returned by `connect`.
 */
export interface Connection {
  /**
   * Stream of a numeric connection counter. As produced by `connect` it starts
   * at 0, goes up by one when a socket opens and down by one when that socket
   * closes or errors.
   */
  connectionStatus: Observable<number>;
  /**
   * Stream of incoming socket messages. As produced by `connect`, each emission
   * is the `data` of one received `MessageEvent`.
   */
  messages: Observable<string>;
}

/**
 * Minimal socket surface used by `connect`: four assignable event handlers plus
 * `close` and `send`. Shaped so that the browser `WebSocket` as well as
 * hand-written stand-ins can be supplied through a `WebSocketFactory`.
 *
 * Handler return values are typed `any` and are not used by `connect`.
 */
export interface IWebSocket {
  /** Invoked once the socket has opened. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onopen?: (event: Event) => any;
  /** Invoked when the socket has closed; `event.reason` carries the close reason. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onclose?: (event: CloseEvent) => any;
  /** Invoked for every received message; the payload is in `event.data`. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onmessage?: (event: MessageEvent) => any;
  /** Invoked when the socket reports an error. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onerror?: (event: ErrorEvent) => any;
  /** Closes the socket. Explicitly `void` so the method no longer has an implicit `any` return. */
  close(): void;
  /** Sends one payload over the socket. Explicitly `void` for the same reason as `close`. */
  send(data: string | ArrayBuffer | Blob): void;
}

/**
 * Creates the socket for a given URL and optional sub-protocol(s). `connect`
 * accepts one of these so a socket implementation other than the default can
 * be supplied.
 */
export type WebSocketFactory = (url: string, protocols?: string | string[]) => IWebSocket;

// Shared empty protocol list used whenever the caller does not pass `protocols`.
const defaultProtocols: string[] = [];

/**
 * Default socket factory: constructs the global `WebSocket` for `url`, passing
 * `protocols` through (the shared empty list when omitted).
 */
const defaultWebsocketFactory: WebSocketFactory = (
  url: string,
  protocols: string | string[] = defaultProtocols
): IWebSocket => {
  const socket = new WebSocket(url, protocols);

  return socket;
};

/**
 * Exposes a websocket to `url` as a pair of observables.
 *
 * Nothing is opened until `messages` is subscribed; every subscription creates
 * its own socket through `websocketFactory`. Once that socket opens, `input` is
 * subscribed and each value it emits is sent over the socket. Every received
 * message is emitted on `messages` as `MessageEvent.data`.
 *
 * `messages` errors with the socket's error event, or with
 * `new Error(event.reason)` when an open socket closes without the consumer
 * having unsubscribed.
 *
 * `connectionStatus` is one subject shared by all subscriptions of `messages`:
 * it starts at 0, goes up by one when a socket opens and down by one when that
 * socket closes, errors or is torn down.
 *
 * Unsubscribing from `messages` stops forwarding `input` and closes the socket,
 * including when the socket is still connecting.
 *
 * @param url address handed to the socket factory
 * @param input outgoing payloads; subscribed only while the socket is open
 * @param protocols sub-protocol(s) handed to the socket factory
 * @param websocketFactory creates the socket; defaults to the global `WebSocket`
 * @returns the `messages` stream and the shared `connectionStatus` counter
 */
export function connect(
  url: string,
  input: Observable<string>,
  protocols: string | string[] = defaultProtocols,
  websocketFactory: WebSocketFactory = defaultWebsocketFactory
): Connection {
  const connectionStatus = new BehaviorSubject<number>(0);

  const messages = new Observable<string>((observer) => {
    const socket = websocketFactory(url, protocols);
    let inputSubscription: Subscription;

    let open = false;
    let forcedClose = false;
    // Set once the socket itself reported an error or a close, i.e. there is
    // nothing left for the teardown to close.
    let socketFinished = false;

    // Undo the status increment exactly once per opened socket.
    const closed = () => {
      if (!open) { return; }

      connectionStatus.next(connectionStatus.getValue() - 1);
      open = false;
    };

    socket.onopen = () => {
      // The consumer unsubscribed while the socket was still connecting: do not
      // count the connection or start forwarding input nobody will stop.
      if (forcedClose) { return; }

      open = true;
      connectionStatus.next(connectionStatus.getValue() + 1);
      inputSubscription = input.subscribe((data) => {
        socket.send(data);
      });
    };

    socket.onmessage = (message: MessageEvent) => {
      observer.next(message.data);
    };

    socket.onerror = (error: ErrorEvent) => {
      socketFinished = true;
      closed();
      observer.error(error);
    };

    socket.onclose = (event: CloseEvent) => {
      socketFinished = true;

      // prevent observer.complete() being called after observer.error(...)
      if (!open) { return; }

      closed();
      if (forcedClose) {
        observer.complete();
      } else {
        observer.error(new Error(event.reason));
      }
    };

    return () => {
      forcedClose = true;
      if (inputSubscription) {
        inputSubscription.unsubscribe();
      }

      // Close the socket unless it already errored or closed by itself. This
      // deliberately covers a socket that is still connecting; skipping it would
      // leave the connection alive with no subscriber left to release it.
      if (!socketFinished) {
        closed();
        socket.close();
      }
    };
  });

  return { messages, connectionStatus };
}
