import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { Observable, timer, Subject, EMPTY, Subscription } from 'rxjs';
import { retryWhen, tap, delayWhen, switchAll, catchError } from 'rxjs/operators';

export const RECONNECT_INTERVAL = 10000;

export class AppWebSocket {
  private socket$: WebSocketSubject<any>;
  private socketSubscription: Subscription;

  private messagesSubject$ = new Subject();

  public get messages$(): Observable<any> {
    return this.messagesSubject$.asObservable();
  }

  constructor(private ws_endpoint: string) {}

  public connect(cfg: { reconnect: boolean } = { reconnect: false }): () => void {
    if (!this.socket$ || this.socket$.closed) {
      this.socket$ = null;

      this.closeSubscription();

      this.socket$ = this.getNewWebSocket(this.ws_endpoint);
      this.socketSubscription = this.socket$
        .pipe(
          cfg.reconnect ? this.reconnect : (o) => o,
          tap({
            error: (error) => console.log(error),
          }),
          catchError((_) => EMPTY),
        )
        .subscribe((message) => {
          this.messagesSubject$.next(message);
        });
      // toDO only next an observable if a new subscription was made double-check this
      //            this.messagesSubject$.next(messages);
    }

    return () => {
      this.close();
    };
  }

  private reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
      retryWhen((errors) =>
        errors.pipe(
          tap((val) => console.log('[AppWebSocket] Try to reconnect', val)),
          delayWhen((_) => timer(RECONNECT_INTERVAL)),
        ),
      ),
    );
  }

  close() {
    this.closeSubscription();
    this.socket$?.complete();
    this.socket$ = undefined;
  }

  sendMessage(msg: any) {
    this.socket$.next(msg);
  }

  private closeSubscription() {
    if (!!this.socketSubscription) {
      this.socketSubscription.unsubscribe();
      this.socketSubscription = null;
    }
  }

  private getNewWebSocket(ws_endpoint: string): WebSocketSubject<any> {
    return webSocket({
      url: ws_endpoint,
      openObserver: {
        next: () => {
          console.log('[AppWebSocket]: connection ok');
        },
      },
      closeObserver: {
        next: () => {
          console.log('[AppWebSocket]: connection closed');
          if (!!this.socket$) {
            this.socket$ = undefined;
            this.connect({ reconnect: true });
          }
        },
      },
    });
  }
}
