import { BlobHTTPHeaders, BlobServiceClient, ContainerClient } from '@azure/storage-blob';
import { datadogLogs } from '@datadog/browser-logs';
import dayjs from 'dayjs';

import {
  DataStream,
  EventStream,
  IDataStreamConsumer,
  IEventStreamConsumer,
  MIME_TYPES,
  ParallelQueue,
  getFileExtension,
  hasText,
  isPromise,
  lookupMimeType,
} from 'core/utils';

import { UploadCompleteEvent, UploadErrorEvent, UploadFileJob, UploadProgressEvent, UploadStartEvent } from '../types';

/** Book-keeping record for a single file upload, stored per `fileId` while the uploader processes the job. */
type FileUploadContext = {
  // ---- Identity ----

  /** Identifier of the upload job; also the key the record is stored under. */
  fileId: string;
  /** Name of the file as provided by the upload job. */
  fileName: string;

  // ---- Content description ----

  /** MIME type resolved from the file name, or `null` when no MIME type could be resolved. */
  contentType: string | null;
  /** Content encoding sent with the blob, or `null` when no encoding header is set. */
  contentEncoding: string | null;
  /** Whether the upload job asked for the file to be treated as compressed. */
  compressed: boolean;

  // ---- Sizes (bytes) ----

  /** Total number of bytes for the file **BEFORE** compression. */
  fileSize: number;
  /** Total number of bytes for the file **AFTER** compression.  `null` indicates the file was not compressed. */
  compressedSize: number | null;
  /** Total number of bytes uploaded so far. */
  progressSize: number;

  // ---- Status ----

  /** Starts out `false` and becomes `true` once the upload has finished successfully. */
  isComplete: boolean;
};

/**
 * Maximum number of concurrent file uploads.  This is the `maxRunners` value handed to the upload queue.
 *
 * Some points that were considered when deciding a suitable value:
 *    1) As of September 2024, Azure Blob Storage only supports HTTP/1.1.
 *    2) As of September 2024, Chrome has a limit of 6 concurrent connections to the same domain for HTTP/1.1.
 *    3) Additional connections will automatically be queued by the browser and started as soon as a connection slot opens up.
 *
 * With these points in mind, a value of 50 was chosen in order to minimize the amount of downtime between file uploads:
 * the value is deliberately well above the browser's connection limit so that a request is always waiting for the next
 * free connection slot.
 */
const MAX_UPLOAD_CONCURRENCY = 50;

/**
 * Uploads files to Azure Blob Storage through a bounded parallel queue and reports the lifecycle of every upload
 * (start, progress, completion, error) through event streams.
 *
 * Typical usage: call `initialize` with a function that resolves the SAS URL for a container name, subscribe to
 * `streams`, then hand jobs to `enqueueFiles`.  Call `destroy` to abort in-flight uploads and release all state.
 */
export class AzureBlobUploader {
  /**
   * Kept for backward compatibility.  `Object.prototype.toString` only honors a string-valued `Symbol.toStringTag`,
   * so this static *method* never affected how the class or its instances are labelled; the instance getter below
   * is what provides the label.
   */
  static [Symbol.toStringTag]() {
    return 'AzureBlobUploader';
  }

  /** Makes `Object.prototype.toString.call(uploader)` report `[object AzureBlobUploader]`. */
  public get [Symbol.toStringTag](): string {
    return 'AzureBlobUploader';
  }

  /** Abort controllers of the uploads that are currently in flight.  The key is the fileId. */
  private _abortControllers = new Map<string, AbortController>();

  /** The key is the fileId. */
  private _fileUploadContexts = new Map<string, FileUploadContext>();

  /** Queue that runs `uploadBlob` for every enqueued job, keyed by `fileId`. */
  private _queue: ParallelQueue<UploadFileJob>;

  /**
   * The key is the destination container name.  The promise (rather than the client) is cached so that concurrent
   * callers share a single SAS request.
   */
  private _containerClients = new Map<string, Promise<ContainerClient>>();

  /** Resolves the SAS URL for a container name.  `null` until `initialize` is called and again after `destroy`. */
  private _getSasFn: ((containerName: string) => Promise<string>) | null = null;

  private _streams = {
    onStart: new EventStream<UploadStartEvent>(),
    onProgress: new EventStream<UploadProgressEvent>(),
    onComplete: new EventStream<UploadCompleteEvent>(),
    onError: new EventStream<UploadErrorEvent>(),
  };

  /** Consumer-side view of the upload lifecycle event streams. */
  public get streams(): {
    onStart: IEventStreamConsumer<UploadStartEvent>;
    onProgress: IEventStreamConsumer<UploadProgressEvent>;
    onComplete: IEventStreamConsumer<UploadCompleteEvent>;
    onError: IEventStreamConsumer<UploadErrorEvent>;
  } {
    return this._streams;
  }

  constructor() {
    // Bind every method so that it can be passed around as a plain callback (e.g. as the queue's `run` function).
    this.initialize = this.initialize.bind(this);
    this.destroy = this.destroy.bind(this);
    this.uploadBlob = this.uploadBlob.bind(this);
    this.abort = this.abort.bind(this);
    this.uploadCount = this.uploadCount.bind(this);
    this.enqueueFiles = this.enqueueFiles.bind(this);
    this.getContainerClient = this.getContainerClient.bind(this);

    this._queue = new ParallelQueue<UploadFileJob>({
      debugName: 'AzureBlobUploader._queue',
      key: 'fileId',
      maxRunners: MAX_UPLOAD_CONCURRENCY,
      run: this.uploadBlob,
    });
  }

  /**
   * Provides the function used to obtain SAS URLs.  Must be called before any upload can run.
   *
   * @param getSasFn Resolves the SAS URL for the given container name.
   */
  public initialize(getSasFn: (containerName: string) => Promise<string>) {
    this._getSasFn = getSasFn;
  }

  /** Aborts every in-flight upload and releases all state held by the uploader (listeners, queue, caches). */
  public destroy() {
    this._getSasFn = null;
    this._abortControllers.forEach((abortController) => abortController.abort());
    this._abortControllers.clear();
    this._streams.onStart.clear();
    this._streams.onProgress.clear();
    this._streams.onComplete.clear();
    this._streams.onError.clear();
    this._queue.destroy();
    this._containerClients.clear();
    // Upload contexts are only meaningful while the uploader is alive; without this they would never be released.
    this._fileUploadContexts.clear();
  }

  /** @returns The queue's current `length`. */
  public uploadCount() {
    return this._queue.length;
  }

  /**
   * Adds one or more jobs to the upload queue and starts processing it.
   *
   * @param files A single upload job or a list of upload jobs.
   */
  public enqueueFiles(files: UploadFileJob | UploadFileJob[]) {
    this._queue.enqueue(files);
    this._queue.run();
  }

  /**
   * Returns the container client for the given container, requesting an SAS URL the first time a container is seen.
   *
   * @param containerName Name of the destination container.
   * @returns The (possibly cached) container client.
   * @throws If `initialize` has not been called, or if the SAS URL cannot be acquired.  A failed acquisition is not
   *         cached, so a later call starts a fresh SAS request.
   */
  private async getContainerClient(containerName: string): Promise<ContainerClient> {
    const getSasFn = this._getSasFn;
    if (getSasFn == null) {
      throw new Error('Cannot acquire a container client without an SAS URL function.');
    }

    const cachedClientPromise = this._containerClients.get(containerName);
    if (cachedClientPromise != null) {
      return await cachedClientPromise;
    }

    const containerClientPromise = (async (): Promise<ContainerClient> => {
      const sas = await getSasFn(containerName);
      const blobServiceClient = new BlobServiceClient(sas, undefined, {
        retryOptions: {
          maxTries: Number.POSITIVE_INFINITY,
        },
      });

      return blobServiceClient.getContainerClient(''); // The container name is already included in the SAS URL.
    })();

    this._containerClients.set(containerName, containerClientPromise);

    // Evict a failed acquisition so that it does not poison the cache.  The identity check keeps this from removing
    // a newer entry (for example one created after `destroy` + `initialize`).
    void containerClientPromise.catch(() => {
      if (this._containerClients.get(containerName) === containerClientPromise) {
        this._containerClients.delete(containerName);
      }
    });

    return await containerClientPromise;
  }

  /** Forgets the abort controller of a finished upload, unless a different upload now owns the fileId. */
  private releaseAbortController(fileId: string, abortController: AbortController) {
    if (this._abortControllers.get(fileId) === abortController) {
      this._abortControllers.delete(fileId);
    }
  }

  /**
   * Uploads a single file and reports its lifecycle through the event streams.  This is the queue's `run` function.
   *
   * Emits `onStart`, then zero or more `onProgress` events, then either `onComplete` or `onError`.  Once the transfer
   * has started, the duration is recorded as a performance measure and an `upload-file` log entry is written,
   * regardless of the outcome.
   *
   * @param job The upload job to process.
   * @throws If the container client cannot be acquired or the file is already being uploaded (no events are emitted
   *         in these two cases), or if preparing or transferring the file fails (after `onError` has been emitted).
   */
  private async uploadBlob(job: UploadFileJob): Promise<void> {
    const fileExtension = getFileExtension(job.fileName);
    const contentType = lookupMimeType(job.fileName);

    const blobClient = (await this.getContainerClient(job.containerName)).getBlockBlobClient(`${job.fileId}.${fileExtension}`);

    const blobHeaders: BlobHTTPHeaders = {
      blobContentType: contentType == null ? MIME_TYPES.OCTET_STREAM.contentType : contentType.contentType,
    };

    if (job.compress) {
      blobHeaders.blobContentEncoding = 'deflate';
    }

    if (this._abortControllers.has(job.fileId)) {
      throw new Error(`File is already being uploaded.  Cancel existing upload first.  FileId: ${job.fileId}.`);
    }

    const abortController = new AbortController();
    this._abortControllers.set(job.fileId, abortController);

    let startMark: PerformanceMark | null = null;
    let endMark: PerformanceMark | null = null;
    let uploadSuccess = false;
    // Tracked locally so that progress deltas do not depend on the context map still holding this file.
    let uploadedBytes = 0;

    try {
      this._streams.onStart.emit({
        fileId: job.fileId,
        uploadStart: dayjs.utc(),
        url: blobClient.url,
        fileSize: job.fileSize,
        compressedSize: job.compressedSize,
      });

      const persistedMetadata: Record<string, string> = {
        fileId: job.fileId,
      };

      if (job.metadata != null) {
        for (const [metadataKey, metadataValue] of Object.entries(job.metadata)) {
          if (metadataValue == null || (typeof metadataValue === 'string' && !hasText(metadataValue))) {
            continue; // Skip undefined, null, or empty string values.
          } else if (typeof metadataValue === 'string') {
            persistedMetadata[metadataKey] = metadataValue.trim();
          } else {
            persistedMetadata[metadataKey] = metadataValue.toString();
          }
        }
      }

      const materializedFileBuffer = isPromise(job.buffer) ? await job.buffer : job.buffer;

      this._fileUploadContexts.set(job.fileId, {
        fileId: job.fileId,
        fileName: job.fileName,
        compressed: job.compress,
        contentType: contentType?.contentType ?? null,
        contentEncoding: blobHeaders.blobContentEncoding ?? null,
        fileSize: job.fileSize,
        compressedSize: job.compress ? materializedFileBuffer.byteLength : null,
        progressSize: 0,
        isComplete: false,
      });

      startMark = performance.mark(`upload-file:start:${job.fileId}`, {
        detail: { fileId: job.fileId, fileName: job.fileName, fileSize: job.fileSize, uploadSize: job.compress ? job.compressedSize : job.fileSize },
      });

      // `maxSingleShotSize` is intentionally left at the SDK default.  A 64-byte override that was only meant for
      // local testing used to be set here.
      const response = await blobClient.uploadData(materializedFileBuffer, {
        blobHTTPHeaders: blobHeaders,
        abortSignal: abortController.signal,
        onProgress: (event) => {
          const chunkSize = event.loadedBytes - uploadedBytes;
          uploadedBytes = event.loadedBytes;

          // The context may already be gone if the uploader was destroyed while the transfer was winding down.
          const fileContext = this._fileUploadContexts.get(job.fileId);
          if (fileContext != null) {
            this._fileUploadContexts.set(job.fileId, {
              ...fileContext,
              progressSize: event.loadedBytes,
            });
          }

          this._streams.onProgress.emit({
            fileId: job.fileId,
            fileSize: job.fileSize,
            compressedSize: job.compressedSize,
            progressSize: event.loadedBytes,
            chunkSize,
          });
        },
        metadata: persistedMetadata,
      });

      endMark = performance.mark(`upload-file:end:${job.fileId}`, { detail: { fileId: job.fileId } });
      uploadSuccess = true;

      const completedContext = this._fileUploadContexts.get(job.fileId);
      if (completedContext != null) {
        this._fileUploadContexts.set(job.fileId, {
          ...completedContext,
          isComplete: true,
        });
      }

      this.releaseAbortController(job.fileId, abortController);

      this._streams.onComplete.emit({
        fileId: job.fileId,
        response,
        uploadEnd: dayjs.utc(),
        uploadSize: materializedFileBuffer.byteLength,
        url: blobClient.url,
        compressed: job.compress,
      });
    } catch (error) {
      // Need to note the time of the error in order to calculate the duration.  But we only do this if the upload has been started.
      if (endMark == null && startMark != null) {
        endMark = performance.mark(`upload-file:end:${job.fileId}`, { detail: { fileId: job.fileId } });
      }

      this.releaseAbortController(job.fileId, abortController);

      this._streams.onError.emit({ fileId: job.fileId, error, uploadEnd: dayjs.utc() });

      throw error;
    } finally {
      // Don't do performance measurements if the upload was aborted or failed before it started.
      // NOTE: this must stay a conditional block.  A `return` inside `finally` would discard the error rethrown by
      // the `catch` block and make a failed upload look successful to the caller.
      if (startMark != null) {
        if (endMark == null) {
          endMark = performance.mark(`upload-file:end:${job.fileId}`, { detail: { fileId: job.fileId } });
        }

        performance.measure(`upload-file:${job.fileId}`, {
          detail: { fileId: job.fileId, fileName: job.fileName, fileSize: job.fileSize, uploadSize: job.compress ? job.compressedSize : job.fileSize },
          start: startMark.name,
          end: endMark.name,
        });

        datadogLogs.logger.info('upload-file', {
          fileId: job.fileId,
          fileSize: job.fileSize,
          uploadSize: job.compress ? job.compressedSize : job.fileSize,
          success: uploadSuccess,
        });
      }
    }
  }

  /**
   * Cancel the specified upload.  Does nothing when no in-flight upload is registered for the file.
   *
   * @param fileId Identifier of the upload to cancel.
   */
  public abort(fileId: string) {
    const abortController = this._abortControllers.get(fileId);

    if (abortController == null) return;

    abortController.abort();
    this._abortControllers.delete(fileId);
  }
}
