import { Injectable } from '@angular/core';
import { AblyService } from '@shared/ably';
import { Types } from 'ably';
import { plainToInstance } from 'class-transformer';
import { castArray } from 'lodash';
import { finalize, Observable, Subject, switchMap, take } from 'rxjs';
import { DriverEventData, RealtimeEvent } from './models';
import { RealtimeChannelOptions } from './types';

/**
 * Exposes Ably realtime channels as RxJS streams.
 *
 * Every stream returned by this service registers its own listener on the
 * channel and removes only that listener when the stream is torn down, so
 * several consumers can observe the same channel independently.
 */
@Injectable()
export class RealtimeService {
  /** Proxies `AblyService.connectionKey$`. */
  public get connectionKey$(): Observable<string | undefined> {
    return this.ablyService.connectionKey$;
  }

  /** Decoder used to turn binary message payloads into text before JSON parsing. */
  private readonly decoder: TextDecoder;

  /**
   * Number of live streams (created by this service instance) per channel object.
   * Used to detach a channel only after its last stream has been torn down.
   */
  private readonly channelUsage = new WeakMap<object, number>();

  constructor(
    private readonly ablyService: AblyService
  ) {
    this.decoder = new TextDecoder();
  }

  /**
   * Streams driver events published on the `clients/<clientID>` channel.
   *
   * @param params.clientID Identifier interpolated into the channel name.
   * @returns Events whose `data` is a `DriverEventData` instance and whose `name` is `undefined`.
   */
  public clientEvents$(params: { clientID: number }): Observable<RealtimeEvent<DriverEventData>> {
    return this.driverEvents$(`clients/${params.clientID}`);
  }

  /**
   * Streams driver events published on the `orders/<orderID>` channel.
   *
   * @param params.orderID Identifier interpolated into the channel name.
   * @returns Events whose `data` is a `DriverEventData` instance and whose `name` is `undefined`.
   */
  public orderEvents$(params: { orderID: number }): Observable<RealtimeEvent<DriverEventData>> {
    return this.driverEvents$(`orders/${params.orderID}`);
  }

  /**
   * Subscribes to an Ably channel and maps every accepted message through `onMessage`.
   *
   * The channel is obtained from the first value emitted by `AblyService.server$`.
   * Messages are dropped when `options.messageName` is present and does not
   * include the message name. When `options.useDataToObjectConverter` is set,
   * the payload is decoded and JSON-parsed before `onMessage` is called.
   *
   * On teardown only the listener registered by this stream is removed; the
   * channel is detached once no other stream created by this service instance
   * is still using it.
   *
   * @param channel Name of the Ably channel.
   * @param options Channel options forwarded to `channels.get`, plus the service-specific flags.
   * @param onMessage Converter applied to every accepted message.
   * @returns A stream of converted messages.
   */
  public getChannelEventsSource<T>({ channel, options, onMessage }: {
    channel: string;
    options?: RealtimeChannelOptions;
    onMessage: (message: Types.Message) => T;
  }): Observable<T> {
    return this.ablyService.server$
      .pipe(
        take(1),
        switchMap((server) => {
          const result$ = new Subject<T>();
          const realtimeChannel = server.channels.get(channel, options);

          // Keep a reference to the listener so teardown can remove exactly this one.
          const listener = (rawMessage: Types.Message): void => {
            if (!this.shouldProcessMessage(rawMessage, options)) {
              return;
            }

            const message: Types.Message = (options?.useDataToObjectConverter)
              ? {
                ...rawMessage,
                data: this.convertDataToObject(rawMessage.data)
              }
              : rawMessage;

            result$.next(onMessage(message));
          };

          realtimeChannel.subscribe(listener);
          this.channelUsage.set(realtimeChannel, (this.channelUsage.get(realtimeChannel) ?? 0) + 1);

          return result$.pipe(
            finalize(() => {
              // Calling unsubscribe() without arguments would also drop the
              // listeners of every other consumer of this channel.
              realtimeChannel.unsubscribe(listener);

              const remaining = (this.channelUsage.get(realtimeChannel) ?? 1) - 1;

              if (remaining > 0) {
                this.channelUsage.set(realtimeChannel, remaining);

                return;
              }

              // Last stream for this channel: release the attachment.
              this.channelUsage.delete(realtimeChannel);
              realtimeChannel.detach();
            })
          );
        })
      );
  }

  /**
   * Builds the driver-event stream shared by `clientEvents$` and `orderEvents$`.
   *
   * Requests a rewind of one message from Ably (channel params are
   * string-valued, hence `'1'`) and converts each payload to `DriverEventData`.
   *
   * @param channel Fully formatted channel name.
   */
  private driverEvents$(channel: string): Observable<RealtimeEvent<DriverEventData>> {
    return this.getChannelEventsSource({
      channel,
      options: { params: { rewind: '1' }, useDataToObjectConverter: true },
      onMessage: (message) => {
        const data = plainToInstance(DriverEventData, message.data);

        return { data, name: undefined };
      }
    });
  }

  /**
   * Decodes a binary payload to text and parses it as JSON.
   *
   * Throws if the decoded text is not valid JSON.
   * NOTE(review): assumes the payload is UTF-8 encoded JSON bytes — confirm against the publisher.
   */
  private convertDataToObject(data: ArrayBuffer): object {
    return JSON.parse(this.decoder.decode(data));
  }

  /**
   * Tells whether a message passes the optional `messageName` filter.
   *
   * A message is accepted when no options are given, when the options carry no
   * `messageName` key, or when the message name is among the configured name(s).
   */
  private shouldProcessMessage(message: Types.Message, options: RealtimeChannelOptions | undefined): boolean {
    if (!options || !('messageName' in options)) {
      return true;
    }

    return castArray(options.messageName).includes(message.name);
  }
}
