import { Injectable, NgZone } from '@angular/core';
import * as SignalR from '@microsoft/signalr';
import * as msgpack from '@microsoft/signalr-protocol-msgpack';
import { StopfinderApiService } from 'src/app/shared/stopfinder/stopfinder-api.service';
import { LocalStorageService } from '../../../shared/local-storage/local-storage.service';
import { BehaviorSubject, Observable, ReplaySubject, Subject, defer, of, throwError, timer } from 'rxjs';
import { catchError, delayWhen, finalize, map, retryWhen, switchMap, take, takeUntil, tap } from 'rxjs/operators';

export enum ConnectionState
{
  Connected,
  Reconnecting,
  Disconnected,
}

@Injectable()
export class SignalRService
{
  private baseUrl: string;
  private connectionState: { [key in string]: BehaviorSubject<ConnectionState> } = {};
  private connectionSubject: { [key in string]: ReplaySubject<SignalR.HubConnection> } = {};
  private connectionObservables: { [key in string]: Observable<SignalR.HubConnection> } = {};
  private connectionRetry: { [key: string]: number } = {};
  private connectionId: { [key in symbol]: string } = {};
  private close: { [key in string]: Subject<void> } = {};

  constructor(private _apiService: StopfinderApiService,
    private readonly _localStorageService: LocalStorageService)
  {
  }

  get(key: string, endpoint: string, name: string, mark?: {}): Observable<SignalR.HubConnection>
  {
    if (!endpoint || !name || !key)
    {
      return throwError("SignalR key, url, method name is null!");
    }
    // get existing connection
    if (!!this.connectionObservables[key])
    {
      return this.connectionObservables[key];
    }
    // no existing connection so start one
    const connection = this.build(key, endpoint);
    this.start(key, connection).pipe(take(1)).subscribe();
    return this.connectionObservables[key];
  }

  /**
   * start the SignalR service
   * @param key SignalR id
   * @param endpoint end point
   * @param token if need token
   * @param name  hub method name
   * @param receivedCallback received call back, this callback handle new messages received
   * @param reconnectedCallback
   * @param closeCallback
   */
  build(
    key: string,
    endpoint: string,
  ): SignalR.HubConnection
  {

    this.baseUrl = this._apiService.getStopFinderBaseUri();
    const uri = `${this.baseUrl}${endpoint}`;

    const connection = new SignalR.HubConnectionBuilder()
      .withUrl(uri, {
        skipNegotiation: true,
        transport: SignalR.HttpTransportType.WebSockets,
        accessTokenFactory: () =>
        {
          return this._localStorageService.get('opaqueToken');
        }
      })
      .withHubProtocol(new msgpack.MessagePackHubProtocol())
      .configureLogging(SignalR.LogLevel.Information)
      .withAutomaticReconnect({
        nextRetryDelayInMilliseconds: retryContext =>
        {
          if (retryContext.elapsedMilliseconds <= 30 * 60 * 1000)
          {
            // wait between 10 seconds before the next reconnect attempt.
            return 10 * 1000;
          } else
          {
            // stop reconnect after 0.5 hour
            return null;
          }
        }
      })
      .build();
    connection.serverTimeoutInMilliseconds = 60 * 1000;
    connection.keepAliveIntervalInMilliseconds = 10 * 1000;
    this.close[key] = new Subject<void>();
    this.connectionSubject[key] = new ReplaySubject<SignalR.HubConnection>();
    this.connectionObservables[key] = this.connectionSubject[key].pipe(
      takeUntil(this.close[key])
    );
    return connection;
  }

  /**
   * send data to the server
   * @param name hub method name
   * @param data params data
   */
  send(key: string, name: string, data: any): Observable<any>
  {
    if (!this.connectionObservables[key])
    {
      return of(null);
    }
    return this.connectionObservables[key].pipe(
      switchMap((connection) =>
      {
        return defer(() => connection.send(name, data))
      }),
      catchError(() => of(null))
    );
  }

  invoke(key: string, name: string, data: any): Observable<any>
  {
    if (!this.connectionObservables[key])
    {
      return of(null);
    }
    return this.connectionObservables[key].pipe(
      switchMap((connection) =>
      {
        return defer(() => connection.invoke(name, data))
      }),
      catchError(() => of(null))
    );
  }

  /**
   * stop the SignalR services
   */
  stop(key: string): Observable<any>
  {
    if (!this.connectionObservables[key])
    {
      return of(null);
    }
    return this.connectionObservables[key].pipe(
      switchMap((connection) =>
      {
        return defer(() => connection.stop()).pipe(
          tap(() =>
          {
            delete this.connectionId[key];
            delete this.connectionSubject[key];
            delete this.connectionObservables[key];
            this.close[key].next();
            delete this.close[key];
          })
        )
      }),
      catchError(() => of(null))
    );
  }

  public closeAllConnections(): void
  {
    if (!this.connectionSubject || !Object.keys(this.connectionSubject).length)
    {
      return;
    }

    Object.keys(this.connectionSubject).forEach((key) =>
    {
      this.stop(key).pipe(take(1)).subscribe();
    })
  }

  getConnectionState(key: string): Observable<ConnectionState>
  {
    return this.connectionState[key].asObservable();
  }

  getClose(key): Observable<void>
  {
    return this.close[key].asObservable();
  }

  private start(key: string, connection: SignalR.HubConnection): Observable<SignalR.HubConnection>
  {
    return this.onStartLambda(key, connection).pipe(
      retryWhen((error) =>
      {
        return error.pipe(
          map((error) =>
          {
            if (!this.connectionRetry[key])
            {
              this.connectionRetry[key] = 0;
            }
            if (this.connectionRetry[key] > 3)
            {
              this.connectionRetry[key] = 0;
              return throwError(error);
            }
            this.connectionRetry[key]++;
            return of(null);
          }),
          delayWhen(() => timer(5000))
        )
      })
    );
  }

  private onStartLambda(key: string, connection: SignalR.HubConnection): Observable<SignalR.HubConnection>
  {
    const start = defer(() => connection.start());
    const connectionId = defer(() => connection.invoke("getConnectionId")).pipe(
      map((connectionId) =>
      {
        this.connectionId[key] = connectionId;
        this.connectionRetry[key] = 0;
        this.connectionState[key] = new BehaviorSubject<ConnectionState>(
          ConnectionState.Connected
        );
        this.setupListeners(key, connection);
        this.connectionSubject[key].next(connection);
        return connection;
      })
    );

    return start.pipe(
      switchMap(() => connectionId)
    );
  }

  private setupListeners(key: string, hubConnection: SignalR.HubConnection): void
  {
    var state = this.connectionState[key];
    var destroy = this.close[key];
    hubConnection.onreconnected((err) =>
    {
      state.next(ConnectionState.Connected);
    });

    hubConnection.onclose((err) =>
    {
      destroy.next();
      state.next(ConnectionState.Disconnected);
    });

    hubConnection.onreconnecting((err) =>
    {
      state.next(ConnectionState.Reconnecting);
    });
  }
}
