Source: enterprise-event-stream.ts

/**
 * @fileoverview Enterprise event stream backed by the enterprise events API
 */

// ------------------------------------------------------------------------------
// Requirements
// ------------------------------------------------------------------------------

import { Readable } from 'stream';
import BoxClient from './box-client';

// ------------------------------------------------------------------------------
// Typedefs
// ------------------------------------------------------------------------------

type Options = {
	streamPosition?: string;
	startDate?: string;
	endDate?: string;
	eventTypeFilter?: EventType[];
	pollingInterval?: number;
	chunkSize?: number;
	streamType?: 'admin_logs' | 'admin_logs_streaming';
};

type EventType = string /* FIXME */;

// ------------------------------------------------------------------------------
// Private
// ------------------------------------------------------------------------------

const DEFAULT_OPTIONS = Object.freeze({
	pollingInterval: 60, // seconds
	chunkSize: 500,
	streamType: 'admin_logs'
});

// ------------------------------------------------------------------------------
// Public
// ------------------------------------------------------------------------------

/**
 * Stream of Box enterprise events.
 *
 * By default, the stream starts from the current time.
 * Pass 'startDate' to start from a specific time.
 * Pass 'streamPosition' to start from a previous stream position, or '0' for all available past events (~1 year).
 * Once the stream catches up to the current time, it will begin polling every 'pollingInterval' seconds.
 * If 'pollingInterval' = 0, then the stream will end when it catches up to the current time (no polling).
 *
 * @param {BoxClient} client - The client to use to get events
 * @param {Object} [options] - Options
 * @param {string} [options.streamPosition] - The stream position to start from (pass '0' for all past events)
 * @param {string} [options.startDate] - The date to start from
 * @param {string} [options.endDate] - The date to end at
 * @param {EventType[]} [options.eventTypeFilter] - Array of event types to return
 * @param {int} [options.pollingInterval=60] - Polling interval (in seconds).  Pass 0 for no polling.
 * @param {int} [options.chunkSize=500] - Number of events to fetch per call (max = 500)
 * @constructor
 * @extends Readable
 */
class EnterpriseEventStream extends Readable {
	_client: BoxClient;
	_options: Options & Required<Pick<Options, 'pollingInterval' | 'chunkSize'>>;
	_streamPosition?: string;

	constructor(client: BoxClient, options?: Options) {
		super({
			objectMode: true,
		});

		/**
		 * @var {BoxClient} - The client for making API calls
		 * @private
		 */
		this._client = client;

		/**
		 * @var {Object} - Options
		 * @private
		 */
		this._options = Object.assign({}, DEFAULT_OPTIONS, options);

		// Handle the case where the caller passes streamPosition = 0 instead of streamPosition = '0'.
		if (
			this._options.streamType === 'admin_logs' &&
			!this._options.startDate &&
			!this._options.streamPosition &&
			(this._options.streamPosition as any) !== 0
		) {
			// If neither startDate nor streamPosition is specified, start from the current time.
			this._options.startDate = new Date()
				.toISOString()
				.replace(/\.000Z$/, '-00:00');
		}

		/**
		 * @var {?string} - The current stream position
		 * @private
		 */
		this._streamPosition = this._options.streamPosition;
	}

	/**
	 * @returns {?number} - Returns null if no events have been fetched from Box yet.
	 */
	getStreamPosition() {
		return this._streamPosition;
	}

	/**
	 * Get the stream state.
	 *
	 * @returns {Object} - The stream state
	 */
	getStreamState() {
		// We need to return both streamPosition and startDate, since streamPosition will be null until
		// the first set of events is returned from Box.
		return {
			streamPosition: this._streamPosition,
			startDate: this._options.startDate,
			endDate: this._options.endDate,
			eventTypeFilter: this._options.eventTypeFilter,
		};
	}

	/**
	 * Set the stream state.
	 *
	 * @param {Object} state - The stream state
	 * @returns {void}
	 */
	setStreamState(
		state: Pick<
			Options,
			'streamPosition' | 'startDate' | 'endDate' | 'eventTypeFilter'
		>
	) {
		// We need to set both streamPosition and startDate, since streamPosition will be null until
		// the first set of events is returned from Box.
		this._streamPosition = state.streamPosition;
		this._options.startDate = state.startDate;
		this._options.endDate = state.endDate;
		this._options.eventTypeFilter = state.eventTypeFilter;
	}

	/**
	 * Fetch the next chunk of events
	 *
	 * If there are no events, poll until events are available.
	 * If an error occurs, emit the error but continuing polling as usual.
	 * @param {Function} callback - Passed the array of events
	 * @returns {void}
	 * @private
	 */
	fetchEvents(callback: Function) {
		const self = this,
			params: {
				stream_type?: 'admin_logs' | 'admin_logs_streaming';
				stream_position?: string;
				created_after?: string;
				created_before?: string;
				event_type?: string;
				limit?: number;
			} = {
				stream_type: this._options.streamType
			};

		// Use the current stream position.
		// Handle the case where the caller passes streamPosition === 0 instead of streamPosition === '0'.
		if (this._streamPosition || (this._streamPosition as any) === 0) {
			params.stream_position = this._streamPosition;
		}

		if (this._options.streamType === 'admin_logs' && this._options.startDate) {
			params.created_after = this._options.startDate;
		}

		if (this._options.streamType === 'admin_logs' && this._options.endDate) {
			params.created_before = this._options.endDate;
		}

		if (this._options.eventTypeFilter) {
			params.event_type = this._options.eventTypeFilter.join(',');
		}

		if (this._options.chunkSize) {
			params.limit = this._options.chunkSize;
		}

		this._client.events.get(params, (
			err: any /* FIXME */,
			result: any /* FIXME */
		) => {
			if (err) {
				self.emit('error', err);
				// If there was a "permanent" error, we would call the callback with it here.
				// But it's not clear which errors are truly permanent?
				// If Box is down or returning errors for an extended period, we still want to resume when it recovers.
				// So, continue polling at the regular frequency.
				// Don't use a shorter retry interval (to avoid DDOSing Box).
			}

			if (err || !result || !result.entries || result.entries.length === 0) {
				if (!self._options.pollingInterval) {
					// If polling is disabled, end the stream.
					callback();
					return;
				}

				// There were no events returned (or an error occurred), so schedule another poll.
				const delay = self._options.pollingInterval * 1000;

				// Stream readers can use this to flush buffered events to a downstream system.
				self.emit('wait', delay);

				setTimeout(() => {
					self.fetchEvents(callback);
				}, delay);
				return;
			}

			// Only update the stream position if there were events returned.
			// The API currently returns next_stream_position = 0 if there are no events (may be a bug?).
			// But we don't want to start over at the beginning in that case, so ignore it.
			self._streamPosition = result.next_stream_position;

			// Notify the reader of the new stream position.
			// Stream readers can respond to the 'newStreamState' event to persist the stream state.
			self.emit('newStreamState', self.getStreamState());

			callback(null, result.entries);
		});
	}

	/**
	 * Implementation of the stream-internal read function.	This is called
	 * by the stream whenever it needs more data, and will not be called again
	 * until data is pushed into the stream.
	 * @returns {void}
	 * @private
	 */
	_read() {
		// Fetch the next chunk of events.
		const self = this;

		// This will poll forever until events are available.
		this.fetchEvents((err: any /* FIXME */, events: any /* FIXME */) => {
			if (err || !events || events.length === 0) {
				// Close the stream if there was a "permanent" failure or we reached the end of the events.
				self.push(null);
				return;
			}

			// Pause the stream to avoid race conditions while pushing in the new events.
			// Without this, _read() would be called again from inside each push(),
			// resulting in multiple parallel calls to fetchEvents().
			// See https://github.com/nodejs/node/issues/3203
			const wasPaused = self.isPaused();
			self.pause();

			// Push all of the events into the stream.
			events.forEach((event: any /* FIXME */) => {
				self.push(event);
			});

			if (!wasPaused) {
				// This will deliver the events and trigger the next call to _read() once they have been consumed.
				self.resume();
			}
		});
	}
}

export = EnterpriseEventStream;