import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { Socket } from 'ngx-socket-io';
import { DialogsService } from './dialogs.service';
import { EventNames, RootEventName } from '../models/events-names.models';
import { WebsocketMessage, Origin } from '../models/websocket.models';
import { SubscriptionEntity } from '../models/socketio.models';
import { SubscriptionKind } from '../types/socketio.types';

/**
 * Application-wide wrapper around the socket.io connection.
 *
 * Responsibilities visible in this class:
 *  - keeps track of the topics the app subscribed to and re-subscribes them whenever the socket (re)connects;
 *  - shows an "offline" dialog when the socket stays disconnected for longer than a short grace period,
 *    and reloads the page once the connection is back;
 *  - offers helpers to subscribe/unsubscribe/publish on topics and to listen to socket events as Observables.
 */
@Injectable({
    providedIn: 'root'
})
export class SocketioService {
    /** Grace period (ms) between a disconnection and the opening of the offline dialog. */
    private static readonly OFFLINE_DIALOG_DELAY_MS = 2000;

    /**
     * Subject meant to carry every message received on the socket.
     * NOTE(review): nothing in this class currently pushes values into it (the catch-all listener in init() is commented out).
     */
    public kafkaMessageObserver: BehaviorSubject<WebsocketMessage<any>> = new BehaviorSubject<WebsocketMessage<any>>(undefined);
    /** Topics currently subscribed, keyed by topic name. Initialised eagerly so the public methods are safe before init(). */
    private topics: Map<string, SubscriptionEntity> = new Map<string, SubscriptionEntity>();
    /** Handle of the pending "open the offline dialog" timer, or null when none is scheduled. */
    private timeoutOffline: ReturnType<typeof setTimeout> | null = null;
    /** Whatever DialogsService.openOfflineDialog() returned for the currently tracked offline dialog, or null. */
    private offlineDialog: ReturnType<DialogsService['openOfflineDialog']> | null = null;
    /** True once the socket listeners have been attached, so repeated init() calls do not duplicate them. */
    private listenersRegistered = false;

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    constructor(private socket: Socket, private dialogsService: DialogsService) {}

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Reset the service state and attach the socket listeners.
        The state (tracked topics, message subject) is re-created on every call; the listeners are attached only on the first call.
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    public init(): void {
        this.topics = new Map<string, SubscriptionEntity>();
        this.kafkaMessageObserver = new BehaviorSubject<WebsocketMessage<any>>(undefined);

        //TODO Open the token payload
        //const decoded: any = jwt_decode(accessToken);

        // Listeners must be attached once: attaching them again would run every handler twice per event
        if (this.listenersRegistered) return;
        this.listenersRegistered = true;

        /*------------------------------------------------------------------------------------------------------------------------------------------------------
            Websocket connection event
        ------------------------------------------------------------------------------------------------------------------------------------------------------*/
        this.socket.ioSocket.on('connect', async () => {
            console.log(`Socketio connected!`);

            // Renew the subscription to the previously tracked topics
            this.topicsRefresh();
            // TODO - ask for the user, subscribe to the topics, etc

            // Cancel the pending "connection unavailable" notice, if any
            this.clearOfflineTimer();

            const dialog = this.offlineDialog;
            if (!dialog) return;
            this.offlineDialog = null;
            try {
                (await dialog).dismiss();
            } catch (error) {
                // A dialog that failed to open or to close must not prevent the reload below
                console.error(error);
            } finally {
                // Reload the resources. In the future replace this with a websocket "refresh" message
                window.location.reload();
            }
        });

        /*------------------------------------------------------------------------------------------------------------------------------------------------------
            Websocket disconnection event
        ------------------------------------------------------------------------------------------------------------------------------------------------------*/
        this.socket.ioSocket.on('disconnect', () => {
            console.error(`Socketio disconnected!`);
            // Keep at most one pending timer, otherwise an older one could no longer be cancelled on reconnection
            this.clearOfflineTimer();
            this.timeoutOffline = setTimeout(() => {
                this.timeoutOffline = null;
                // Do not stack a second dialog on top of one that is already tracked
                if (!this.offlineDialog) {
                    this.offlineDialog = this.dialogsService.openOfflineDialog();
                }
            }, SocketioService.OFFLINE_DIALOG_DELAY_MS);
        });

        /*------------------------------------------------------------------------------------------------------------------------------------------------------
            Log on the frontend the error messages coming from the backend, following wrong or incomplete ws messages
        ------------------------------------------------------------------------------------------------------------------------------------------------------*/
        this.socket.on('ERROR', (message: unknown) => {
            console.error(message);
        });

        /*------------------------------------------------------------------------------------------------------------------------------------------------------
            "Catch-all" event fired when data is received on the websocket (currently disabled)
        ------------------------------------------------------------------------------------------------------------------------------------------------------*/
        // this.socket.ioSocket.onAny((event, ...args) => {
        //     console.log(`SocketIo on event ${event} is receiving: ${JSON.stringify(args[0])}`);
        //     this.kafkaMessageObserver.next(args[0]);
        // });
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Cancel the pending offline-dialog timer (if any) and forget its handle
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    private clearOfflineTimer(): void {
        if (this.timeoutOffline !== null && this.timeoutOffline !== undefined) {
            clearTimeout(this.timeoutOffline);
        }
        this.timeoutOffline = null;
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Emit the "subscribe" message for a topic (shared by topicSubscribe and topicsRefresh)
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    private emitSubscribe(topic: string, description: string): void {
        this.socket.emit(RootEventName.subscribe, {
            topic,
            eventName: RootEventName.subscribe,
            origin: Origin.app,
            description
        });
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Refresh subscription after socket reconnection
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    private topicsRefresh(): void {
        //TODO take them from the topics map: const currentTopics = this.state.getValue().topics;
        for (const [topic, entity] of this.topics) {
            console.log(`Refreshing topic subscription: ${JSON.stringify(entity)}`);
            this.emitSubscribe(topic, entity.description);
        }
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Add a topic to listen
        Does nothing when the topic is empty or already tracked; otherwise tracks it and emits the "subscribe" message.
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    public topicSubscribe(topic: string, kind: SubscriptionKind, description: string): void {
        // Ignore empty topics and topics that are already subscribed
        if (!topic || this.topics.has(topic)) return;

        this.topics.set(topic, { topic, kind, description });
        this.emitSubscribe(topic, description);
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Remove a topic to listen
        Does nothing when the topic is empty or not tracked; otherwise forgets it and emits the "unsubscribe" message.
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    public topicUnsubscribe(topic: string): void {
        // Only topics that are currently tracked can be unsubscribed
        if (!topic || !this.topics.has(topic)) return;

        this.topics.delete(topic);
        this.socket.emit(RootEventName.unsubscribe, {
            topic,
            eventName: RootEventName.unsubscribe,
            origin: Origin.app
        });
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Publish message on topic
        Nothing is emitted when the topic is empty. The message is sent on the "publish" root event with origin "app".
        NOTE(review): options.destination is accepted but is not forwarded in the emitted message - confirm whether it should be.
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    public publishTopic<Payload>(
        topic: string,
        eventName: EventNames,
        options?: {
            payload?: Payload;
            destination?: Origin;
        }
    ): void {
        if (!topic) return;

        const payload = options?.payload;
        const message: WebsocketMessage<Payload> = {
            topic,
            eventName,
            origin: Origin.app,
            payload
        };
        console.log(`SocketIo publishing payload ${JSON.stringify(payload)} to topic ${topic} on event name ${eventName}`);
        // Emit message on event publish
        this.socket.emit(RootEventName.publish, message);
    }

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Create an Observable from socket listener to event
        The returned Observable first emits undefined (initial BehaviorSubject value), then every message received on the event.
        NOTE(review): every call attaches a new socket listener that is never removed - callers should reuse the returned Observable
        instead of calling this repeatedly for the same event.
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    public getMessage = <T>(event: string): Observable<WebsocketMessage<T>> => {
        const message: BehaviorSubject<WebsocketMessage<T>> = new BehaviorSubject(undefined);
        this.socket.on(event, (kafkaMessage: WebsocketMessage<T>) => {
            //console.log(`SocketIo on event ${event} is receiving: ${JSON.stringify(kafkaMessage)}`);
            message.next(kafkaMessage);
        });
        return message.asObservable();
    };

    /*----------------------------------------------------------------------------------------------------------------------------------------------------------
        Return the map of the currently tracked topics (the live internal map, not a copy)
    ----------------------------------------------------------------------------------------------------------------------------------------------------------*/
    public getSubscribedTopics(): Map<string, SubscriptionEntity> {
        return this.topics;
    }
}
