import throttle from 'lodash/throttle';

import { AllTradeEntity } from '../client/entities';
import { EntityType } from '../client/entityTypes';
import { StreamingService, SubscribeReturnData } from './streaming';

import {
  AbstractTradesService,
  SubscriberData,
  SubscriberHandle,
  TradeInfo,
} from '../../types/TradesService';

export class TradesService extends AbstractTradesService {
  private subscribers: Map<number, Set<SubscriberData>> = new Map();
  private subscriptionHandles: Map<number, SubscribeReturnData> = new Map();

  public subscribeToFI(
    fi: number,
    updater: (trades: TradeInfo[]) => void,
    timeout = 100
  ): SubscriberHandle {
    const subscriberData: SubscriberData = {
      updater,
      cache: [],
      throttledUpdater: () => {},
    };

    subscriberData.throttledUpdater = throttle(() => {
      subscriberData.updater(subscriberData.cache);
      subscriberData.cache = [];
    }, timeout);

    if (!this.subscribers.has(fi)) {
      this.subscribers.set(fi, new Set());

      const subscription = StreamingService.subscribe(
        { entity: EntityType.AllTradeEntity, fi: [fi] },
        (message) => {
          const tradeInfos: TradeInfo[] = [];

          message.data.forEach((obj) => {
            const entity = obj as AllTradeEntity;

            if (entity.IdFI === fi) {
              const tradeInfo: TradeInfo = {
                IdFI: entity.IdFI,
                IdTradeType: entity.IdTradeType,
                IdTradePeriodStatus: entity.IdTradePeriodStatus,
                TradeNo: entity.TradeNo.toString(),
                TradeTime: entity.TradeTime,
                BuySell: entity.BuySell,
                Value: entity.Value,
                Qty: entity.Qty,
                Price: entity.Price,
              };

              tradeInfos.push(tradeInfo);
            }
          });

          if (tradeInfos.length > 0) {
            const subscribers = this.subscribers.get(fi);

            if (subscribers) {
              subscribers.forEach((subscriber) => {
                subscriber.cache.push(...tradeInfos);
                subscriber.throttledUpdater();
              });
            }
          }
        }
      );

      this.subscriptionHandles.set(fi, subscription);
    }

    this.subscribers.get(fi)!.add(subscriberData);

    return { fi, subscriberData };
  }

  public unsubscribe(_: number, subscriberHandle: SubscriberHandle) {
    const { fi, subscriberData } = subscriberHandle;

    const subscribers = this.subscribers.get(fi);

    if (subscribers) {
      subscribers.delete(subscriberData);

      if (subscribers.size === 0) {
        const subscribeReturnData = this.subscriptionHandles.get(fi);

        if (subscribeReturnData) {
          StreamingService.unsubscribe(
            { fi: [fi], entity: EntityType.AllTradeEntity },
            subscribeReturnData
          );
          this.subscriptionHandles.delete(fi);
        }

        this.subscribers.delete(fi);
      }
    }
  }
}
