import {asyncScheduler, filter, ReplaySubject} from 'rxjs';
import {Bound} from '../../../../decorators/methods/Bound';
import {sleep} from '../../../misc/sleep';
import {QxQueueTaskOptions} from './QxQueueTaskOptions';
import {QxQueueTask} from './QxQueueTask';
import {QxQueueOptions} from './QxQueueOptions';

/**
 * Simple task queue.
 *
 * Async and regular functions passed to the .next() method are executed
 * in order, one by one; each task starts only after the previous one
 * has been resolved or rejected.
 *
 * This queue should remain SIMPLE.
 *
 * @example
 *
 *      const q = new QxQueue();
 *
 *      // calls will be executed one by one
 *      q.next(() => fetch(...))
 *      q.next(() => fetch(...))
 *      q.next(() => fetch(...))
 *      q.next(() => fetch(...))
 */
export class QxQueue {
    /**
     * Stream of scheduled tasks, replaying up to `bufferSize` of the most
     * recently scheduled ones (default 100) to every new subscriber.
     *
     * NOTE(review): presumably a task that is pushed out of the replay buffer
     * before it has been processed is never executed — confirm that callers
     * never queue more than `bufferSize` pending tasks.
     */
    #tasks$: ReplaySubject<QxQueueTask<any>>;

    /**
     * Results of tasks scheduled with both `taskId` and `ttl`, keyed by `taskId`.
     *
     * Created without a prototype so that a lookup only ever sees entries the
     * queue stored itself (ids such as "constructor" or "toString" must not
     * match members inherited from Object.prototype).
     */
    #cache: Record<string, unknown>;

    /**
     * Creates the queue and starts the first processing cycle.
     *
     * @param options Queue options; `bufferSize` sets the size of the replay buffer (default 100)
     */
    constructor(options: QxQueueOptions = {}) {
        this.#tasks$ = new ReplaySubject(options.bufferSize ?? 100);
        this.#cache = Object.create(null);

        // NOTE(review): `andWeAreDone` is not a standard Promise method; presumably a
        // project-level helper that terminates a fire-and-forget promise chain — confirm.
        this.#processQueue().andWeAreDone();
    }

    /**
     * Schedules next task.
     *
     * Task will be executed when previously scheduled task is resolved or rejected.
     *
     * When `options.taskId` is set and a result is currently cached under that id,
     * the task function is not called and the cached result is returned instead.
     * A result is cached only when both `options.taskId` and `options.ttl` are
     * truthy, and it is evicted once `sleep(ttl)` settles.
     *
     * @param taskFn Task to be executed
     * @param options Task options
     * @returns Promise of value returned from task function (or of the cached value);
     *          it rejects with whatever the task function throws or rejects with
     */
    @Bound()
    async next<T>(taskFn: () => T | Promise<T>, options: QxQueueTaskOptions = {}): Promise<T> {
        return new Promise((resolve, reject) => {
            // The settle callbacks travel with the task so that the processing
            // cycle can complete this promise once the task has been handled.
            const task = {taskFn, options, resolve, reject};
            const infoObject = {taskFn: task.taskFn, options: task.options};

            console.debug(`[QxQ] Adding`, infoObject);
            this.#tasks$.next(task);
        });
    }

    /**
     * Runs one processing cycle: waits for a pending task, settles it (from the
     * cache or by calling its task function) and then schedules the next cycle.
     */
    async #processQueue(): Promise<void> {
        console.debug(`[QxQ] Awaiting`);
        // Tasks that were already handled have had `taskFn` deleted (see `finally`
        // below), so the filter skips them when the buffer is replayed.
        // NOTE(review): awaiting an Observable is not standard RxJS; presumably the
        // project makes Observables thenable and this yields the first matching
        // task — confirm.
        const task = await this.#tasks$.pipe(filter(it => !!it.taskFn));
        const infoObject = {taskFn: task.taskFn, options: task.options};

        try {
            const taskId = task.options?.taskId;
            const ttl = task.options?.ttl;

            // Test for presence of the entry, not truthiness of its value:
            // a cached 0, '', false, null or undefined is still a valid result.
            if (taskId && taskId in this.#cache) {
                console.debug(`[QxQ] Using cache`, infoObject);
                task.resolve?.(this.#cache[taskId]);
            } else {
                console.debug(`[QxQ] Starting`, infoObject);
                const result = await task.taskFn?.();

                if (taskId && ttl) {
                    console.debug(`[QxQ] Caching result`, infoObject);
                    this.#cache[taskId] = result;

                    // Evict the entry once the time-to-live has elapsed.
                    sleep(ttl).then(() => {
                        console.debug(`[QxQ] Clearing cache`, infoObject);
                        delete this.#cache[taskId];
                    });
                }

                task.resolve?.(result);
            }
        } catch (e) {
            console.debug(`[QxQ] Rejecting`, infoObject);
            task.reject?.(e);
        } finally {
            console.debug(`[QxQ] Finishing`, infoObject);

            // Strip the task so that it is recognised as handled (and filtered
            // out) when the buffer is replayed to the next cycle.
            delete task.resolve;
            delete task.reject;
            delete task.taskFn;
            delete task.options;

            // Continue with the next task in a separately scheduled cycle.
            asyncScheduler.schedule(() => this.#processQueue());
        }
    }
}
