/**
* @fileoverview Enterprise event stream backed by the enterprise events API
*/
// ------------------------------------------------------------------------------
// Requirements
// ------------------------------------------------------------------------------
import { Readable } from 'stream';
import BoxClient from './box-client';
// ------------------------------------------------------------------------------
// Typedefs
// ------------------------------------------------------------------------------
type Options = {
streamPosition?: string;
startDate?: string;
endDate?: string;
eventTypeFilter?: EventType[];
pollingInterval?: number;
chunkSize?: number;
streamType?: 'admin_logs' | 'admin_logs_streaming';
};
type EventType = string /* FIXME */;
// ------------------------------------------------------------------------------
// Private
// ------------------------------------------------------------------------------
const DEFAULT_OPTIONS = Object.freeze({
pollingInterval: 60, // seconds
chunkSize: 500,
streamType: 'admin_logs'
});
// ------------------------------------------------------------------------------
// Public
// ------------------------------------------------------------------------------
/**
* Stream of Box enterprise events.
*
* By default, the stream starts from the current time.
* Pass 'startDate' to start from a specific time.
* Pass 'streamPosition' to start from a previous stream position, or '0' for all available past events (~1 year).
* Once the stream catches up to the current time, it will begin polling every 'pollingInterval' seconds.
* If 'pollingInterval' = 0, then the stream will end when it catches up to the current time (no polling).
*
* @param {BoxClient} client - The client to use to get events
* @param {Object} [options] - Options
* @param {string} [options.streamPosition] - The stream position to start from (pass '0' for all past events)
* @param {string} [options.startDate] - The date to start from
* @param {string} [options.endDate] - The date to end at
* @param {EventType[]} [options.eventTypeFilter] - Array of event types to return
* @param {int} [options.pollingInterval=60] - Polling interval (in seconds). Pass 0 for no polling.
* @param {int} [options.chunkSize=500] - Number of events to fetch per call (max = 500)
* @constructor
* @extends Readable
*/
class EnterpriseEventStream extends Readable {
_client: BoxClient;
_options: Options & Required<Pick<Options, 'pollingInterval' | 'chunkSize'>>;
_streamPosition?: string;
constructor(client: BoxClient, options?: Options) {
super({
objectMode: true,
});
/**
* @var {BoxClient} - The client for making API calls
* @private
*/
this._client = client;
/**
* @var {Object} - Options
* @private
*/
this._options = Object.assign({}, DEFAULT_OPTIONS, options);
// Handle the case where the caller passes streamPosition = 0 instead of streamPosition = '0'.
if (
this._options.streamType === 'admin_logs' &&
!this._options.startDate &&
!this._options.streamPosition &&
(this._options.streamPosition as any) !== 0
) {
// If neither startDate nor streamPosition is specified, start from the current time.
this._options.startDate = new Date()
.toISOString()
.replace(/\.000Z$/, '-00:00');
}
/**
* @var {?string} - The current stream position
* @private
*/
this._streamPosition = this._options.streamPosition;
}
/**
* @returns {?number} - Returns null if no events have been fetched from Box yet.
*/
getStreamPosition() {
return this._streamPosition;
}
/**
* Get the stream state.
*
* @returns {Object} - The stream state
*/
getStreamState() {
// We need to return both streamPosition and startDate, since streamPosition will be null until
// the first set of events is returned from Box.
return {
streamPosition: this._streamPosition,
startDate: this._options.startDate,
endDate: this._options.endDate,
eventTypeFilter: this._options.eventTypeFilter,
};
}
/**
* Set the stream state.
*
* @param {Object} state - The stream state
* @returns {void}
*/
setStreamState(
state: Pick<
Options,
'streamPosition' | 'startDate' | 'endDate' | 'eventTypeFilter'
>
) {
// We need to set both streamPosition and startDate, since streamPosition will be null until
// the first set of events is returned from Box.
this._streamPosition = state.streamPosition;
this._options.startDate = state.startDate;
this._options.endDate = state.endDate;
this._options.eventTypeFilter = state.eventTypeFilter;
}
/**
* Fetch the next chunk of events
*
* If there are no events, poll until events are available.
* If an error occurs, emit the error but continuing polling as usual.
* @param {Function} callback - Passed the array of events
* @returns {void}
* @private
*/
fetchEvents(callback: Function) {
const self = this,
params: {
stream_type?: 'admin_logs' | 'admin_logs_streaming';
stream_position?: string;
created_after?: string;
created_before?: string;
event_type?: string;
limit?: number;
} = {
stream_type: this._options.streamType
};
// Use the current stream position.
// Handle the case where the caller passes streamPosition === 0 instead of streamPosition === '0'.
if (this._streamPosition || (this._streamPosition as any) === 0) {
params.stream_position = this._streamPosition;
}
if (this._options.streamType === 'admin_logs' && this._options.startDate) {
params.created_after = this._options.startDate;
}
if (this._options.streamType === 'admin_logs' && this._options.endDate) {
params.created_before = this._options.endDate;
}
if (this._options.eventTypeFilter) {
params.event_type = this._options.eventTypeFilter.join(',');
}
if (this._options.chunkSize) {
params.limit = this._options.chunkSize;
}
this._client.events.get(params, (
err: any /* FIXME */,
result: any /* FIXME */
) => {
if (err) {
self.emit('error', err);
// If there was a "permanent" error, we would call the callback with it here.
// But it's not clear which errors are truly permanent?
// If Box is down or returning errors for an extended period, we still want to resume when it recovers.
// So, continue polling at the regular frequency.
// Don't use a shorter retry interval (to avoid DDOSing Box).
}
if (err || !result || !result.entries || result.entries.length === 0) {
if (!self._options.pollingInterval) {
// If polling is disabled, end the stream.
callback();
return;
}
// There were no events returned (or an error occurred), so schedule another poll.
const delay = self._options.pollingInterval * 1000;
// Stream readers can use this to flush buffered events to a downstream system.
self.emit('wait', delay);
setTimeout(() => {
self.fetchEvents(callback);
}, delay);
return;
}
// Only update the stream position if there were events returned.
// The API currently returns next_stream_position = 0 if there are no events (may be a bug?).
// But we don't want to start over at the beginning in that case, so ignore it.
self._streamPosition = result.next_stream_position;
// Notify the reader of the new stream position.
// Stream readers can respond to the 'newStreamState' event to persist the stream state.
self.emit('newStreamState', self.getStreamState());
callback(null, result.entries);
});
}
/**
* Implementation of the stream-internal read function. This is called
* by the stream whenever it needs more data, and will not be called again
* until data is pushed into the stream.
* @returns {void}
* @private
*/
_read() {
// Fetch the next chunk of events.
const self = this;
// This will poll forever until events are available.
this.fetchEvents((err: any /* FIXME */, events: any /* FIXME */) => {
if (err || !events || events.length === 0) {
// Close the stream if there was a "permanent" failure or we reached the end of the events.
self.push(null);
return;
}
// Pause the stream to avoid race conditions while pushing in the new events.
// Without this, _read() would be called again from inside each push(),
// resulting in multiple parallel calls to fetchEvents().
// See https://github.com/nodejs/node/issues/3203
const wasPaused = self.isPaused();
self.pause();
// Push all of the events into the stream.
events.forEach((event: any /* FIXME */) => {
self.push(event);
});
if (!wasPaused) {
// This will deliver the events and trigger the next call to _read() once they have been consumed.
self.resume();
}
});
}
}
export = EnterpriseEventStream;