import { reconcileEntryInto } from "big-m"
import { defined, Possible } from "big-m/dist/types/utils"
import { isNone, isSome, map, none, Option, some } from "fp-ts/lib/Option"
import { pipe } from "fp-ts/lib/pipeable"
import {
  Observable,
  Subscriber
} from "rxjs"
import {
  Subscription
} from "rxjs/internal/Subscription"
import { ref, Ref } from "vue"
import { spliceOutItem } from "./arrays"
import { defer, ms } from "./async"
import { buildError, ErrorBuilder } from "./errors"
import { methodOf, noop } from "./functions"
import { forEach, maxByNonScalar } from "./iterables"
import { lazy } from "./lazy"
import { consume, reconcileAddToSet } from "./map"

/**
 * Subscribe to an Observable and react to its first notification only.
 *
 * The first value is handed to `onValue`, or the first error to `onError`.
 * Any further notification is ignored, and the subscription is released once
 * `ms(0)` has resolved.
 *
 * @param observable The Observable to listen to.
 * @param onValue Called at most once, with the first emitted value.
 * @param onError Called at most once, and only when the Observable errors
 * before emitting a value. Defaults to `noop`.
 */
export function listenToObservableOnce<T>(observable: Observable<T>, onValue: (t: T) => void, onError = (noop as any as (e: Error) => void)) {
  // Unsubscribing is deferred (see below), so a source that delivers several
  // notifications in one tick would otherwise reach the callbacks repeatedly.
  let handled = false

  const handleOnce = async (notify: () => void) => {
    if (handled) {
      return
    }
    handled = true

    try {
      notify()
    } finally {
      // `subscription` is still unassigned while subscribe() is running, which
      // is exactly when a synchronous source emits; wait before touching it.
      // The `finally` releases the subscription even if the callback throws.
      await ms(0)
      subscription.unsubscribe()
    }
  }

  const subscription: Subscription = observable.subscribe(
    t => handleOnce(() => onValue(t)),
    e => handleOnce(() => onError(e))
  )
}

/**
 * Turn the first notification of an Observable into a Promise.
 *
 * Delegates to listenToObservableOnce: its value callback resolves the
 * Promise and its error callback rejects it.
 *
 * @param observable The Observable to wait on.
 * @returns A Promise for the value passed to the value callback.
 */
export function waitForObservable<T>(observable: Observable<T>) {
  return new Promise<T>((fulfil, fail) => {
    const onValue = (value: T) => fulfil(value)
    const onFailure = (reason: Error) => fail(reason)

    listenToObservableOnce(observable, onValue, onFailure)
  })
}

/**
 * Wait until an Observable emits a value that satisfies a predicate.
 *
 * @param observable The Observable to watch.
 * @param condition Predicate evaluated against each emitted value until one
 * passes.
 * @returns A Promise that resolves with the first value for which `condition`
 * returns true, or rejects with the Observable's error. No completion handler
 * is registered, so the Promise stays pending if the Observable completes
 * without a matching value.
 */
export function waitForObservableCondition<T>(observable: Observable<T>, condition: (t: T) => boolean) {
  return new Promise<T>((resolve, reject) => {
    // Unsubscribing is deferred, so values may still arrive after a match;
    // this flag keeps them from being evaluated again.
    let settled = false

    const subscription: Subscription = observable.subscribe(
      async t => {
        if (settled || !condition(t)) {
          return
        }
        settled = true
        resolve(t)
        // `subscription` is not assigned yet when the source emits
        // synchronously from inside subscribe(), hence the wait.
        await ms(0)
        subscription.unsubscribe()
      },
      async e => {
        settled = true
        // Reject before touching `subscription`: a source that errors
        // synchronously inside subscribe() reaches this handler while
        // `subscription` is still uninitialised, and accessing it first would
        // throw and leave the Promise pending forever.
        reject(e)
        await ms(0)
        subscription.unsubscribe()
      }
    )
  })
}

/**
 * Wait for an Observable to complete and yield the last value it emitted.
 *
 * @param observable The Observable to drain.
 * @returns A Promise that resolves with the most recently emitted value once
 * the Observable completes, and rejects if the Observable errors or if
 * `defined` refuses the stored value because nothing was emitted.
 */
export function waitForObservableEnd<T>(observable: Observable<T>) {
  let item: Possible<T> = undefined
  return new Promise<T>((resolve, reject) => {
    observable.subscribe(
      _item => {
        item = _item
      },
      reject,
      () => {
        // `defined` is given an error message, so it presumably throws when
        // `item` is still undefined. An exception escaping this callback would
        // not reject the Promise, so convert it into a rejection explicitly.
        try {
          resolve(defined(item, "Observable ended without yielding a value"))
        } catch (e) {
          reject(e)
        }
      }
    )
  })
}

/**
 * Pairs a Vue ref with an Observable of the same value type.
 *
 * NOTE(review): presumably describes the result of syncObservable, although
 * that function also returns a `sample` member and carries no explicit return
 * annotation — confirm the intended relationship.
 */
export type SyncedObservable<T> = {
  // Vue ref typed as `Possible<T>`.
  ref: Ref<Possible<T>>,
  // Observable carrying values of type `T`.
  syncedObservable: Observable<T>,
}

/**
 * Mirror the latest value of an Observable into a Vue ref.
 *
 * One subscription to the returned `syncedObservable` is opened immediately,
 * so the ref starts tracking without any external subscriber. Every
 * subscription to `syncedObservable` opens its own subscription to `o`.
 *
 * @param o The source Observable.
 * @returns An object with:
 * - `ref`: holds the most recent value, `undefined` until the first arrives;
 * - `sample`: resolves with the ref's current value once a first value has
 *   been seen;
 * - `syncedObservable`: relays the source's values, errors and completion
 *   while keeping `ref` up to date.
 */
export function syncObservable<T>(o: Observable<T>) {
  const retRef: Ref<Possible<T>> = ref(undefined)
  const initialValue = defer<T>()

  const syncedObservable = new Observable<T>(
    subscriber => {
      const subscription = o.subscribe(
        value => {
          retRef.value = value
          initialValue.resolve(value)
          subscriber.next(value)
        },
        // Relay termination too. Without these handlers a source error never
        // reaches the subscriber's own error handler, and the subscriber is
        // never told that the source has finished.
        error => subscriber.error(error),
        () => subscriber.complete()
      )

      return methodOf(subscription, 'unsubscribe')
    }
  )

  // Resolves after the first value has been observed, then reads the ref so
  // the caller gets whatever is current at that moment.
  async function sample() {
    await initialValue.promise
    return retRef.value as T
  }

  // Keeps `retRef` in sync even when nobody else subscribes.
  syncedObservable.subscribe()

  return {
    ref: retRef,
    sample,
    syncedObservable
  }
}

/**
 * Collect every value emitted by an Observable into a Set.
 *
 * The subscription is opened immediately and is not returned, so the Set keeps
 * receiving values for as long as the Observable emits.
 *
 * @param observable The source of values.
 * @param set The Set to fill; a new one is created when omitted.
 * @returns The Set that is being filled.
 */
export function populateSet<T>(observable: Observable<T>, set: Set<T> = new Set()) {
  const collect = (value: T) => {
    set.add(value)
  }

  observable.subscribe(collect)

  return set
}

/**
 * Relay the values and errors of an Observable to a Subscriber.
 *
 * No completion handler is registered, so the Subscriber is not completed
 * when the Observable completes.
 *
 * @returns The Subscription created on `observable`.
 */
export function pipeObservableToSubscriber<T>(observable: Observable<T>, subscriber: Subscriber<T>) {
  const relayValue = (value: T) => subscriber.next(value)
  const relayError = (reason: any) => subscriber.error(reason)

  return observable.subscribe(relayValue, relayError)
}

/**
 * Create an Observable whose set of sources can keep growing after creation.
 *
 * Each subscriber of the returned `observable` is connected to every source
 * registered through `extend`, whichever of the two arrives first.
 *
 * An error from any connected source is passed on to the subscriber, and so
 * terminates this Observable for that subscriber as well.
 *
 * @returns `observable`, the combined stream, and `extend`, which registers a
 * further source and hands back a function that disconnects that source.
 */
export function extensibleObservable<T>() {
  const sources = new Set<Observable<T>>()
  const listeners = new Set<Subscriber<T>>()
  // The same Subscription objects are indexed twice, once per endpoint, so
  // that either endpoint can be detached on its own.
  const linksByListener = new Map<Subscriber<T>, Set<Subscription>>()
  const linksBySource = new Map<Observable<T>, Set<Subscription>>()

  // Connect one source to one listener and record the link in both indexes.
  function connect(source: Observable<T>, listener: Subscriber<T>) {
    const link = pipeObservableToSubscriber(source, listener)

    sources.add(source)
    listeners.add(listener)
    reconcileEntryInto(linksByListener, listener, link, reconcileAddToSet())
    reconcileEntryInto(linksBySource, source, link, reconcileAddToSet())
  }

  // Unsubscribe every link in `links`, if present, and drop each one from the
  // index kept for the opposite endpoint.
  function severAll<K>(links: Option<Set<Subscription>>, oppositeIndex: Map<K, Set<Subscription>>) {
    pipe(
      links,
      map(
        found => found.forEach(
          link => {
            link.unsubscribe()
            oppositeIndex.forEach(bucket => bucket.delete(link))
          }
        )
      )
    )
  }

  function detachSource(source: Observable<T>) {
    sources.delete(source)
    severAll(consume(linksBySource, source), linksByListener)
  }

  function detachListener(listener: Subscriber<T>) {
    listeners.delete(listener)
    severAll(consume(linksByListener, listener), linksBySource)
  }

  const observable = new Observable<T>(
    listener => {
      listeners.add(listener)
      sources.forEach(source => connect(source, listener))

      return () => detachListener(listener)
    }
  )

  const extend = (source: Observable<T>) => {
    sources.add(source)
    listeners.forEach(listener => connect(source, listener))

    return () => detachSource(source)
  }

  return { extend, observable }
}

/**
 * Reduce the accepted forms of an event queue to a plain Iterable.
 *
 * A function is called and its result returned, `undefined` becomes an empty
 * array, and anything else is returned unchanged.
 */
function normalizeEventQueue<T>(arg: Possible<T[] | (() => Iterable<T>)>): Iterable<T> {
  if (typeof arg === "function") {
    return arg()
  }

  if (typeof arg === "undefined") {
    return []
  }

  return arg
}

/**
 * Create an Observable whose setup work is shared by all of its subscribers.
 *
 * @param initializeSharedState Code that builds the shared state object. It is
 * wrapped in `lazy` and invoked from the subscribe function, so it is
 * presumably run only once, on first subscription. It receives 'next',
 * 'complete', and 'error' callbacks, which may be used in exactly the same way
 * as in the body of a subscribe() function: *all* current subscribers of the
 * produced Observable are notified of each such event. The 'eventQueue' member
 * of the returned object has special meaning: its values (or the values
 * produced by calling it, when it is a function) are fed to each new
 * subscriber.
 * @returns An Observable that can be subscribed to any number of times without
 * duplicating the work encapsulated in initializeSharedState. A subscriber
 * arriving after an error receives that error at once, without the event
 * queue; one arriving after completion receives the event queue and then
 * completes.
 */
export function statefulObservable<T>(
  initializeSharedState: (subscriberMethods: {
    next: (t: T) => void,
    error: (e: Error) => void,
    complete: () => void,
  }) => {
    eventQueue?: T[] | (() => Iterable<T>),
  }
): Observable<T> {
  const subscribers: Subscriber<T>[] = []
  let errorState: Option<Error> = none
  let endedState = false

  // Notifying a subscriber can synchronously trigger its teardown (returned
  // from the subscribe function below), which removes it from `subscribers`.
  // Each broadcast therefore walks a snapshot: iterating the live array would
  // skip the element that slides into the slot just vacated.
  const next = (t: T) => subscribers.slice().forEach(subscriber => subscriber.next(t))
  const error = (e: Error) => {
    errorState = some(e)
    subscribers.slice().forEach(s => s.error(e))
    subscribers.splice(0, subscribers.length)
  }
  const complete = () => {
    endedState = true
    subscribers.slice().forEach(s => s.complete())
    subscribers.splice(0, subscribers.length)
  }

  const sharedState = lazy(() => initializeSharedState({ next, error, complete }))

  return new Observable<T>(
    subscriber => {
      const shared = sharedState()
      const normalizedEventQueue = normalizeEventQueue(shared.eventQueue)

      if (isSome(errorState)) {
        subscriber.error(errorState.value)
      } else {
        // Replay the queued events before joining the live broadcast.
        forEach(
          normalizedEventQueue,
          methodOf(subscriber, 'next')
        )

        if (endedState) {
          subscriber.complete()
        } else {
          subscribers.push(subscriber)
        }
      }

      return () => spliceOutItem(subscribers, subscriber)
    }
  )
}

/**
 * Wrap an Observable so that it errors if its first value takes too long.
 *
 * Only the first value is subject to the deadline; once a value has been
 * relayed the timeout no longer applies.
 *
 * @param inObservable The Observable to relay.
 * @param timeoutMs How long to wait for the first value, in milliseconds.
 * @param errorBuilder Passed to `buildError` together with `timeoutMs` and
 * `inObservable` to produce the timeout error. The default yields a message
 * naming the timeout.
 * @returns An Observable relaying the values, error and completion of
 * `inObservable`, or erroring when the deadline passes first.
 */
export function timeoutObservable<T>(inObservable: Observable<T>, timeoutMs: number, errorBuilder: ErrorBuilder<[number, Observable<T>]> = (ms: number) => `Expected observable to yield a value within ${ms}ms`) {
  return new Observable<T>(
    subscriber => {
      let hasYielded = false

      // A plain timer, so it can be cancelled as soon as it is not needed.
      const timer = setTimeout(
        () => {
          if (!hasYielded) {
            subscriber.error(
              buildError(
                errorBuilder,
                timeoutMs,
                inObservable
              )
            )
          }
        },
        timeoutMs
      )

      const subscription = inObservable.subscribe(
        input => {
          subscriber.next(input)
          if (!hasYielded) {
            hasYielded = true
            clearTimeout(timer)
          }
        },
        error => subscriber.error(error),
        () => subscriber.complete()
      )

      // Runs on unsubscribe, error and completion alike, so the timer never
      // outlives the subscription.
      return () => {
        clearTimeout(timer)
        subscription.unsubscribe()
      }
    }
  )
}

/**
 * A Promise tagged with an optional priority, as accepted by
 * promiseRaceToObservable.
 */
type PromiseWithPriority<T> = {
  // Numeric rank or a named extreme; toNumericalPriority maps "TOP" to
  // Infinity, "LOWEST" to -Infinity and a missing value to 0.
  priority?: number | "TOP" | "LOWEST",
  // The Promise the priority applies to.
  promise: Promise<T>,
}

/**
 * Convert a priority setting into a number that can be compared.
 *
 * "TOP" becomes Infinity, "LOWEST" becomes -Infinity, a missing priority
 * becomes 0, and a number is returned unchanged.
 */
function toNumericalPriority(
  priority: number | "TOP" | "LOWEST" | undefined
) {
  switch (priority) {
    case "TOP":
      return Infinity
    case "LOWEST":
      return -Infinity
    case undefined:
      return 0
    default:
      return priority
  }
}

/**
 * Decide whether a contender displaces the current champion.
 *
 * A strictly higher priority wins. On equal priority the higher index wins.
 */
function winsPriority(contender: { priority: number, index: number }, champion: { winningPriority: number, winningIndex: number }) {
  const { priority, index } = contender
  const { winningPriority, winningIndex } = champion

  if (priority !== winningPriority) {
    return priority > winningPriority
  }

  // Equal priorities: the index breaks the tie.
  return index > winningIndex
}

/**
 * Bring a bare Promise or a PromiseWithPriority into one common shape.
 *
 * @returns The underlying Promise together with its priority as a number; a
 * bare Promise gets the priority that toNumericalPriority assigns to
 * `undefined`.
 */
function normalize<T>(promiseObj: Promise<T> | PromiseWithPriority<T>) {
  if ("promise" in promiseObj) {
    return {
      promise: promiseObj.promise,
      priority: toNumericalPriority(promiseObj.priority)
    }
  }

  return {
    promise: promiseObj,
    priority: toNumericalPriority(undefined)
  }
}

/**
 * Race a list of promises and expose the evolving winner as an Observable.
 *
 * A resolution is emitted when it is the first to arrive or when it beats the
 * current winner according to winsPriority. The Observable completes once
 * every promise has settled, unless it has errored first.
 *
 * @param promises The promises to race, each optionally tagged with a
 * priority. An empty list produces an Observable that completes immediately.
 * @param errorPolicy How rejections are treated:
 * - "throwAll": any rejection errors the Observable;
 * - "recoverOnAnyValue": rejections are ignored unless every promise has
 *   settled and none produced a value, in which case the last rejection is
 *   raised;
 * - "recoverOnTopValue": only a rejection of the highest-priority promise
 *   errors the Observable.
 * @returns An Observable of the successive winning resolutions.
 */
export function promiseRaceToObservable<T>(promises: (Promise<T> | PromiseWithPriority<T>)[], errorPolicy = "throwAll" as "throwAll" | "recoverOnAnyValue" | "recoverOnTopValue") {
  return new Observable<T>(
    subscriber => {
      const promiseCount = promises.length

      // With nothing to wait for, no settlement handler would ever run and the
      // Observable would stay open forever.
      if (promiseCount === 0) {
        subscriber.complete()
        return
      }

      let endedCount = 0
      let state: Option<{
        value: T,
        winningPriority: number,
        winningIndex: number,
      }> = none

      const normalized = promises.map(normalize)

      const maxPriorityPromiseObj = lazy(
        () => maxByNonScalar(
          normalized,
          ({ priority }, { priority: winningPriority }, index, winningIndex) => winsPriority({ priority, index }, { winningPriority, winningIndex }) ? 1 : -1,
        )
      )

      // Count one more settled promise; true when it was the last one.
      const settleOne = () => {
        endedCount++
        return endedCount === promiseCount
      }

      normalized.forEach(
        (promiseObj, index) => {
          const { promise, priority } = promiseObj

          promise.then(
            resolution => {
              const wasLast = settleOne()

              if (isNone(state) || winsPriority({ priority, index }, state.value)) {
                subscriber.next(resolution)
                state = some({
                  value: resolution,
                  winningPriority: priority,
                  winningIndex: index
                })
              }

              if (wasLast) {
                subscriber.complete()
              }
            }
          ).catch(
            e => {
              if (errorPolicy === "throwAll") {
                subscriber.error(e)
                return
              }

              const wasLast = settleOne()

              if (errorPolicy === "recoverOnAnyValue") {
                if (wasLast) {
                  // A recovered rejection may be the final settlement; the
                  // Observable still has to end one way or the other.
                  if (isNone(state)) {
                    subscriber.error(e)
                  } else {
                    subscriber.complete()
                  }
                }
              } else if (promiseObj === maxPriorityPromiseObj()) {
                subscriber.error(e)
              } else if (wasLast) {
                // Same as above: finish even when the last promise to settle
                // was a tolerated rejection.
                subscriber.complete()
              }
            }
          )
        }
      )
    }
  )
}
