Source: event-stream.ts

/**
 * @fileoverview Event stream backed by the events API
 */

// ------------------------------------------------------------------------------
// Requirements
// ------------------------------------------------------------------------------

import { Promise } from 'bluebird';
import qs from 'querystring';
import { Readable } from 'stream';
import util from 'util';
import BoxClient from './box-client';

// ------------------------------------------------------------------------------
// Typedefs
// ------------------------------------------------------------------------------

/**
 * Tunable settings for an EventStream. The constructor accepts any subset of
 * these; fields that are not supplied come from DEFAULT_OPTIONS.
 */
type Options = {
	/** Delay in ms before long polling is restarted after a failure. */
	retryDelay: number;
	/** Number of tracked event IDs at which the deduplication filter gets pruned. */
	deduplicationFilterSize: number;
	/** Delay in ms enforced between one events fetch and the next. */
	fetchInterval: number;
};

/**
 * Long-poll connection details, as resolved by `client.events.getLongPollInfo()`.
 */
type LongPollInfo = {
	/** Once more long-poll requests than this have been made, fresh long-poll info is requested. */
	max_retries: number;
	/**
	 * Multiplied by 1000 to form the long-poll request timeout.
	 * NOTE(review): presumably expressed in seconds - confirm against the API.
	 */
	retry_timeout: number;
	/** Long-poll URL; any query string it carries is split off before the request is made. */
	url: string;
};

// ------------------------------------------------------------------------------
// Private
// ------------------------------------------------------------------------------

/**
 * Default stream options. The EventStream constructor copies these into a
 * fresh object and lets caller-supplied options override them.
 *
 * The object is frozen at runtime; it is typed `Readonly` so that an
 * accidental write is rejected at compile time instead of failing at runtime.
 */
const DEFAULT_OPTIONS: Readonly<Options> = Object.freeze({
	deduplicationFilterSize: 5000,
	retryDelay: 1000,
	fetchInterval: 1000,
});

// ------------------------------------------------------------------------------
// Public
// ------------------------------------------------------------------------------

/**
 * Stream of Box events from a given client and point in time.
 *
 * The stream runs in object mode. Every `_read()` starts a cycle of: get the
 * long-poll info, long poll until the API answers `new_change`, fetch the
 * events, and push the ones that have not been pushed before.
 *
 * @param {BoxClient} client The client to use to get events
 * @param {string} streamPosition The point in time to start at
 * @param {Object} [options] Optional parameters
 * @param {int} [options.retryDelay=1000] Number of ms to wait before retrying after an error
 * @param {int} [options.deduplicationFilterSize=5000] Number of IDs to track for deduplication
 * @param {int} [options.fetchInterval=1000] Minimum number of ms between calls for more events
 * @constructor
 * @extends Readable
 */
class EventStream extends Readable {
	_client: BoxClient;
	_streamPosition: string;
	_longPollInfo?: LongPollInfo;
	_longPollRetries: number;
	_dedupHash: Record<string, boolean>;
	_rateLimiter: Promise<any>;
	_options: Options;
	_retryTimer?: NodeJS.Timeout | number;

	constructor(
		client: BoxClient,
		streamPosition: string,
		options?: Partial<Options>
	) {
		super({
			objectMode: true,
		});

		/**
		 * @var {BoxClient} The client for making API calls
		 * @private
		 */
		this._client = client;

		/**
		 * @var {string} The latest stream position
		 * @private
		 */
		this._streamPosition = streamPosition;

		/**
		 * @var {?Object} The information for how to long poll
		 * @private
		 */
		this._longPollInfo = undefined;

		/**
		 * @var {int} The number of long poll requests we've made against one URL so far
		 * @private
		 */
		this._longPollRetries = 0;

		/**
		 * @var {Object.<string, boolean>} Hash of event IDs we've already pushed
		 * @private
		 */
		this._dedupHash = {};

		/**
		 * Rate limiting promise to ensure that events are not fetched too often,
		 * initially resolved to allow an immediate API call.
		 * @var {Promise}
		 * @private
		 */
		this._rateLimiter = Promise.resolve();

		// Caller-supplied options win over the defaults
		this._options = Object.assign({}, DEFAULT_OPTIONS, options);
	}

	/**
	 * Retrieve the url and params for long polling for new updates, then
	 * start long polling with them. Resolves to `false` without calling the
	 * API when the stream has already been destroyed.
	 * @returns {Promise} Promise for testing purposes
	 * @private
	 */
	getLongPollInfo() {
		if (this.destroyed) {
			return Promise.resolve(false);
		}

		return this._client.events
			.getLongPollInfo()
			.then((longPollInfo: LongPollInfo) => {
				// On getting new long poll info, reset everything
				this._longPollInfo = longPollInfo;
				this._longPollRetries = 0;

				return this.doLongPoll();
			})
			.catch((err: any /* FIXME */) => {
				this.emit('error', err);

				// Only retry on resolvable errors
				if (!err.authExpired) {
					this.retryPollInfo();
				}
			});
	}

	/**
	 * Long poll for notification of new events. We do this rather than
	 * polling for the events directly in order to minimize the number of API
	 * calls necessary.
	 * @returns {Promise} Promise for testing purposes
	 * @private
	 */
	doLongPoll() {
		if (this.destroyed) {
			return Promise.resolve(false);
		}

		const longPollInfo = this._longPollInfo;

		// Without long poll info there is nothing to poll against, and once
		// we're over the max number of retries the info must be refreshed;
		// either way, (re)load it.
		if (!longPollInfo || this._longPollRetries > longPollInfo.max_retries) {
			return this.getLongPollInfo();
		}

		let url = longPollInfo.url;
		let query: Record<string, any> = {};
		const qsDelim = url.indexOf('?');

		// Break out the query params, otherwise the request URL gets messed up
		if (qsDelim > 0) {
			query = qs.parse(url.slice(qsDelim + 1));
			url = url.slice(0, qsDelim);
		}

		query.stream_position = this._streamPosition;

		const options = {
			qs: query,
			timeout: longPollInfo.retry_timeout * 1000,
		};

		this._longPollRetries += 1;
		return this._client
			.wrapWithDefaultHandler(this._client.get)(url, options)
			.then((data: any /* FIXME */) => {
				// The stream may have been destroyed while the request was open
				if (this.destroyed) {
					return false;
				}

				if (data.message === 'reconnect') {
					return this.getLongPollInfo();
				}

				// We don't expect any messages other than reconnect and new_change, so if
				// we get one just retry the long poll
				if (data.message !== 'new_change') {
					return this.doLongPoll();
				}

				return this.fetchEvents();
			})
			.catch(() => {
				// A failed long poll is not surfaced as an 'error' event; the
				// whole cycle is simply restarted after the retry delay.
				this.retryPollInfo();
			});
	}

	/**
	 * Retries long-polling after a delay.
	 * Does not attempt if stream is already destroyed.
	 * @returns {void}
	 * @private
	 */
	retryPollInfo() {
		if (!this.destroyed) {
			// Keep the handle so that _destroy() can cancel a pending retry
			this._retryTimer = setTimeout(
				() => this.getLongPollInfo(),
				this._options.retryDelay
			);
		}
	}

	/**
	 * Fetch the latest group of events and push them into the stream.
	 * Resolves to `false` when the stream is (or becomes) destroyed, and to
	 * `true` once new events have been pushed.
	 * @returns {Promise} Promise for testing purposes
	 * @private
	 */
	fetchEvents() {
		if (this.destroyed) {
			return Promise.resolve(false);
		}

		const eventParams = {
			stream_position: this._streamPosition,
			limit: 500,
		};

		// Get new events after the rate limiter expires
		return this._rateLimiter.then(() => {
			// The stream may have been destroyed while waiting on the rate
			// limiter; don't spend an API call on it.
			if (this.destroyed) {
				return false;
			}

			return this._client.events
				.get(eventParams)
				.then((events: any /* FIXME */) => {
					// Same guard as in doLongPoll(): never push into, or keep
					// polling for, a stream destroyed while the request was open
					if (this.destroyed) {
						return false;
					}

					// Reset the rate limiter
					this._rateLimiter = Promise.delay(this._options.fetchInterval);

					// If the response wasn't what we expected, re-poll
					if (!events.entries || !events.next_stream_position) {
						return this.doLongPoll();
					}

					this._streamPosition = events.next_stream_position;

					// De-duplicate the fetched events, since the API often returns
					// the same events at multiple subsequent stream positions
					const newEvents = events.entries.filter(
						(event: any /* FIXME */) => !this._dedupHash[event.event_id]
					);

					// If there aren't any non-duplicate events, go back to polling
					if (newEvents.length === 0) {
						return this.doLongPoll();
					}

					// Pause the stream to avoid race conditions while pushing in the new events.
					// Without this, _read() would be called again from inside each push(),
					// resulting in multiple parallel calls to fetchEvents().
					// See https://github.com/nodejs/node/issues/3203
					const wasPaused = this.isPaused();
					this.pause();

					// Push new events into the stream
					newEvents.forEach((event: any /* FIXME */) => {
						this._dedupHash[event.event_id] = true;
						this.push(event);
					});

					if (!wasPaused) {
						// This will deliver the events and trigger the next call to _read() once they have been consumed.
						this.resume();
					}

					// Once the deduplication filter gets too big, clean it up
					if (
						Object.keys(this._dedupHash).length >=
						this._options.deduplicationFilterSize
					) {
						this.cleanupDedupFilter(events.entries);
					}

					return true;
				})
				.catch((err: any /* FIXME */) => {
					this.emit('error', err);

					this.retryPollInfo();
				});
		});
	}

	/**
	 * Clean up the deduplication filter, to prevent it from growing
	 * too big and eating up memory. We look at the latest set of events
	 * returned and assume that any IDs not in that set don't need to be
	 * tracked for deduplication any more.
	 * @param {Object[]} latestEvents The latest events from the API
	 * @returns {void}
	 * @private
	 */
	cleanupDedupFilter(latestEvents: any /* FIXME */) {
		// Collect the latest IDs once, so that each tracked ID costs a single
		// Set lookup instead of a scan over all of the latest events.
		const latestIDs = new Set<unknown>();
		for (const event of latestEvents) {
			latestIDs.add(event.event_id);
		}

		for (const eventID of Object.keys(this._dedupHash)) {
			if (!latestIDs.has(eventID)) {
				delete this._dedupHash[eventID];
			}
		}
	}

	/**
	 * Implementation of the stream-internal read function. This is called
	 * by the stream whenever it needs more data, and will not be called again
	 * until data is pushed into the stream.
	 * @returns {void}
	 * @private
	 */
	_read() {
		// Start the process of getting new events
		this.getLongPollInfo();
	}

	/**
	 * Implementation of stream-internal `_destroy` function (v8.0.0 and later).
	 * Called by stream consumers to effectively stop polling via the public
	 * `destroy()`.
	 *
	 * Cancels any pending retry and then hands control back through
	 * `callback`, which is how Readable learns that teardown has finished.
	 * Both parameters are optional because the pre-v8.0.0 `destroy()`
	 * polyfill in this file calls `_destroy()` without arguments.
	 * @param {?Error} [error] The error passed to `destroy()`, if any
	 * @param {Function} [callback] Completion callback supplied by Readable
	 * @returns {void}
	 * @private
	 */
	_destroy(
		error?: Error | null,
		callback?: (error?: Error | null) => void
	) {
		clearTimeout(this._retryTimer as number);
		delete this._retryTimer;

		if (typeof callback === 'function') {
			callback(error);
		}
	}
}

// Compatibility shim for Node.js releases older than v8.0.0: it is installed
// only when Readable does not already provide destroy().
/* istanbul ignore if */
if (typeof Readable.prototype.destroy !== 'function') {
	/**
	 * Destroys the stream. Rough polyfill for `Readable#destroy`.
	 *
	 * The first call schedules a 'close' event for the next tick, marks the
	 * stream as destroyed and runs `_destroy()`; later calls do nothing.
	 * The `error` argument is accepted but not used.
	 * @returns {EventStream} The stream itself
	 * @public
	 */
	EventStream.prototype.destroy = function (error?: Error | undefined) {
		// Already torn down: nothing left to do
		if (this.destroyed) {
			return this;
		}

		process.nextTick(() => {
			this.emit('close');
		});
		this.destroyed = true;
		this._destroy();

		return this;
	};
}

export = EventStream;