import { isPromise } from './isPromise';

const DEFAULT_ASYNC_QUEUE_OPTIONS = {
  maxRunners: 5,
  run: () => {
    throw new Error('ParallelQueueOptions.run must be set before calling run.');
  },
} satisfies Omit<ParallelQueueOptions<object>, 'key'>;

/** Queue to execute multiple async tasks up to a configurable number of parallelism. */
export class ParallelQueue<TJob extends object> {
  static [Symbol.toStringTag]() {
    return 'ParallelQueue';
  }

  /** Maximum number of simultaneous runners to execute at a time. */
  private _maxRunners: number;

  /** Caller's queue runner callback. */
  private _runner: (job: TJob) => void | Promise<void>;

  private _onQueueStart: (() => void) | null = null;

  private _onQueueStop: (() => void) | null = null;

  /** Lock to prevent the queue runner from being recursively executed. */
  private _runLock = false;

  /** Counter to help keep track of the number of runners that are executing simultaneously. */
  private _runningCount = 0;

  /** Promise and resolve function to allow for waiting until a slot opens up in the concurrency limiter logic. */
  private _maxConcurrencyResolvers: { promise: Promise<void>; resolve: () => void } | null = null;

  /** Queue for the jobs. */
  private _jobs: TJob[] = [];

  constructor(options: ParallelQueueOptions<TJob>) {
    this.enqueue = this.enqueue.bind(this);
    this.run = this.run.bind(this);
    this.runInternal = this.runInternal.bind(this);
    this.clear = this.clear.bind(this);
    this.destroy = this.destroy.bind(this);

    const normalizedOptions = {
      ...DEFAULT_ASYNC_QUEUE_OPTIONS,
      ...options,
    };

    if (normalizedOptions.run === DEFAULT_ASYNC_QUEUE_OPTIONS.run) {
      throw new Error('AsyncQueueOptions.run must be set.');
    }

    this._maxRunners = normalizedOptions.maxRunners;
    this._runner = normalizedOptions.run;
    this._onQueueStart = normalizedOptions.onQueueStart ?? null;
    this._onQueueStop = normalizedOptions.onQueueStop ?? null;
  }

  public enqueue(job: TJob) {
    this._jobs.push(job);
  }

  public clear() {
    this._jobs = [];
  }

  public destroy() {
    this._jobs = [];
    this._runner = DEFAULT_ASYNC_QUEUE_OPTIONS.run;
  }

  public get length() {
    return this._jobs.length;
  }

  public run() {
    // Hide the fact that the queue loop is asynchronous from the caller.  It's likely better for the caller to utilize the onQueueStart and onQueueStop
    // callbacks if they need to perform any initialization or cleanup.
    this.runInternal();
  }

  private async runInternal() {
    if (this._runLock) return;

    try {
      this._runLock = true;
      this._onQueueStart?.();

      while (this._jobs.length > 0) {
        const queueItem = this._jobs.shift();

        if (queueItem == null) {
          throw new Error('Queue item is null.');
        }

        // We are at the max concurrency limit, wait for a slot to open up.
        if (this._maxConcurrencyResolvers) {
          await this._maxConcurrencyResolvers.promise;
        }

        try {
          // Increment the running number of jobs and setup a promise to resolve when a slot has opened up if necessary.
          this._runningCount++;

          if (this._runningCount >= this._maxRunners) {
            let newResolve: () => void;

            const newPromise = new Promise<void>((resolve) => {
              newResolve = resolve;
            });

            this._maxConcurrencyResolvers = { promise: newPromise, resolve: newResolve! }; // Non-null assertion because we just initialized them.  TS just doesn't infer that correctly yet (TS 5.3.3).
          }

          // Execute the job.
          const jobResult = this._runner(queueItem);

          if (isPromise(jobResult)) {
            await jobResult;
          }
        } finally {
          this._runningCount--;

          if (this._maxConcurrencyResolvers) {
            const tempResolver = this._maxConcurrencyResolvers.resolve;
            this._maxConcurrencyResolvers = null;
            tempResolver();
          }
        }
      }
    } finally {
      try {
        this._onQueueStop?.();
      } finally {
        this._runLock = false;
      }
    }
  }
}

export type ParallelQueueOptions<TJob extends object> = {
  /** Maximum number of simultaneous runners to execute at a time.  If the `run` callback is *not* `async` then this won't have any effect.  Default set to `5`. */
  maxRunners?: number;
  /** Name of the property that contains a uniquely identifying key. */
  key: keyof TJob;
  /** Primary callback to execute for each job in the queue. */
  run: (job: TJob) => void | Promise<void>;
  /** Event callback that is executed right before the queue loop begins execution.  This is **NOT** executed for each item in the queue, but instead is emitted only when the queue transitions from idle to running. */
  onQueueStart?: () => void;
  /** Event callback that is executed right after the queue loop ends execution.  This is **NOT** executed for each item in the queue, but instead is emitted only when the queue transitions from running to idle. */
  onQueueStop?: () => void;
};
