Source: event-stream.ts

  1. /**
  2. * @fileoverview Event stream backed by the events API
  3. */
  4. // ------------------------------------------------------------------------------
  5. // Requirements
  6. // ------------------------------------------------------------------------------
  7. import { Promise } from 'bluebird';
  8. import qs from 'querystring';
  9. import { Readable } from 'stream';
  10. import util from 'util';
  11. import BoxClient from './box-client';
  12. // ------------------------------------------------------------------------------
  13. // Typedefs
  14. // ------------------------------------------------------------------------------
  15. type Options = {
  16. retryDelay: number;
  17. deduplicationFilterSize: number;
  18. fetchInterval: number;
  19. };
  20. type LongPollInfo = {
  21. max_retries: number;
  22. retry_timeout: number;
  23. url: string;
  24. };
  25. // ------------------------------------------------------------------------------
  26. // Private
  27. // ------------------------------------------------------------------------------
  28. const DEFAULT_OPTIONS: Options = Object.freeze({
  29. deduplicationFilterSize: 5000,
  30. retryDelay: 1000,
  31. fetchInterval: 1000,
  32. });
  33. // ------------------------------------------------------------------------------
  34. // Public
  35. // ------------------------------------------------------------------------------
  36. /**
  37. * Stream of Box events from a given client and point in time.
  38. * @param {BoxClient} client The client to use to get events
  39. * @param {string} streamPosition The point in time to start at
  40. * @param {Object} [options] Optional parameters
  41. * @param {int} [options.retryDelay=1000] Number of ms to wait before retrying after an error
  42. * @param {int} [options.deduplicationFilterSize=5000] Number of IDs to track for deduplication
  43. * @param {int} [options.fetchInterval=1000] Minimunm number of ms between calls for more events
  44. * @constructor
  45. * @extends Readable
  46. */
  47. class EventStream extends Readable {
  48. _client: BoxClient;
  49. _streamPosition: string;
  50. _longPollInfo?: LongPollInfo;
  51. _longPollRetries: number;
  52. _dedupHash: Record<string, boolean>;
  53. _rateLimiter: Promise<any>;
  54. _options: Options;
  55. _retryTimer?: NodeJS.Timeout | number;
  56. constructor(
  57. client: BoxClient,
  58. streamPosition: string,
  59. options?: Partial<Options>
  60. ) {
  61. super({
  62. objectMode: true,
  63. });
  64. /**
  65. * @var {BoxClient} The client for making API calls
  66. * @private
  67. */
  68. this._client = client;
  69. /**
  70. * @var {string} The latest stream position
  71. * @private
  72. */
  73. this._streamPosition = streamPosition;
  74. /**
  75. * @var {?Object} The information for how to long poll
  76. * @private
  77. */
  78. this._longPollInfo = undefined;
  79. /**
  80. * @var {int} The number of long poll requests we've made against one URL so far
  81. * @private
  82. */
  83. this._longPollRetries = 0;
  84. /**
  85. * @var {Object.<string, boolean>} Hash of event IDs we've already pushed
  86. * @private
  87. */
  88. this._dedupHash = {};
  89. /**
  90. * Rate limiting promise to ensure that events are not fetched too often,
  91. * initially resolved to allow an immediate API call.
  92. * @var {Promise}
  93. * @private
  94. */
  95. this._rateLimiter = Promise.resolve();
  96. this._options = Object.assign({}, DEFAULT_OPTIONS, options);
  97. }
  98. /**
  99. * Retrieve the url and params for long polling for new updates
  100. * @returns {Promise} Promise for testing purposes
  101. * @private
  102. */
  103. getLongPollInfo() {
  104. if (this.destroyed) {
  105. return Promise.resolve(false);
  106. }
  107. return this._client.events
  108. .getLongPollInfo()
  109. .then((longPollInfo: LongPollInfo) => {
  110. // On getting new long poll info, reset everything
  111. this._longPollInfo = longPollInfo;
  112. this._longPollRetries = 0;
  113. return this.doLongPoll();
  114. })
  115. .catch((err: any /* FIXME */) => {
  116. this.emit('error', err);
  117. // Only retry on resolvable errors
  118. if (!err.authExpired) {
  119. this.retryPollInfo();
  120. }
  121. });
  122. }
  123. /**
  124. * Long poll for notification of new events. We do this rather than
  125. * polling for the events directly in order to minimize the number of API
  126. * calls necessary.
  127. * @returns {Promise} Promise for testing pruposes
  128. * @private
  129. */
  130. doLongPoll() {
  131. if (this.destroyed) {
  132. return Promise.resolve(false);
  133. }
  134. // If we're over the max number of retries, reset
  135. if (this._longPollRetries > this._longPollInfo!.max_retries) {
  136. return this.getLongPollInfo();
  137. }
  138. var url = this._longPollInfo!.url,
  139. qsDelim = url.indexOf('?'),
  140. query = {};
  141. // Break out the query params, otherwise the request URL gets messed up
  142. if (qsDelim > 0) {
  143. query = qs.parse(url.substr(qsDelim + 1));
  144. url = url.substr(0, qsDelim);
  145. }
  146. (query as Record<string, any>).stream_position = this._streamPosition;
  147. var options = {
  148. qs: query,
  149. timeout: this._longPollInfo!.retry_timeout * 1000,
  150. };
  151. this._longPollRetries += 1;
  152. return this._client
  153. .wrapWithDefaultHandler(this._client.get)(url, options)
  154. .then((data: any /* FIXME */) => {
  155. if (this.destroyed) {
  156. return false;
  157. }
  158. if (data.message === 'reconnect') {
  159. return this.getLongPollInfo();
  160. }
  161. // We don't expect any messages other than reconnect and new_change, so if
  162. // we get one just retry the long poll
  163. if (data.message !== 'new_change') {
  164. return this.doLongPoll();
  165. }
  166. return this.fetchEvents();
  167. })
  168. .catch(() => {
  169. this.retryPollInfo();
  170. });
  171. }
  172. /**
  173. * Retries long-polling after a delay.
  174. * Does not attempt if stream is already destroyed.
  175. * @returns {void}
  176. * @private
  177. */
  178. retryPollInfo() {
  179. if (!this.destroyed) {
  180. this._retryTimer = setTimeout(
  181. () => this.getLongPollInfo(),
  182. this._options.retryDelay
  183. );
  184. }
  185. }
  186. /**
  187. * Fetch the latest group of events and push them into the stream
  188. * @returns {Promise} Promise for testing purposes
  189. * @private
  190. */
  191. fetchEvents() {
  192. if (this.destroyed) {
  193. return Promise.resolve(false);
  194. }
  195. var eventParams = {
  196. stream_position: this._streamPosition,
  197. limit: 500,
  198. };
  199. // Get new events after the rate limiter expires
  200. return this._rateLimiter.then(() =>
  201. this._client.events
  202. .get(eventParams)
  203. .then((events: any /* FIXME */) => {
  204. // Reset the rate limiter
  205. this._rateLimiter = Promise.delay(this._options.fetchInterval);
  206. // If the response wasn't what we expected, re-poll
  207. if (!events.entries || !events.next_stream_position) {
  208. return this.doLongPoll();
  209. }
  210. this._streamPosition = events.next_stream_position;
  211. // De-duplicate the fetched events, since the API often returns
  212. // the same events at multiple subsequent stream positions
  213. var newEvents = events.entries.filter(
  214. (event: any /* FIXME */) => !this._dedupHash[event.event_id]
  215. );
  216. // If there aren't any non-duplicate events, go back to polling
  217. if (newEvents.length === 0) {
  218. return this.doLongPoll();
  219. }
  220. // Pause the stream to avoid race conditions while pushing in the new events.
  221. // Without this, _read() would be called again from inside each push(),
  222. // resulting in multiple parallel calls to fetchEvents().
  223. // See https://github.com/nodejs/node/issues/3203
  224. var wasPaused = this.isPaused();
  225. this.pause();
  226. // Push new events into the stream
  227. newEvents.forEach((event: any /* FIXME */) => {
  228. this._dedupHash[event.event_id] = true;
  229. this.push(event);
  230. });
  231. if (!wasPaused) {
  232. // This will deliver the events and trigger the next call to _read() once they have been consumed.
  233. this.resume();
  234. }
  235. // Once the deduplication filter gets too big, clean it up
  236. if (
  237. Object.keys(this._dedupHash).length >=
  238. this._options.deduplicationFilterSize
  239. ) {
  240. this.cleanupDedupFilter(events.entries);
  241. }
  242. return true;
  243. })
  244. .catch((err: any /* FIXME */) => {
  245. this.emit('error', err);
  246. this.retryPollInfo();
  247. })
  248. );
  249. }
  250. /**
  251. * Clean up the deduplication filter, to prevent it from growing
  252. * too big and eating up memory. We look at the latest set of events
  253. * returned and assume that any IDs not in that set don't need to be
  254. * tracked for deduplication any more.
  255. * @param {Object[]} latestEvents The latest events from the API
  256. * @returns {void}
  257. * @private
  258. */
  259. cleanupDedupFilter(latestEvents: any /* FIXME */) {
  260. var dedupIDs = Object.keys(this._dedupHash);
  261. dedupIDs.forEach((eventID) => {
  262. var isEventCleared = !latestEvents.find(
  263. (e: any /* FIXME */) => e.event_id === eventID
  264. );
  265. if (isEventCleared) {
  266. delete this._dedupHash[eventID];
  267. }
  268. });
  269. }
  270. /**
  271. * Implementation of the stream-internal read function. This is called
  272. * by the stream whenever it needs more data, and will not be called again
  273. * until data is pushed into the stream.
  274. * @returns {void}
  275. * @private
  276. */
  277. _read() {
  278. // Start the process of getting new events
  279. this.getLongPollInfo();
  280. }
  281. /**
  282. * Implementation of stream-internal `_destroy` function (v8.0.0 and later).
  283. * Called by stream consumers to effectively stop polling via the public
  284. * `destroy()`.
  285. * @returns {void}
  286. * @private
  287. */
  288. _destroy() {
  289. clearTimeout(this._retryTimer as number);
  290. delete this._retryTimer;
  291. }
  292. }
  293. // backwards-compat for Node.js pre-v8.0.0
  294. /* istanbul ignore if */
  295. if (typeof Readable.prototype.destroy !== 'function') {
  296. /**
  297. * Destroys the stream. Rough polyfill for `Readable#destroy`.
  298. * @returns {void}
  299. * @public
  300. */
  301. EventStream.prototype.destroy = function (error?: Error | undefined) {
  302. if (!this.destroyed) {
  303. process.nextTick(() => {
  304. this.emit('close');
  305. });
  306. this.destroyed = true;
  307. this._destroy();
  308. }
  309. return this;
  310. };
  311. }
  312. export = EventStream;