Source: enterprise-event-stream.ts

  1. /**
  2. * @fileoverview Enterprise event stream backed by the enterprise events API
  3. */
  4. // ------------------------------------------------------------------------------
  5. // Requirements
  6. // ------------------------------------------------------------------------------
  7. import { Readable } from 'stream';
  8. import BoxClient from './box-client';
  9. // ------------------------------------------------------------------------------
  10. // Typedefs
  11. // ------------------------------------------------------------------------------
  12. type Options = {
  13. streamPosition?: string;
  14. startDate?: string;
  15. endDate?: string;
  16. eventTypeFilter?: EventType[];
  17. pollingInterval?: number;
  18. chunkSize?: number;
  19. streamType?: 'admin_logs' | 'admin_logs_streaming';
  20. };
  21. type EventType = string /* FIXME */;
  22. // ------------------------------------------------------------------------------
  23. // Private
  24. // ------------------------------------------------------------------------------
  25. const DEFAULT_OPTIONS = Object.freeze({
  26. pollingInterval: 60, // seconds
  27. chunkSize: 500,
  28. streamType: 'admin_logs'
  29. });
  30. // ------------------------------------------------------------------------------
  31. // Public
  32. // ------------------------------------------------------------------------------
  33. /**
  34. * Stream of Box enterprise events.
  35. *
  36. * By default, the stream starts from the current time.
  37. * Pass 'startDate' to start from a specific time.
  38. * Pass 'streamPosition' to start from a previous stream position, or '0' for all available past events (~1 year).
  39. * Once the stream catches up to the current time, it will begin polling every 'pollingInterval' seconds.
  40. * If 'pollingInterval' = 0, then the stream will end when it catches up to the current time (no polling).
  41. *
  42. * @param {BoxClient} client - The client to use to get events
  43. * @param {Object} [options] - Options
  44. * @param {string} [options.streamPosition] - The stream position to start from (pass '0' for all past events)
  45. * @param {string} [options.startDate] - The date to start from
  46. * @param {string} [options.endDate] - The date to end at
  47. * @param {EventType[]} [options.eventTypeFilter] - Array of event types to return
  48. * @param {int} [options.pollingInterval=60] - Polling interval (in seconds). Pass 0 for no polling.
  49. * @param {int} [options.chunkSize=500] - Number of events to fetch per call (max = 500)
  50. * @constructor
  51. * @extends Readable
  52. */
  53. class EnterpriseEventStream extends Readable {
  54. _client: BoxClient;
  55. _options: Options & Required<Pick<Options, 'pollingInterval' | 'chunkSize'>>;
  56. _streamPosition?: string;
  57. constructor(client: BoxClient, options?: Options) {
  58. super({
  59. objectMode: true,
  60. });
  61. /**
  62. * @var {BoxClient} - The client for making API calls
  63. * @private
  64. */
  65. this._client = client;
  66. /**
  67. * @var {Object} - Options
  68. * @private
  69. */
  70. this._options = Object.assign({}, DEFAULT_OPTIONS, options);
  71. // Handle the case where the caller passes streamPosition = 0 instead of streamPosition = '0'.
  72. if (
  73. this._options.streamType === 'admin_logs' &&
  74. !this._options.startDate &&
  75. !this._options.streamPosition &&
  76. (this._options.streamPosition as any) !== 0
  77. ) {
  78. // If neither startDate nor streamPosition is specified, start from the current time.
  79. this._options.startDate = new Date()
  80. .toISOString()
  81. .replace(/\.000Z$/, '-00:00');
  82. }
  83. /**
  84. * @var {?string} - The current stream position
  85. * @private
  86. */
  87. this._streamPosition = this._options.streamPosition;
  88. }
  89. /**
  90. * @returns {?number} - Returns null if no events have been fetched from Box yet.
  91. */
  92. getStreamPosition() {
  93. return this._streamPosition;
  94. }
  95. /**
  96. * Get the stream state.
  97. *
  98. * @returns {Object} - The stream state
  99. */
  100. getStreamState() {
  101. // We need to return both streamPosition and startDate, since streamPosition will be null until
  102. // the first set of events is returned from Box.
  103. return {
  104. streamPosition: this._streamPosition,
  105. startDate: this._options.startDate,
  106. endDate: this._options.endDate,
  107. eventTypeFilter: this._options.eventTypeFilter,
  108. };
  109. }
  110. /**
  111. * Set the stream state.
  112. *
  113. * @param {Object} state - The stream state
  114. * @returns {void}
  115. */
  116. setStreamState(
  117. state: Pick<
  118. Options,
  119. 'streamPosition' | 'startDate' | 'endDate' | 'eventTypeFilter'
  120. >
  121. ) {
  122. // We need to set both streamPosition and startDate, since streamPosition will be null until
  123. // the first set of events is returned from Box.
  124. this._streamPosition = state.streamPosition;
  125. this._options.startDate = state.startDate;
  126. this._options.endDate = state.endDate;
  127. this._options.eventTypeFilter = state.eventTypeFilter;
  128. }
  129. /**
  130. * Fetch the next chunk of events
  131. *
  132. * If there are no events, poll until events are available.
  133. * If an error occurs, emit the error but continuing polling as usual.
  134. * @param {Function} callback - Passed the array of events
  135. * @returns {void}
  136. * @private
  137. */
  138. fetchEvents(callback: Function) {
  139. const self = this,
  140. params: {
  141. stream_type?: 'admin_logs' | 'admin_logs_streaming';
  142. stream_position?: string;
  143. created_after?: string;
  144. created_before?: string;
  145. event_type?: string;
  146. limit?: number;
  147. } = {
  148. stream_type: this._options.streamType
  149. };
  150. // Use the current stream position.
  151. // Handle the case where the caller passes streamPosition === 0 instead of streamPosition === '0'.
  152. if (this._streamPosition || (this._streamPosition as any) === 0) {
  153. params.stream_position = this._streamPosition;
  154. }
  155. if (this._options.streamType === 'admin_logs' && this._options.startDate) {
  156. params.created_after = this._options.startDate;
  157. }
  158. if (this._options.streamType === 'admin_logs' && this._options.endDate) {
  159. params.created_before = this._options.endDate;
  160. }
  161. if (this._options.eventTypeFilter) {
  162. params.event_type = this._options.eventTypeFilter.join(',');
  163. }
  164. if (this._options.chunkSize) {
  165. params.limit = this._options.chunkSize;
  166. }
  167. this._client.events.get(params, (
  168. err: any /* FIXME */,
  169. result: any /* FIXME */
  170. ) => {
  171. if (err) {
  172. self.emit('error', err);
  173. // If there was a "permanent" error, we would call the callback with it here.
  174. // But it's not clear which errors are truly permanent?
  175. // If Box is down or returning errors for an extended period, we still want to resume when it recovers.
  176. // So, continue polling at the regular frequency.
  177. // Don't use a shorter retry interval (to avoid DDOSing Box).
  178. }
  179. if (err || !result || !result.entries || result.entries.length === 0) {
  180. if (!self._options.pollingInterval) {
  181. // If polling is disabled, end the stream.
  182. callback();
  183. return;
  184. }
  185. // There were no events returned (or an error occurred), so schedule another poll.
  186. const delay = self._options.pollingInterval * 1000;
  187. // Stream readers can use this to flush buffered events to a downstream system.
  188. self.emit('wait', delay);
  189. setTimeout(() => {
  190. self.fetchEvents(callback);
  191. }, delay);
  192. return;
  193. }
  194. // Only update the stream position if there were events returned.
  195. // The API currently returns next_stream_position = 0 if there are no events (may be a bug?).
  196. // But we don't want to start over at the beginning in that case, so ignore it.
  197. self._streamPosition = result.next_stream_position;
  198. // Notify the reader of the new stream position.
  199. // Stream readers can respond to the 'newStreamState' event to persist the stream state.
  200. self.emit('newStreamState', self.getStreamState());
  201. callback(null, result.entries);
  202. });
  203. }
  204. /**
  205. * Implementation of the stream-internal read function. This is called
  206. * by the stream whenever it needs more data, and will not be called again
  207. * until data is pushed into the stream.
  208. * @returns {void}
  209. * @private
  210. */
  211. _read() {
  212. // Fetch the next chunk of events.
  213. const self = this;
  214. // This will poll forever until events are available.
  215. this.fetchEvents((err: any /* FIXME */, events: any /* FIXME */) => {
  216. if (err || !events || events.length === 0) {
  217. // Close the stream if there was a "permanent" failure or we reached the end of the events.
  218. self.push(null);
  219. return;
  220. }
  221. // Pause the stream to avoid race conditions while pushing in the new events.
  222. // Without this, _read() would be called again from inside each push(),
  223. // resulting in multiple parallel calls to fetchEvents().
  224. // See https://github.com/nodejs/node/issues/3203
  225. const wasPaused = self.isPaused();
  226. self.pause();
  227. // Push all of the events into the stream.
  228. events.forEach((event: any /* FIXME */) => {
  229. self.push(event);
  230. });
  231. if (!wasPaused) {
  232. // This will deliver the events and trigger the next call to _read() once they have been consumed.
  233. self.resume();
  234. }
  235. });
  236. }
  237. }
  238. export = EnterpriseEventStream;