import { Inject, Injectable, OnDestroy } from '@angular/core';
import {
  BehaviorSubject,
  concat,
  EMPTY,
  firstValueFrom,
  from,
  fromEvent,
  interval,
  merge,
  Observable,
  of,
  race,
  Subject,
  timer
} from 'rxjs';
import { catchError, filter, first, map, retry, switchMap, take, takeUntil, takeWhile } from 'rxjs/operators';
import { REQUIRE_AUTHENTICATION } from '../../../auth/authentication/auth.service';
import { PRODUCTION_MODE } from '../../common/production-mode';
import { conversationHeader } from '../../../api-private/conversation-interceptor.service';
import { AsyncMessage, AsyncMessagingService, MessageType } from '../async-messaging.service';
import { TokenService } from '../../../auth/authentication/token.service';
import { RxStomp, RxStompState } from '@stomp/rx-stomp';
import { ObservableProvider, SubjectProvider } from '../../../d3-graph/d3-graph/common';
import { SINGLETON } from '../../common/on-destroy.mixin';
import { IMessage } from '@stomp/stompjs';
import { getRetryConfig } from '../../common/rxjs-utils';
import { get, has, isArray, isNumber } from 'lodash-es';

const DESIRED_CONNECTION_STATE_TIMEOUT_MS = 5000;

@Injectable()
export class FlexWebsocketService implements OnDestroy, AsyncMessagingService {
  get isConnected(): boolean {
    return this.isConnectedSubjectProvider.value;
  }

  constructor(
    private tokenService: TokenService,
    @Inject(REQUIRE_AUTHENTICATION) private requireAuthentication: boolean,
    @Inject(PRODUCTION_MODE) private isProduction: boolean,
    private rxStomp: RxStomp
  ) {
    this.init();
  }

  static AUTH_HEADER = 'X-Authorization';
  static XSRF_TOKEN_COOKIE_NAME = 'XSRF-TOKEN';
  static XSRF_TOKEN_HEADER = 'X-XSRF-TOKEN';

  private desiredConnectionStateProvider = new SubjectProvider(SINGLETON);
  private brokerURI = `ws${window.location.protocol.startsWith('https') ? 's' : ''}://${window.location.host}/events`;
  private onDestroy$ = new Subject<void>();

  private isConnectedSubjectProvider = new SubjectProvider(SINGLETON, new BehaviorSubject(false));

  isConnected$ = this.isConnectedSubjectProvider.value$;

  private lastEventId = null;

  private eventsProvider = new SubjectProvider<IMessage>(SINGLETON, new Subject());
  private messagesProvider = new ObservableProvider<AsyncMessage>(
    SINGLETON,
    this.eventsProvider.value$.pipe(
      map((event) => JSON.parse(event.body)),
      switchMap((result) => {
        if (isArray(result)) {
          return of(...result);
        }
        return of(result);
      })
    )
  );
  private xsrfTokenProvider = new SubjectProvider(SINGLETON, new BehaviorSubject(this.getXSRFTokenFromCookies()));
  xsrfToken$ = this.xsrfTokenProvider.value$.pipe(filter((a) => !has(window, 'cookieStore') || !!a));
  onMessage$ = this.messagesProvider.value$;

  private errorsProvider = new SubjectProvider<IMessage>(SINGLETON, new Subject());

  private static getDestinationEndpoint(destination: string): string {
    if (destination.startsWith('/')) {
      return destination.slice(1);
    }
    return destination;
  }

  /**
   * Filter messages to only include messages of the type we're interested in
   */
  onMessageOfType<AsyncM>(...messageTypes: MessageType[]): Observable<AsyncMessage> {
    return this.onMessage$.pipe(filter((message) => messageTypes.includes(message.type)));
  }

  onConversationMessageOfType(conversationId: string, ...messageTypes: MessageType[]): Observable<AsyncMessage> {
    return this.onMessage$.pipe(
      filter((message) => (!conversationId || message.conversationId === conversationId) && messageTypes.includes(message.type))
    );
  }

  /**
   * Subscribes to httpCall$ and then switches to listening to and emitting messages defined in messageTypes.
   * When resuscitated$ is called it will subscribe to httpCall$ again.
   * Will buffer messages between resuscitated$ and the completion of httpCall$.
   * Afterwards it will switch to emitting messages as defined by messageTypes.
   *
   * Note that you should NOT use retry/retryWhen in the observable that has been provided.
   * onMessageOfTypeWithInit resets the buffer on each retry and this will not work if errors are handled in the supplied httpCall$ observable.
   */
  onMessageOfTypeWithInit(httpCall$: Observable<any>, ...messageTypes: MessageType[]): Observable<AsyncMessage> {
    const httpCallWithRetry$ = merge(of(null), interval(60_000)).pipe(
      // Reset http calle very 60 seconds to handle long timeouts
      switchMap(() => httpCall$),
      retry(getRetryConfig({ excludedStatusCodes: [] })),
      map((result) => this.httpResponseToResponseAsyncMessage(result)),
      first()
    );

    const whenConnected$ = this.isConnected$.pipe(
      filter((isConnected) => isConnected),
      switchMap(() => concat(httpCallWithRetry$, this.onMessageOfType(...messageTypes)))
    );

    return race(whenConnected$, timer(5000).pipe(switchMap(() => concat(httpCallWithRetry$, whenConnected$))));
  }

  /**
   * Default behavior for http requests that will result in one or more event being emitted after the request is completed.
   * It will not listen for events that are emitted DURING the request, only after.
   * @param request$ The observable responsible for initiating the request, should use {observe: "response"}
   * @param messageTypes MessageTypes to filter on
   * @param numberOfEvents Amount of events to listen to. Does not include Response event.
   */
  handleSubscriptionRequest(
    request$: Observable<any>,
    messageTypes: ReadonlyArray<MessageType>,
    numberOfEvents: number = 1
  ): Observable<AsyncMessage> {
    return request$.pipe(
      map((response) => this.httpResponseToResponseAsyncMessage(response)),
      switchMap((responseMessage) =>
        concat(
          of(responseMessage),
          this.onConversationMessageOfType(responseMessage.conversationId, ...messageTypes).pipe(take(numberOfEvents))
        )
      ),
      catchError((errorResponse) => of(this.httpErrorResponseToFailureResponse(errorResponse)))
    );
  }

  ngOnDestroy(): void {
    this.isConnectedSubjectProvider.next(false);
    this.onDestroy$.next();
    this.onDestroy$.complete();
  }

  private init(): void {
    this.onMessage$.subscribe((event) => {
      this.lastEventId = event.identifier || this.lastEventId;
    });

    if (has(window, 'cookieStore')) {
      fromEvent(get(window, 'cookieStore') as any, 'change').subscribe((event) => {
        const newCookieValue = this.getXSRFTokenFromCookies();
        // Fired when any cookie changes
        if (this.xsrfTokenProvider.value !== newCookieValue) {
          this.xsrfTokenProvider.next(newCookieValue);
        }
      });
    }

    const idToken$ = this.requireAuthentication ? this.tokenService.validIdToken$ : of(null);

    this.isConnectedSubjectProvider.follow(this.rxStomp.connectionState$.pipe(map((state) => state === RxStompState.OPEN)));

    this.rxStomp.connectionState$.subscribe((stompState) => {
      if (this.lastEventId && stompState === RxStompState.OPEN) {
        this.getMissedEvents();
      }
    });

    this.eventsProvider.follow(merge(this.rxStomp.watch('/user/queue/missed-events'), this.rxStomp.watch('/topic/events')));
    this.errorsProvider.follow(this.rxStomp.watch('/user/queue/errors'));

    this.errorsProvider.value$.subscribe((error) => {
      console.warn('got websocket error: ', error);
    });

    this.getMissedEvents();

    idToken$.subscribe(() => {
      if (!this.isConnected) {
        // Try reconnecting
        this.activateWhenPossible();
      }
    });

    this.rxStomp.configure({
      brokerURL: this.brokerURI,
      heartbeatOutgoing: 10000,
      heartbeatIncoming: 10000,
      beforeConnect: async (client) => {
        if (!this.requireAuthentication) {
          return;
        }

        const idToken = await firstValueFrom(idToken$);

        if (!this.tokenService.isValid(idToken.originalToken)) {
          // If the token that is being used to connect is invalid, deactivate the client before the connect attempt and trigger refresh.
          this.tokenService.refreshTokenIfNotAvailable();

          // Deactivate immediately to cancel the connection
          return this.deactivateWhenPossible(true);
        }

        const connectHeaders = this.createHeaders(idToken.originalToken);

        client.configure({
          connectHeaders: {
            ...connectHeaders
          }
        });
      }
    });

    this.activateWhenPossible();

    // Waits until the rxStomp client is in an appropriate state when a state change is requested
    this.desiredConnectionStateProvider.value$
      .pipe(
        switchMap((desiredState) => {
          return this.rxStomp.connectionState$.pipe(
            filter((connectionState) => {
              const canClose = desiredState === RxStompState.CLOSED && connectionState === RxStompState.OPEN;
              const canOpen = desiredState === RxStompState.OPEN && connectionState === RxStompState.CLOSED;

              return canClose || canOpen;
            }),
            map(() => desiredState),
            take(1),
            takeUntil(interval(DESIRED_CONNECTION_STATE_TIMEOUT_MS))
          );
        }),
        switchMap((desiredConnectionState) => {
          if (desiredConnectionState === RxStompState.OPEN) {
            this.rxStomp.activate();

            return EMPTY;
          } else if (desiredConnectionState === RxStompState.CLOSED) {
            return from(this.rxStomp.deactivate());
          }
        })
      )
      .subscribe(() => {});
  }

  activateWhenPossible(): void {
    this.desiredConnectionStateProvider.next(RxStompState.OPEN);
  }

  deactivateWhenPossible(deactivateImmediately: boolean = false): Promise<void> {
    if (deactivateImmediately) {
      // Used when deactivateWhenPossible is called within beforeConnect, since it is valid to call deactivate in this callback according to the docs
      // see: https://stomp-js.github.io/api-docs/latest/classes/Client.html#beforeConnect
      this.rxStomp.deactivate();
    }

    this.desiredConnectionStateProvider.next(RxStompState.CLOSED);
    return firstValueFrom(
      this.rxStomp.connectionState$.pipe(
        filter((connectionState) => connectionState === RxStompState.CLOSED),
        map(() => undefined)
      )
    );
  }

  subscribeToResource<T>(destination: string): Observable<T> {
    return merge(this.subscribeToResourceUpdates<T>(destination), this.subscribeToResourceInit<T>(destination));
  }

  subscribeToResourceUpdates<T>(destination: string): Observable<T> {
    destination = FlexWebsocketService.getDestinationEndpoint(destination);

    return this.rxStomp.watch(`/topic/${destination}`).pipe(map((result) => result.body && JSON.parse(JSON.parse(result.body).payload)));
  }

  subscribeToResourceInit<T>(destination: string): Observable<T> {
    destination = FlexWebsocketService.getDestinationEndpoint(destination);

    return merge(this.rxStomp.watch(`/user/queue/${destination}`), this.rxStomp.watch(`/app/${destination}`)).pipe(
      map((result) => result.body && JSON.parse(result.body))
    );
  }

  private createHeaders(idToken: string): any {
    const headers = {};
    if (this.requireAuthentication) {
      headers[FlexWebsocketService.AUTH_HEADER] = `${idToken}`;
      const xsrfToken = this.getXSRFTokenFromCookies();
      if (xsrfToken) {
        headers[FlexWebsocketService.XSRF_TOKEN_HEADER] = xsrfToken;
      }
    }
    return headers;
  }

  private httpResponseToResponseAsyncMessage(response: any): AsyncMessage {
    return new AsyncMessage(response?.headers?.get(conversationHeader), response?.body, MessageType.Response);
  }

  private httpErrorResponseToFailureResponse(errorResponse: any): AsyncMessage {
    return new AsyncMessage(errorResponse?.headers?.get(conversationHeader), errorResponse, MessageType.FailureResponse);
  }

  private getXSRFTokenFromCookies(): string {
    const splitCookies = document.cookie.split(';').map((a) => a.trim().split('='));
    const foundPair = splitCookies.find((value) => value[0] === FlexWebsocketService.XSRF_TOKEN_COOKIE_NAME);

    if (foundPair) {
      return foundPair[1];
    }
  }

  alreadySubscribed = false;
  private getMissedEvents() {
    if (!this.alreadySubscribed) {
      this.alreadySubscribed = true;
      this.onMessageOfType(MessageType.LastKnownEvent)
        .pipe(
          switchMap((result) => merge(of(result), this.onMessageOfType(MessageType.LastKnownEvent))),
          takeWhile((result) => !isNumber(result))
        )
        .subscribe({
          next: (message) => {
            this.lastEventId = JSON.parse(message.payload).lastKnownEvent;
          },
          complete: () => (this.alreadySubscribed = false)
        });
    }

    this.rxStomp.publish({
      destination: '/app/missed-events',
      body: this.lastEventId
    });
  }
}
