import { Inject, Injectable, OnDestroy } from "@angular/core";
import * as Atmosphere from 'atmosphere.js';
import { KeycloakService } from 'keycloak-angular';
import { MeService } from "./me.service";
import { APP_ENVIRONMENT } from "@rollit/shared";
import { ReplaySubject, Observable, Subject } from "rxjs";
import { LoggerService } from '../other/logger.service';
import { File } from "../model/file";
import { User } from '../model/user';
import { takeUntil } from "rxjs/operators";


export interface Notification {
  timestamp: string;       // The time of this notification.
  user: User;         // The user the notification is for.
  type: string;       // the type of notification
  message: string;       // The message
  isRead: boolean;      // Whether notification has been read.
  from: User;         // Who initiated the notification.
  file: File;         // an attachment
}

/**
 * A service for receiving messages/notifications from the Rollit web service.
 */
@Injectable()
export class MessageService implements OnDestroy {
  apiUrl = this.environment.apiUrl + "/async/notification/";
  private me: User;
  private request: Atmosphere.Request = {};
  private subSocket: Atmosphere.Request;
  private log: any;
  private messageSubject = new ReplaySubject<Notification>(1);
  private isDestroyed = new Subject();

  constructor(
    @Inject(APP_ENVIRONMENT) private environment: any,
    private userService: MeService,
    private keycloakService: KeycloakService,
    private logger: LoggerService,
  ) {
    this.log = this.logger.info('messageService');
    this.log('Constructing MessageService');
    this.init();
  }

  init() {
    this.userService.me$.pipe(takeUntil(this.isDestroyed)).subscribe(me => {
      if (!this.me || me && this.me.id !== me.id) {      // connect socket only if userId changes
        this.me = me;
        this.listenSocket(me);
      }
    });
  }

  ngOnDestroy() {
    this.isDestroyed.next(true);
    this.isDestroyed.complete();
  }

  get messages$(): Observable<Notification> {
    return this.messageSubject.asObservable();
  }

  /**
   * Start listening for notifications from the web service.
   *
   * @param me The curent user.
   */
  private listenSocket(me: User) {
    try {
      if (this.subSocket) {
        Atmosphere.unsubscribe();
        this.subSocket = null;
      }

      if (me) {
        this.log('Subscribing to web socket for', me);

        this.request = {
          url: this.apiUrl + me.id,
          transport: "websocket",
          fallbackTransport: "long-polling",
          maxRequest: 100,
          trackMessageLength: true,
          reconnectInterval: 5000,
          maxReconnectOnClose: 5,
          pollingInterval: 1000,
          shared: true,
          withCredentials: true,
          headers: {},

          onMessage: (response: Atmosphere.Response) => {
            this.log(response.status, response.reasonPhrase, response.responseBody);
            this.messageSubject.next(JSON.parse(response.responseBody) as Notification);
          },

          onReconnect: (req, response) => {
            this.log("notifications.service: onReconnect", response);
            // refresh the access token
            this.keycloakService.updateToken().then(value => {
              this.keycloakService.getToken().then(token => this.request.headers['access_token'] = token);
            });
          }
        };

        this.keycloakService.getToken().then((value) => {
          this.request.headers['access_token'] = value;
          this.subSocket = Atmosphere.subscribe(this.request);
        });
      }
    } catch (e) {
      this.log("Error: " + e);
    }
  }
}
