import { BlobHTTPHeaders, BlobServiceClient } from '@azure/storage-blob';
import { datadogLogs } from '@datadog/browser-logs';
import dayjs from 'dayjs';

import {
  DataStream,
  EventStream,
  IDataStreamConsumer,
  IEventStreamConsumer,
  IntervalMergeCalculator,
  IntervalPair,
  MIME_TYPES,
  ParallelQueue,
  getFileExtension,
  hasText,
  isPromise,
  lookupMimeType,
} from 'core/utils';

import { CURRENT_THROUGHPUT_DURATION } from '../constants';
import { FileUploadMetadata, UploadCompleteEvent, UploadErrorEvent, UploadProgressEvent, UploadQueueItem, UploadStartEvent } from '../types';

/** Payload stored alongside each time interval recorded in the recent-activity calculator. */
type ThroughputIntervalData = {
  /** Bytes uploaded so far for the file at the time the interval was recorded. */
  progressSize: number;
  /** Total size in bytes of the blob being uploaded. */
  fileSize: number;
};

/**
 * Per-upload bookkeeping, keyed by blob URL, used to attribute observed resource-timing entries to the
 * file they belong to.
 */
type FileUploadContext = {
  /** Identifier of the file, copied from the queued upload job. */
  fileId: string;
  /** Original file name, copied from the queued upload job. */
  fileName: string;
  /** MIME type resolved from the file name, or `null` when no type could be resolved. */
  contentType: string | null;
  /** Content encoding sent with the blob (`'deflate'` for compressed jobs), or `null` when none is set. */
  contentEncoding: string | null;
  /** Total number of bytes for the file **AFTER** compression. */
  blobSize: number;
  /** Number of bytes uploaded in the current chunk. `null` until the first progress callback. */
  lastChunkSize: number | null;
  /** Total number of bytes uploaded so far. */
  progressSize: number;
};

/** Value passed as `maxRunners` to the upload queue created by `AzureBlobUploader`. */
const MAX_UPLOAD_CONCURRENCY = 50; // chrome max total connections

/**
 * Uploads files to Azure Blob Storage through a `ParallelQueue` (configured with `MAX_UPLOAD_CONCURRENCY`
 * runners) and publishes upload lifecycle events on event streams.
 *
 * Typical usage: `initialize(getSasFn)`, then `enqueueFile(...)` / `run()`, and finally `destroy()` to abort
 * in-flight uploads and release the performance observer and the interval timer.
 */
export class AzureBlobUploader {
  // Kept for backward compatibility. A static *method* under this symbol is ignored by
  // `Object.prototype.toString`; the instance getter below is what actually tags instances.
  static [Symbol.toStringTag]() {
    return 'AzureBlobUploader';
  }

  /** String tag reported by `Object.prototype.toString` for instances of this class. */
  public get [Symbol.toStringTag](): string {
    return 'AzureBlobUploader';
  }

  /** The key is the destination container name. */
  private _blobClients = new Map<string, BlobServiceClient>();

  /** In-flight client initializations, keyed by container name, so concurrent jobs share one SAS request. */
  private _clientInitializations = new Map<string, Promise<void>>();

  /** The key is the file id of the upload that owns the controller. */
  private _abortControllers = new Map<string, AbortController>();

  /** The key is the full url for the file upload including the SAS. */
  private _fileUploadContexts = new Map<string, FileUploadContext>();

  /** Recently observed upload intervals; `null` while the uploader is not initialized. */
  private _recentActivity: IntervalMergeCalculator<ThroughputIntervalData> | null = null;

  /** Handle returned by `window.setInterval`; `null` while the timer is not running. */
  private _intervalCalculatorId: number | null = null;

  /** Observer for `resource` performance entries; `null` while not observing. */
  private _perfObserver: PerformanceObserver | null = null;

  private _queue: ParallelQueue<UploadQueueItem>;

  /** Resolves the SAS URL for a container. Set by `initialize` and reset to `null` by `destroy`. */
  public getSasFn: ((containerName: string) => Promise<string>) | null = null;

  private _streams = {
    onStart: new EventStream<UploadStartEvent>(),
    onProgress: new EventStream<UploadProgressEvent>(),
    onComplete: new EventStream<UploadCompleteEvent>(),
    onError: new EventStream<UploadErrorEvent>(),
    throughput: new DataStream<number | null>(null),
  };

  /** Consumer-side view of the uploader's streams. */
  public get streams(): {
    onStart: IEventStreamConsumer<UploadStartEvent>;
    onProgress: IEventStreamConsumer<UploadProgressEvent>;
    onComplete: IEventStreamConsumer<UploadCompleteEvent>;
    onError: IEventStreamConsumer<UploadErrorEvent>;
    throughput: IDataStreamConsumer<number | null>;
  } {
    return this._streams;
  }

  constructor() {
    // Bind every method that may be handed out as a callback so `this` is preserved.
    this.initialize = this.initialize.bind(this);
    this.destroy = this.destroy.bind(this);
    this.initializeClient = this.initializeClient.bind(this);
    this.uploadBlob = this.uploadBlob.bind(this);
    this.abort = this.abort.bind(this);
    this.uploadCount = this.uploadCount.bind(this);
    this.enqueueFile = this.enqueueFile.bind(this);
    this.handlePerformanceObserve = this.handlePerformanceObserve.bind(this);
    this.handleIntervalCalculation = this.handleIntervalCalculation.bind(this);
    this.adjustRecentActivity = this.adjustRecentActivity.bind(this);
    this.run = this.run.bind(this);

    this._queue = new ParallelQueue<UploadQueueItem>({
      key: 'fileId',
      maxRunners: MAX_UPLOAD_CONCURRENCY,
      run: this.uploadBlob,
    });
  }

  /**
   * Prepares the uploader: stores the SAS provider, starts observing `resource` performance entries, starts
   * the one-second interval timer, and creates a fresh recent-activity calculator.
   *
   * @param getSasFn Resolves the SAS URL used to create the blob service client for a container.
   */
  public initialize(getSasFn: (containerName: string) => Promise<string>) {
    this.getSasFn = getSasFn;

    // Hook up the performance observer if it's not already running.
    if (this._perfObserver == null) {
      this._perfObserver = new PerformanceObserver(this.handlePerformanceObserve);
      this._perfObserver.observe({ entryTypes: ['resource'] });
    }

    // Start the interval if it's not already running.
    if (this._intervalCalculatorId == null) {
      this._intervalCalculatorId = window.setInterval(this.handleIntervalCalculation, 1000);
    }

    this._recentActivity = new IntervalMergeCalculator();
  }

  /**
   * Aborts every in-flight upload, clears the queue, the streams' listeners and all cached state, and stops
   * the performance observer and the interval timer.
   */
  public destroy() {
    this.getSasFn = null;
    this._recentActivity = null;
    this._abortControllers.forEach((abortController) => abortController.abort());
    this._abortControllers.clear();
    this._streams.onStart.clear();
    this._streams.onProgress.clear();
    this._streams.onComplete.clear();
    this._streams.onError.clear();
    this._queue.clear();
    this._blobClients.clear();
    this._clientInitializations.clear();
    // Drop per-upload bookkeeping too; otherwise it accumulates across destroy/initialize cycles.
    this._fileUploadContexts.clear();

    if (this._perfObserver != null) {
      this._perfObserver.disconnect();
      this._perfObserver = null;
    }

    if (this._intervalCalculatorId != null) {
      window.clearInterval(this._intervalCalculatorId);
      this._intervalCalculatorId = null;
    }
  }

  /** Returns the upload queue's current `length`. */
  public uploadCount() {
    return this._queue.length;
  }

  /**
   * Adds a file to the upload queue.
   *
   * @param fileId Identifier of the file; also used as the blob name (plus the file extension).
   * @param fileName Original file name; used to derive the extension and the content type.
   * @param compress When `true`, the blob is uploaded with a `deflate` content encoding header.
   * @param buffer The bytes to upload, or a promise resolving to them.
   * @param containerName Destination container name.
   * @param metadata Optional metadata persisted with the blob; nullish and blank values are skipped.
   */
  public enqueueFile(
    fileId: string,
    fileName: string,
    compress: boolean,
    buffer: ArrayBufferLike | Promise<ArrayBufferLike>,
    containerName: string,
    metadata: FileUploadMetadata | null,
  ) {
    this._queue.enqueue({ fileId, fileName, compress, buffer, metadata, containerName });
  }

  /** Start the upload worker queue. */
  public run() {
    this._queue.run();
  }

  /**
   * Performance observer callback. For every resource entry whose name matches a tracked upload URL, logs
   * the chunk to Datadog, records the interval for throughput tracking and emits a progress event.
   *
   * @throws Error when the recent-activity calculator has not been initialized.
   */
  private handlePerformanceObserve(entries: PerformanceObserverEntryList, _observer: PerformanceObserver) {
    if (this._recentActivity == null) throw new Error('Recent activity calculator is not initialized.');

    let wasActivityObserved = false;

    for (const entry of entries.getEntries()) {
      const fileInfo = this._fileUploadContexts.get(entry.name);

      // Don't log performance entries that weren't created by this uploader.
      if (fileInfo == null) continue;

      wasActivityObserved = true;

      // Entry times are relative to the time origin; convert them to absolute milliseconds.
      const chunkStart = performance.timeOrigin + entry.startTime;
      const chunkEnd = chunkStart + entry.duration;

      datadogLogs.logger.info('upload-file-chunk', {
        timestamp: new Date(chunkStart).toISOString(),
        duration: entry.duration * 1000000, // Datadog expects durations to be in nanoseconds.
        fileId: fileInfo.fileId,
        blobSize: fileInfo.blobSize,
        chunkSize: fileInfo.lastChunkSize ?? 0,
        progressSize: fileInfo.progressSize,
        ...(fileInfo.contentType != null ? { contentType: fileInfo.contentType } : {}),
        ...(fileInfo.contentEncoding != null ? { contentEncoding: fileInfo.contentEncoding } : {}),
      });

      this._recentActivity.addInterval(chunkStart, chunkEnd, {
        progressSize: fileInfo.progressSize,
        fileSize: fileInfo.blobSize,
      });

      this._streams.onProgress.emit({
        fileId: fileInfo.fileId,
        loadedBytes: fileInfo.progressSize,
        chunkStart,
        chunkEnd,
        compressed: fileInfo.contentEncoding != null,
      });
    }

    if (wasActivityObserved) {
      this.handleIntervalCalculation();
    }
  }

  /**
   * Prunes stale intervals from the recent-activity calculator and recalculates it. Runs once per second
   * from the interval timer and after every batch of observed upload activity.
   *
   * @throws Error when the recent-activity calculator has not been initialized.
   */
  private handleIntervalCalculation() {
    if (this._recentActivity == null) throw new Error('Recent activity calculator is not initialized.');

    // Intervals are recorded as `performance.timeOrigin + entry.startTime`, so "now" must use the same
    // absolute time base; a bare `performance.now()` would never be newer than any interval end.
    const now = performance.timeOrigin + performance.now();

    this._recentActivity.adjust((interval) => this.adjustRecentActivity(interval, now));

    // TODO: Finish this. Publish the calculated throughput once the result shape is settled, e.g.:
    //this._streams.throughput.emit(results.data?.adjustedSize ?? null);
    // Until then the result is discarded. This runs from timer and observer callbacks, so it must not
    // throw on every invocation.
    this._recentActivity.calculate(now);
  }

  /**
   * Decides whether an interval is kept when the calculator is adjusted.
   *
   * @param interval The interval under consideration.
   * @param now Current time, in the same time base as the interval bounds.
   * @returns `null` to drop an interval that ended before `now - CURRENT_THROUGHPUT_DURATION`; otherwise
   * the interval unchanged.
   * @throws Error when the interval has no start.
   */
  private adjustRecentActivity(interval: IntervalPair<ThroughputIntervalData>, now: number) {
    const threshold = now - CURRENT_THROUGHPUT_DURATION;

    if (interval.start == null) {
      throw new Error('Cannot adjust an interval without a start date.');
    } else if (interval.end != null && interval.end < threshold) {
      // Remove intervals that are older than the threshold.
      return null;
    }

    return interval;
  }

  /**
   * Ensures a blob service client exists for the container. Concurrent callers for the same container share
   * a single in-flight initialization instead of each requesting their own SAS URL.
   *
   * @throws Error when `getSasFn` is not set, or whatever `getSasFn` rejects with.
   */
  private async initializeClient(containerName: string) {
    // If the blob client is already initialized, just return.
    if (this._blobClients.has(containerName)) return;

    let pending = this._clientInitializations.get(containerName);

    if (pending == null) {
      const initialization = this.createClient(containerName).finally(() => {
        // Forget the attempt once it settles so a failed initialization can be retried. Only remove our
        // own entry, in case `destroy` cleared the map and a newer attempt has since been registered.
        if (this._clientInitializations.get(containerName) === initialization) {
          this._clientInitializations.delete(containerName);
        }
      });

      this._clientInitializations.set(containerName, initialization);
      pending = initialization;
    }

    await pending;
  }

  /** Requests the SAS URL for the container and caches a blob service client created from it. */
  private async createClient(containerName: string): Promise<void> {
    if (this.getSasFn == null) {
      throw new Error('AzureBlobUploader.getSasFn cannot be null or undefined.');
    }

    const sasUrl = await this.getSasFn(containerName);

    this._blobClients.set(
      containerName,
      new BlobServiceClient(sasUrl, undefined, {
        retryOptions: {
          maxTries: Number.POSITIVE_INFINITY,
        },
      }),
    );
  }

  /**
   * Queue runner: uploads a single job to its container and emits `onStart`, `onProgress`, and then either
   * `onComplete` or `onError`.
   *
   * @throws Error when no client could be initialized for the container, when the file id is already being
   * uploaded, or when the upload itself fails (the error is rethrown after `onError` is emitted).
   */
  private async uploadBlob(job: UploadQueueItem) {
    await this.initializeClient(job.containerName);

    const containerClient = this._blobClients.get(job.containerName);

    if (containerClient == null) {
      throw new Error(`Unable to initialize blob upload client for container: "${job.containerName}".`);
    }

    const fileExtension = getFileExtension(job.fileName);
    const contentType = lookupMimeType(job.fileName);

    const blobClient = containerClient.getContainerClient('').getBlockBlobClient(`${job.fileId}.${fileExtension}`);

    const blobHeaders: BlobHTTPHeaders = {
      blobContentType: contentType == null ? MIME_TYPES.OCTET_STREAM.contentType : contentType.contentType,
    };

    if (job.compress) {
      blobHeaders.blobContentEncoding = 'deflate';
    }

    if (this._abortControllers.has(job.fileId)) {
      throw new Error(`File is already being uploaded.  Cancel existing upload first.  FileId: ${job.fileId}.`);
    }

    const abortController = new AbortController();
    this._abortControllers.set(job.fileId, abortController);

    try {
      this._streams.onStart.emit({ fileId: job.fileId, uploadStart: dayjs.utc(), url: blobClient.url });

      const persistedMetadata: Record<string, string> = {
        fileId: job.fileId,
      };

      if (job.metadata != null) {
        for (const [metadataKey, metadataValue] of Object.entries(job.metadata)) {
          if (metadataValue == null || (typeof metadataValue === 'string' && !hasText(metadataValue))) {
            continue; // Skip undefined, null, or empty string values.
          } else if (typeof metadataValue === 'string') {
            persistedMetadata[metadataKey] = metadataValue.trim();
          } else {
            persistedMetadata[metadataKey] = metadataValue.toString();
          }
        }
      }

      // NOTE(review): these chunk timestamps are relative to the time origin, while the progress events
      // emitted from the performance observer use absolute times — confirm which one consumers expect.
      let chunkStart = performance.now();
      const materializedFileBuffer = isPromise(job.buffer) ? await job.buffer : job.buffer;

      // NOTE(review): contexts are only removed in `destroy`, since resource-timing entries may be
      // delivered after the upload promise settles — confirm whether per-upload cleanup is wanted.
      this._fileUploadContexts.set(blobClient.url, {
        fileId: job.fileId,
        fileName: job.fileName,
        contentType: contentType?.contentType ?? null,
        contentEncoding: blobHeaders.blobContentEncoding ?? null,
        blobSize: materializedFileBuffer.byteLength,
        lastChunkSize: null,
        progressSize: 0,
      });

      const response = await blobClient.uploadData(materializedFileBuffer, {
        blobHTTPHeaders: blobHeaders,
        abortSignal: abortController.signal,
        onProgress: (event) => {
          const previousContext = this._fileUploadContexts.get(blobClient.url);

          // The context may already be gone (e.g. `destroy` ran); never write back a partial record.
          if (previousContext != null) {
            this._fileUploadContexts.set(blobClient.url, {
              ...previousContext,
              progressSize: event.loadedBytes,
              // Size of the chunk just transferred: the growth since the previous progress report.
              lastChunkSize: event.loadedBytes - previousContext.progressSize,
            });
          }

          const chunkEnd = performance.now();
          this._streams.onProgress.emit({ fileId: job.fileId, ...event, chunkStart, chunkEnd, compressed: job.compress });
          chunkStart = chunkEnd;
        },
        metadata: persistedMetadata,
      });

      this._abortControllers.delete(job.fileId);

      this._streams.onComplete.emit({
        fileId: job.fileId,
        response,
        uploadEnd: dayjs.utc(),
        uploadSize: materializedFileBuffer.byteLength,
        url: blobClient.url,
        compressed: job.compress,
      });
    } catch (error) {
      this._abortControllers.delete(job.fileId);

      this._streams.onError.emit({ fileId: job.fileId, error, uploadEnd: dayjs.utc() });

      throw error;
    }
  }

  /** Cancel the specified upload. Does nothing when no upload is in flight for the file id. */
  public abort(fileId: string) {
    const abortController = this._abortControllers.get(fileId);

    if (abortController == null) return;

    abortController.abort();
    this._abortControllers.delete(fileId);
  }
}
