/**
* @fileoverview Event stream backed by the events API
*/
// ------------------------------------------------------------------------------
// Requirements
// ------------------------------------------------------------------------------
import { Promise } from 'bluebird';
import qs from 'querystring';
import { Readable } from 'stream';
import util from 'util';
import BoxClient from './box-client';
// ------------------------------------------------------------------------------
// Typedefs
// ------------------------------------------------------------------------------
/**
 * Tunable settings for an event stream. Callers may supply any subset;
 * the remaining values are filled in from the module defaults.
 */
type Options = {
	/** Number of tracked event IDs at which the deduplication filter is pruned. */
	deduplicationFilterSize: number;
	/** Minimum delay, in milliseconds, between consecutive fetches of events. */
	fetchInterval: number;
	/** Delay, in milliseconds, before long polling is restarted after a failure. */
	retryDelay: number;
};
/**
 * Long-poll configuration as returned by the events API client.
 */
type LongPollInfo = {
	/** Address to long poll; may already carry a query string. */
	url: string;
	/** Number of polls allowed against `url` before fresh info is requested. */
	max_retries: number;
	/**
	 * Request timeout for a single poll. It is multiplied by 1000 before being
	 * passed as the request timeout (presumably seconds -> ms; confirm against
	 * the API documentation).
	 */
	retry_timeout: number;
};
// ------------------------------------------------------------------------------
// Private
// ------------------------------------------------------------------------------
/**
 * Defaults merged underneath any caller-supplied options in the EventStream
 * constructor. Frozen so the shared object cannot be mutated.
 */
const DEFAULT_OPTIONS: Options = Object.freeze({
// Prune the deduplication filter once this many event IDs are tracked
deduplicationFilterSize: 5000,
// Wait this many ms before restarting long polling after an error
retryDelay: 1000,
// Leave at least this many ms between calls that fetch events
fetchInterval: 1000,
});
// ------------------------------------------------------------------------------
// Public
// ------------------------------------------------------------------------------
/**
 * Stream of Box events from a given client and point in time.
 *
 * The stream runs in object mode. Each time the stream machinery asks for
 * data it (1) requests long-poll info, (2) long polls until the API reports a
 * change, (3) fetches the events after the current stream position, and
 * (4) pushes the events that have not been seen before.
 *
 * @param {BoxClient} client The client to use to get events
 * @param {string} streamPosition The point in time to start at
 * @param {Object} [options] Optional parameters
 * @param {int} [options.retryDelay=1000] Number of ms to wait before retrying after an error
 * @param {int} [options.deduplicationFilterSize=5000] Number of IDs to track for deduplication
 * @param {int} [options.fetchInterval=1000] Minimum number of ms between calls for more events
 * @constructor
 * @extends Readable
 */
class EventStream extends Readable {
	_client: BoxClient;
	_streamPosition: string;
	_longPollInfo?: LongPollInfo;
	_longPollRetries: number;
	_dedupHash: Record<string, boolean>;
	_rateLimiter: Promise<any>;
	_options: Options;
	_retryTimer?: NodeJS.Timeout | number;

	constructor(
		client: BoxClient,
		streamPosition: string,
		options?: Partial<Options>
	) {
		super({
			objectMode: true,
		});

		/**
		 * @var {BoxClient} The client for making API calls
		 * @private
		 */
		this._client = client;

		/**
		 * @var {string} The latest stream position
		 * @private
		 */
		this._streamPosition = streamPosition;

		/**
		 * @var {?Object} The information for how to long poll
		 * @private
		 */
		this._longPollInfo = undefined;

		/**
		 * @var {int} The number of long poll requests we've made against one URL so far
		 * @private
		 */
		this._longPollRetries = 0;

		/**
		 * @var {Object.<string, boolean>} Hash of event IDs we've already pushed
		 * @private
		 */
		this._dedupHash = {};

		/**
		 * Rate limiting promise to ensure that events are not fetched too often,
		 * initially resolved to allow an immediate API call.
		 * @var {Promise}
		 * @private
		 */
		this._rateLimiter = Promise.resolve();

		// Caller-supplied values override the defaults key by key
		this._options = Object.assign({}, DEFAULT_OPTIONS, options);
	}

	/**
	 * Retrieve the url and params for long polling for new updates, then
	 * start long polling with them.
	 *
	 * Failures are emitted as 'error' events. Unless the error is flagged
	 * `authExpired`, the request is retried after the configured retry delay.
	 *
	 * @returns {Promise} Promise for testing purposes; resolves to false
	 *   immediately when the stream is already destroyed
	 * @private
	 */
	getLongPollInfo() {
		if (this.destroyed) {
			return Promise.resolve(false);
		}

		return this._client.events
			.getLongPollInfo()
			.then((longPollInfo: LongPollInfo) => {
				// On getting new long poll info, reset everything
				this._longPollInfo = longPollInfo;
				this._longPollRetries = 0;

				return this.doLongPoll();
			})
			.catch((err: any /* FIXME */) => {
				this.emit('error', err);

				// Only retry on resolvable errors
				if (!err.authExpired) {
					this.retryPollInfo();
				}
			});
	}

	/**
	 * Long poll for notification of new events. We do this rather than
	 * polling for the events directly in order to minimize the number of API
	 * calls necessary.
	 *
	 * Request failures are not emitted as errors here; long polling is simply
	 * restarted after the configured retry delay.
	 *
	 * @returns {Promise} Promise for testing purposes
	 * @private
	 */
	doLongPoll() {
		if (this.destroyed) {
			return Promise.resolve(false);
		}

		const longPollInfo = this._longPollInfo;

		// If no long poll info has been loaded yet, or we're over the max
		// number of retries for the current info, start over with fresh info
		if (!longPollInfo || this._longPollRetries > longPollInfo.max_retries) {
			return this.getLongPollInfo();
		}

		let url = longPollInfo.url;
		let query: Record<string, any> = {};
		const qsDelim = url.indexOf('?');

		// Break out the query params, otherwise the request URL gets messed up
		if (qsDelim > 0) {
			query = qs.parse(url.slice(qsDelim + 1));
			url = url.slice(0, qsDelim);
		}

		query.stream_position = this._streamPosition;

		const options = {
			qs: query,
			timeout: longPollInfo.retry_timeout * 1000,
		};

		this._longPollRetries += 1;

		return this._client
			.wrapWithDefaultHandler(this._client.get)(url, options)
			.then((data: any /* FIXME */) => {
				if (this.destroyed) {
					return false;
				}

				if (data.message === 'reconnect') {
					return this.getLongPollInfo();
				}

				// We don't expect any messages other than reconnect and new_change, so if
				// we get one just retry the long poll
				if (data.message !== 'new_change') {
					return this.doLongPoll();
				}

				return this.fetchEvents();
			})
			.catch(() => {
				this.retryPollInfo();
			});
	}

	/**
	 * Retries long-polling after a delay.
	 * Does not attempt if stream is already destroyed.
	 * @returns {void}
	 * @private
	 */
	retryPollInfo() {
		if (!this.destroyed) {
			// Keep the handle so _destroy() can cancel a pending retry
			this._retryTimer = setTimeout(
				() => this.getLongPollInfo(),
				this._options.retryDelay
			);
		}
	}

	/**
	 * Fetch the latest group of events and push them into the stream.
	 *
	 * The call waits for the rate limiter first. Nothing is fetched or pushed
	 * if the stream gets destroyed while waiting.
	 *
	 * @returns {Promise} Promise for testing purposes; resolves to true when
	 *   new events were pushed and to false when the stream was destroyed
	 * @private
	 */
	fetchEvents() {
		if (this.destroyed) {
			return Promise.resolve(false);
		}

		const eventParams = {
			stream_position: this._streamPosition,
			limit: 500,
		};

		// Get new events after the rate limiter expires
		return this._rateLimiter.then(() => {
			// The stream may have been destroyed while we were waiting
			if (this.destroyed) {
				return false;
			}

			return this._client.events
				.get(eventParams)
				.then((events: any /* FIXME */) => {
					// The stream may have been destroyed while the request was in
					// flight; don't push or schedule anything further in that case
					if (this.destroyed) {
						return false;
					}

					// Reset the rate limiter
					this._rateLimiter = Promise.delay(this._options.fetchInterval);

					// If the response wasn't what we expected, re-poll
					if (!events.entries || !events.next_stream_position) {
						return this.doLongPoll();
					}

					this._streamPosition = events.next_stream_position;

					// De-duplicate the fetched events, since the API often returns
					// the same events at multiple subsequent stream positions
					const newEvents = events.entries.filter(
						(event: any /* FIXME */) => !this._dedupHash[event.event_id]
					);

					// If there aren't any non-duplicate events, go back to polling
					if (newEvents.length === 0) {
						return this.doLongPoll();
					}

					// Pause the stream to avoid race conditions while pushing in the new events.
					// Without this, _read() would be called again from inside each push(),
					// resulting in multiple parallel calls to fetchEvents().
					// See https://github.com/nodejs/node/issues/3203
					const wasPaused = this.isPaused();
					this.pause();

					// Push new events into the stream
					for (const event of newEvents) {
						this._dedupHash[event.event_id] = true;
						this.push(event);
					}

					if (!wasPaused) {
						// This will deliver the events and trigger the next call to _read() once they have been consumed.
						this.resume();
					}

					// Once the deduplication filter gets too big, clean it up
					if (
						Object.keys(this._dedupHash).length >=
						this._options.deduplicationFilterSize
					) {
						this.cleanupDedupFilter(events.entries);
					}

					return true;
				})
				.catch((err: any /* FIXME */) => {
					this.emit('error', err);
					this.retryPollInfo();
				});
		});
	}

	/**
	 * Clean up the deduplication filter, to prevent it from growing
	 * too big and eating up memory. We look at the latest set of events
	 * returned and assume that any IDs not in that set don't need to be
	 * tracked for deduplication any more.
	 * @param {Object[]} latestEvents The latest events from the API
	 * @returns {void}
	 * @private
	 */
	cleanupDedupFilter(latestEvents: any /* FIXME */) {
		// Collect the latest IDs once so each tracked ID is checked in constant
		// time instead of rescanning the whole batch
		const latestIDs = new Set<unknown>(
			latestEvents.map((e: any /* FIXME */) => e.event_id)
		);

		for (const eventID of Object.keys(this._dedupHash)) {
			if (!latestIDs.has(eventID)) {
				delete this._dedupHash[eventID];
			}
		}
	}

	/**
	 * Implementation of the stream-internal read function. This is called
	 * by the stream whenever it needs more data, and will not be called again
	 * until data is pushed into the stream.
	 * @returns {void}
	 * @private
	 */
	_read() {
		// Start the process of getting new events
		this.getLongPollInfo();
	}

	/**
	 * Implementation of stream-internal `_destroy` function (v8.0.0 and later).
	 * Called by stream consumers to effectively stop polling via the public
	 * `destroy()`.
	 *
	 * Cancels any pending retry timer and then invokes `callback`, which
	 * `Readable#destroy` relies on to finish tearing the stream down.
	 * Both parameters are optional because the legacy `destroy()` polyfill
	 * in this file calls `_destroy()` without arguments.
	 *
	 * @param {?Error} [error] The error passed to `destroy()`, if any
	 * @param {Function} [callback] Completion callback supplied by the stream
	 * @returns {void}
	 * @private
	 */
	_destroy(error?: Error | null, callback?: (error?: Error | null) => void) {
		clearTimeout(this._retryTimer as number);
		delete this._retryTimer;

		if (typeof callback === 'function') {
			callback(error);
		}
	}
}
// Compatibility shim: only installed when the running Node.js has no native
// `Readable#destroy` (releases before v8.0.0).
/* istanbul ignore if */
if (typeof Readable.prototype.destroy !== 'function') {
	/**
	 * Rough stand-in for `Readable#destroy`. On the first call it schedules a
	 * 'close' event for the next tick, marks the stream as destroyed and runs
	 * the stream's own `_destroy()` cleanup. Later calls do nothing. The
	 * `error` argument is accepted but not used.
	 * @returns {EventStream} The stream itself
	 * @public
	 */
	EventStream.prototype.destroy = function (error?: Error | undefined) {
		// Already torn down: nothing left to do
		if (this.destroyed) {
			return this;
		}

		process.nextTick(() => {
			this.emit('close');
		});
		this.destroyed = true;
		this._destroy();

		return this;
	};
}
export = EventStream;