import { Injectable } from '@angular/core';
import { LocalStorageService } from 'angular-2-local-storage';

import Peer from 'peerjs';
import { Subject, Observable, Subscription, BehaviorSubject, ReplaySubject } from 'rxjs';

import { filter, tap } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class PeersService {

  private myId: string;
  private partnerId: string;

  private peer: Peer;

  private connections: (Peer.DataConnection | Peer.MediaConnection)[] = [];

  private dataConnection: Peer.DataConnection;
  private castConnection: Peer.MediaConnection;

  private camConnection$ = new ReplaySubject<Peer.MediaConnection>(1);
  private camConnectionStream$ = new ReplaySubject<MediaStream>(1);
  private castConnectionStream$ = new ReplaySubject<MediaStream>(1);

  private messages$ = new ReplaySubject<any>(5);

  private connected$ = new BehaviorSubject<boolean>(false);
  private incomingCastClosed$ = new Subject<boolean>();


  constructor(private locStor: LocalStorageService) {
    const configuration = this.locStor.get('PEER_CONFIG');

    this.peer = new Peer(this.getOrCreateMyId(),
      {
        host: 'bbs.penkrat.ru',
        port: 9000,
        path: '/',
        debug: 0,
        config: configuration,
      });
    this.myId = this.peer.id;

    this.peer.on('connection', (conn) => this.initializeDataConnection(conn, true));
    this.peer.on('call', (call) => {
      this.initializeMediaConnection(call, true);
    });

    this.peer.on('open', () => { console.log('Connection to the PeerServer is established.'); });
    this.peer.on('close', () => { this.connected$.next(false); });
    this.peer.on('disconnected', () => { console.log('Peer is disconnected from the signalling server'); });
  }

  public getMyId(): string {
    return this.myId;
  }

  public getRemoteId(): string {
    return this.partnerId;
  }

  public getPeer(): Peer {
    return this.peer;
  }

  public isConnected(): Observable<boolean> {
    return this.connected$;
  }

  public disconect() {
    this.dataConnection.close();

    while (this.connections.length) { this.connections.pop().close(); }

    this.connected$.next(false);
  }

  public onIncomingMessage(): Observable<any> {
    return this.messages$.asObservable();
  }
  /**
   * Входящее медиа-соединение с камеры
   */
  public onIncomingCall(): Observable<Peer.MediaConnection> {
    return this.camConnection$
      .pipe(
        // соединение еще не закрыто и не удалено
        filter(connection => this.connections.indexOf(connection) >= 0)
      );
  }

  /**
   * Входящее медиа-соединение с камеры закончено
   */
  public onIncomingCallEnded(): Observable<Peer.MediaConnection> {
    return this.camConnection$
      .pipe(
        // соединение уже закрыто или удалено
        filter(connection => connection == null || this.connections.indexOf(connection) < 0)
      );
  }

  /**
   * Входящий медиапоток камеры
   */
  public onIncomingCallStream(): Observable<MediaStream> {
    return this.camConnectionStream$
      .pipe(
        filter(stream => stream.active)
      );
  }
  /**
   * Входящий медиапоток вещания
   */
  public onIncomingCastStream(): Observable<MediaStream> {
    return this.castConnectionStream$
      .pipe(
        filter(stream => stream.active)
      );
  }
  /**
   * Входящий медиапоток вещания был завершен/прервался
   */
  public onIncomingCastStreamEnded(): Observable<boolean> {
    return this.incomingCastClosed$;
  }

  /**
   * Установка соединения с другой стороной, возвращает true в случае успеха
   *
   *  @param parnerId id второй стороны
   */
  public connect(parnerId: string): boolean {
    if (this.dataConnection != null && this.dataConnection.open) {
      console.error('Already connected');
      return false;
    }
    const conn = this.peer.connect(parnerId);
    if (!conn) {
      console.error('No connection');
      return false;
    }
    this.initializeDataConnection(conn);
    return true;
  }

  /**
   * Ответ на входящий медиапоток своим
   *
   */
  public answer(call: Peer.MediaConnection, stream: MediaStream) {
    call.answer(stream);
  }

  /**
   * Отправка сообщения в чат
   *
   * @param text текст сообщения
   */
  public sendMessage(text: string): any {
    if (!this.dataConnection) {
      console.warn('no connection');
      return;
    }
    const msg = { user: this.myId, msg: text, time: new Date() };
    this.dataConnection.send(msg);
    return msg;
  }

  public sendSysMessage(text: string) {
    if (!this.dataConnection) {
      console.warn('no connection');
      return;
    }
    const msg = { user: '*SYSTEM*', msg: text, time: new Date() };
    this.dataConnection.send(msg);
  }

  /**
   * Звонок
   *
   * @param stream исходящий поток
   */
  public call(stream: MediaStream) {
    const call = this.peer.call(this.partnerId, stream);
    this.initializeMediaConnection(call);
  }
  /**
   * Вещание
   *
   * @param stream исходящий поток
   */
  public cast(stream: MediaStream) {
    if (this.castConnection == null) {
      this.castConnection = this.peer.call(this.partnerId, stream, { metadata: { isCast: true } });
      this.initializeMediaConnection(this.castConnection);
    }
  }

  /**
   * Прекратить Вещание
   */
  public stop() {
    if (this.castConnection != null) {
      this.castConnection.close();
    }
  }

  private initializeDataConnection(conn: Peer.DataConnection, incoming = false) {
    this.connections.push(conn);
    this.partnerId = conn.peer;
    this.dataConnection = conn;

    this.dataConnection.on('open', () => {
      this.connected$.next(true);
      this.sendSysMessage('*connection established*');
    });
    this.dataConnection.on('close', () => {
      this.connected$.next(false);
      this.connections.splice(this.connections.indexOf(this.dataConnection), 1);
      this.dataConnection = null;
      this.logSysMessage('*connection terminated*');
    });
    this.dataConnection.on('data', (data) => {
      this.messages$.next(data);
    });
  }

  private initializeMediaConnection(call: Peer.MediaConnection, incoming = false) {
    if (this.connections.indexOf(call) >= 0) {
      console.warn('initializeMediaConnection called twice');
      return;
    }

    this.connections.push(call);

    const isCast = call.metadata?.isCast || false;

    call.on('open', () => {
      this.logSysMessage('*video connection established*');
    });
    call.on('close', () => {
      this.connections.splice(this.connections.indexOf(call), 1);
      this.logSysMessage('*video connection terminated*');
      if (isCast) {
        this.castConnection = null;
      } else {
        this.camConnection$.next(null);
      }
    });

    if (incoming && !isCast) {
      // уведомляем, чтобы ответить потоком камеры
      this.camConnection$.next(call);
      console.log('IT IS INCOMING WEBCAM STREAM');
      call.on('stream', (remoteStream) => {
        this.logSysMessage('Incoming video transmission...');
        this.camConnectionStream$.next(remoteStream);
      });
    }
    if (incoming && isCast) {
      call.answer(); // принимаем поток, свой не транслируем
      console.log('IT IS INCOMING CAST STREAM');
      call.on('stream', (remoteStream) => {
        this.logSysMessage('Incoming transmission...');
        this.castConnectionStream$.next(remoteStream);
      });
      call.on('error', error => {
        console.error(`[Peers]: ${error}`);
        // проверка, что уже есть другие соединения, + помимо вебки
        if (this.connections.length <= 1) {
          this.incomingCastClosed$.next(true);
        }
      });
      call.on('close', () => {
        console.log('INCOMING CAST STREAM ENDED');
        // проверка, что уже есть другие соединения, + помимо вебки
        if (this.connections.length <= 1) {
          this.incomingCastClosed$.next(true);
        }
      });
    }
    if (!incoming && !isCast) {
      // ждем ответный поток камеры партнера на свой вызов
      call.on('stream', (remoteStream) => {
        this.camConnectionStream$.next(remoteStream);
      });
    }
  }

  private getOrCreateMyId(): string {
    let id = this.locStor.get<string>('my-id');
    if (!id) {
      id = 'USER-' + this.makeid(10);
      this.locStor.set('my-id', id);
    }
    return id;
  }

  private makeid(length: number): string {
    const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
    const charactersLength = characters.length;
    let result = '';
    for (let i = 0; i < length; i++) {
      result += characters.charAt(Math.floor(Math.random() * charactersLength));
    }
    return result;
  }


  /**
   * Добавляет сообщение журнала в чат
   * @param text текст сообщения
   */
  private logSysMessage(text: string) {
    const msg = { user: '*SYSTEM*', msg: text, time: new Date() };
    this.messages$.next(msg);
  }

}
