Source: chunked-uploader.ts

  1. /**
  2. * @fileoverview Upload manager for large file uploads
  3. */
  4. import { Promise } from 'bluebird';
  5. // -----------------------------------------------------------------------------
  6. // Typedefs
  7. // -----------------------------------------------------------------------------
  8. /**
  9. * Chunk uploaded event
  10. * @event Chunk#uploaded
  11. * @param {UploadPart} data The data of the uploaded chunk
  12. * @private
  13. */
  14. /**
  15. * Chunk error event
  16. * @event Chunk#error
  17. * @param {Error} err The error that occurred
  18. * @private
  19. */
  20. /**
  21. * Event for when the upload is successfully aborted
  22. * @event ChunkedUploader#aborted
  23. */
  24. /**
  25. * Event for when the abort fails because the upload session is not destroyed.
  26. * In general, the abort can be retried, and no new chunks will be uploaded.
  27. * @event ChunkedUploader#abortFailed
  28. * @param {Error} err The error that occurred
  29. */
  30. /**
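  * Event for when a chunk fails to upload with an API error (an error carrying
  * a status code). Such a chunk is not retried automatically; only errors
  * without a status code (e.g. network failures) are retried, and those
  * retries do not emit this event.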
  33. * @event ChunkedUploader#chunkError
  34. * @param {Error} err The error that occurred during chunk upload
  35. */
  36. /**
  37. * Event for when a chunk is successfully uploaded
  38. * @event ChunkedUploader#chunkUploaded
  39. * @param {UploadPart} data The data for the uploaded chunk
  40. */
  41. /**
  42. * Event for when the entire upload is complete
  43. * @event ChunkedUploader#uploadComplete
  44. * @param {Object} file The file object for the newly-uploaded file
  45. */
  46. /**
  47. * Event for when an upload fails
  48. * @event ChunkedUploader#error
  49. * @param {Error} err The error that occurred
  50. */
  /**
   * Optional settings accepted by the ChunkedUploader constructor. Every field
   * may be omitted.
   */
  type ChunkedUploaderOptions = {
    /** The number of concurrent chunks to upload */
    parallelism?: number;
    /** The number of ms to wait before retrying a chunk upload */
    retryInterval?: number;
    /** Attributes to set on the file during commit */
    fileAttributes?: { [key: string]: any };
  };
  /**
   * The pieces of an upload session record that the uploader reads.
   */
  type UploadSessionInfo = {
    /** Size of each part the file is split into for this session */
    part_size: number;
    /** Identifier of the upload session that parts are uploaded to */
    id: string;
  };
  60. // -----------------------------------------------------------------------------
  61. // Requirements
  62. // -----------------------------------------------------------------------------
  63. import { EventEmitter } from 'events';
  64. import { Readable as ReadableStream } from 'stream';
  65. import crypto from 'crypto';
  66. import BoxClient from './box-client';
  67. // -----------------------------------------------------------------------------
  68. // Private
  69. // -----------------------------------------------------------------------------
  /** Default number of chunks uploaded concurrently */
  const DEFAULT_PARALLELISM = 4;
  /** Default delay, in ms, before a failed operation is retried */
  const DEFAULT_RETRY_INTERVAL_MS = 1000;
  /**
   * Immutable fallback values merged underneath caller-supplied options.
   */
  const DEFAULT_OPTIONS = Object.freeze({
    parallelism: DEFAULT_PARALLELISM,
    retryInterval: DEFAULT_RETRY_INTERVAL_MS,
  });
  /**
   * Chunk of a file to be uploaded. A chunk owns its slice of the file until
   * the upload succeeds or the chunk is canceled, and it re-submits itself
   * after errors that carry no status code (e.g. network failures).
   * Errors that do carry a status code are reported through the 'error' event
   * and are not retried.
   * @private
   */
  class Chunk extends EventEmitter {
    /** The Box SDK client used to upload the part */
    client: BoxClient;
    /** The ID of the upload session the chunk belongs to */
    sessionID: string;
    /** The part payload; set to null once uploaded or canceled so it can be freed */
    chunk: Buffer | string | null;
    /** The length of the payload as reported by `chunk.length` at construction */
    length: number;
    /** The offset within the file where this chunk begins */
    offset: number;
    /** The total size of the file this chunk belongs to */
    totalSize: number;
    /** The options from the ChunkedUploader (only retryInterval is read here) */
    options: ChunkedUploaderOptions;
    /** The API response for the uploaded part; null until the upload succeeds */
    data: any /* FIXME */;
    /** Handle of the pending retry timer, or null when no retry is scheduled */
    retry: number | NodeJS.Timeout | null;
    /** Whether cancel() has been called */
    canceled: boolean;

    /**
     * Create a Chunk, representing a part of a file being uploaded
     * @param {BoxClient} client The Box SDK client
     * @param {string} sessionID The ID of the upload session the chunk belongs to
     * @param {Buffer|string} chunk The chunk to upload
     * @param {int} offset The byte offset within the file where this chunk begins
     * @param {int} totalSize The total size of the file this chunk belongs to
     * @param {Object} options The options from the ChunkedUploader
     * @param {int} options.retryInterval The number of ms to wait before retrying a chunk upload
     */
    constructor(
      client: BoxClient,
      sessionID: string,
      chunk: Buffer | string,
      offset: number,
      totalSize: number,
      options: ChunkedUploaderOptions
    ) {
      super();
      this.client = client;
      this.sessionID = sessionID;
      this.chunk = chunk;
      this.length = chunk.length;
      this.offset = offset;
      this.totalSize = totalSize;
      this.options = options;
      this.data = null;
      this.retry = null;
      this.canceled = false;
    }

    /**
     * Get the final object representation of this chunk for the API.
     * Only meaningful after the 'uploaded' event: before that `data` is still
     * null and reading `.part` from it throws a TypeError.
     * @returns {UploadPart} The chunk object
     */
    getData() {
      return this.data.part;
    }

    /**
     * Upload the chunk to the API. Does nothing when the chunk was canceled or
     * its payload was already released (after a successful upload), so a null
     * payload is never sent.
     * @returns {void}
     * @emits Chunk#uploaded
     * @emits Chunk#error
     */
    upload() {
      const payload = this.chunk;
      if (this.canceled || payload === null) {
        return;
      }
      // Any timer that led to this call has fired; forget its handle.
      this.retry = null;

      this.client.files.uploadPart(
        this.sessionID,
        payload,
        this.offset,
        this.totalSize,
        (err: any /* FIXME */, data: any /* FIXME */) => {
          if (this.canceled) {
            // Canceled while the request was in flight: drop the result silently.
            this.chunk = null;
            return;
          }

          if (err) {
            if (err.statusCode) {
              // an API error, probably not retryable!
              this.emit('error', err);
            } else {
              // maybe a network error, retry
              this.retry = setTimeout(
                () => this.upload(),
                this.options.retryInterval
              );
            }
            return;
          }

          // Record the chunk data for commit, and try to free up the chunk buffer
          this.data = data;
          this.chunk = null;
          this.emit('uploaded', data);
        }
      );
    }

    /**
     * Cancel trying to upload a chunk, preventing it from retrying and clearing
     * the associated buffer
     * @returns {void}
     */
    cancel() {
      clearTimeout(this.retry as any); // number or NodeJS.Timeout
      this.retry = null;
      this.chunk = null;
      this.canceled = true;
    }
  }
  176. // -----------------------------------------------------------------------------
  177. // Public
  178. // -----------------------------------------------------------------------------
  /**
   * Manager for uploading a file in chunks.
   *
   * The file (stream, Buffer, or string) is cut into parts of the upload
   * session's part size, up to `parallelism` parts are in flight at a time, a
   * SHA-1 digest is accumulated over the parts in the order they are cut, and
   * once no part is in flight and the whole file has been consumed the session
   * is committed with that digest.
   */
  class ChunkedUploader extends EventEmitter {
    /** The client used for part uploads, commit, and abort */
    _client: BoxClient;
    /** ID of the upload session (uploadSessionInfo.id) */
    _sessionID: string;
    /** Size of each part (uploadSessionInfo.part_size) */
    _partSize: number;
    /** The session info as passed in; included in the 'error' event payload */
    _uploadSessionInfo: UploadSessionInfo;
    /** Source stream (stream input only); null after abort() */
    _stream!: ReadableStream | null;
    /** Parts already read from the stream but not yet handed out (stream input only) */
    _streamBuffer!: Array<any>;
    /** Source data (Buffer/string input only); null after abort() */
    _file!: Buffer | string | null;
    /** Total size of the file, as supplied by the caller */
    _size: number;
    /** Caller options merged over DEFAULT_OPTIONS */
    _options: Required<ChunkedUploaderOptions>;
    /** True from start() until the commit request is issued; gates _commit() */
    _isStarted: boolean;
    /** Number of chunks handed to _uploadChunk() that have not reported 'uploaded' */
    _numChunksInFlight: number;
    /** Every chunk created so far, in file order */
    _chunks: Array<any>;
    /** Offset of the next part to cut from the file */
    _position: number;
    /** Running SHA-1 over all parts cut so far */
    _fileHash: crypto.Hash;
    /** Promise returned by start(); created on the first start() call */
    _promise?: Promise<any>;
    /** Resolver of _promise */
    _resolve?: Function;
    /** Rejecter of _promise */
    _reject?: Function;

    /**
     * Create an upload manager
     * @param {BoxClient} client The client to use to upload the file
     * @param {Object} uploadSessionInfo The upload session info to use for chunked upload
     * @param {ReadableStream|Buffer|string} file The file to upload
     * @param {int} size The size of the file to be uploaded
     * @param {Object} [options] Optional parameters
     * @param {int} [options.retryInterval=1000] The number of ms to wait before retrying operations
     * @param {int} [options.parallelism=4] The number of concurrent chunks to upload
     * @param {Object} [options.fileAttributes] Attributes to set on the file during commit
     * @throws {TypeError} When file is not a stream, Buffer, or string
     */
    constructor(
      client: BoxClient,
      uploadSessionInfo: UploadSessionInfo,
      file: ReadableStream | Buffer | string,
      size: number,
      options?: ChunkedUploaderOptions
    ) {
      super();

      this._client = client;
      this._sessionID = uploadSessionInfo.id;
      this._partSize = uploadSessionInfo.part_size;
      this._uploadSessionInfo = uploadSessionInfo;

      if (file instanceof ReadableStream) {
        // Pause the stream so we can read specific chunks from it
        this._stream = file.pause();
        this._streamBuffer = [];
      } else if (file instanceof Buffer || typeof file === 'string') {
        this._file = file;
      } else {
        throw new TypeError('file must be a Stream, Buffer, or string!');
      }

      this._size = size;
      this._options = Object.assign(
        {},
        DEFAULT_OPTIONS,
        options
      ) as Required<ChunkedUploaderOptions>;

      this._isStarted = false;
      this._numChunksInFlight = 0;
      this._chunks = [];
      this._position = 0;
      this._fileHash = crypto.createHash('sha1');
    }

    /**
     * Start an upload. Calling this again, whether the upload is still
     * running or has already finished, returns the same promise and does not
     * start a second upload.
     * @returns {Promise<Object>} A promise resolving to the uploaded file
     */
    start() {
      if (this._promise) {
        return this._promise;
      }

      // The started flag and the promise callbacks must exist BEFORE the first
      // chunks are requested: for an empty file, or when an upload callback
      // fires synchronously, _commit() can be reached from inside the loop.
      this._isStarted = true;
      /* eslint-disable promise/avoid-new */
      this._promise = new Promise((resolve, reject) => {
        this._resolve = resolve;
        this._reject = reject;
      });
      /* eslint-enable promise/avoid-new */

      // Create the initial chunks
      for (let i = 0; i < this._options.parallelism; i++) {
        this._getNextChunk((chunk: any /* FIXME */) =>
          chunk ? this._uploadChunk(chunk) : this._commit()
        );
      }

      return this._promise;
    }

    /**
     * Abort a running upload, which cancels all currently uploading chunks,
     * attempts to free up held memory, and aborts the upload session. This
     * cannot be undone or resumed. Note that the promise returned by start()
     * is not settled by an abort.
     * @returns {Promise} A promise resolving when the upload is aborted
     * @emits ChunkedUploader#aborted
     * @emits ChunkedUploader#abortFailed
     */
    abort() {
      this._chunks.forEach((chunk) => chunk.removeAllListeners().cancel());
      this._chunks = [];
      // Dropping both sources is also what marks the uploader as aborted for
      // _getNextChunk().
      this._file = null;
      this._stream = null;
      this._streamBuffer = [];

      return (
        this._client.files
          .abortUploadSession(this._sessionID)
          /* eslint-disable promise/always-return */
          .then(() => {
            this.emit('aborted');
          })
          /* eslint-enable promise/always-return */
          .catch((err: any /* FIXME */) => {
            this.emit('abortFailed', err);
            throw err;
          })
      );
    }

    /**
     * Get the next chunk of the file to be uploaded. The callback receives a
     * Chunk, or null once the whole file has been consumed. For stream input
     * the callback may be delayed until the stream has enough data buffered.
     * After abort() the callback is never invoked.
     * @param {Function} callback Called with the next chunk of the file to be uploaded
     * @returns {void}
     * @private
     */
    _getNextChunk(callback: Function) {
      if (this._file == null && this._stream == null) {
        // abort() released both sources. A read retry scheduled before the
        // abort would otherwise dereference the null stream and throw.
        return;
      }

      if (this._position >= this._size) {
        callback(null);
        return;
      }

      let buf;
      if (this._file) {
        // Buffer/string case, just get the slice we need
        buf = this._file.slice(this._position, this._position + this._partSize);
      } else if (this._streamBuffer.length > 0) {
        buf = this._streamBuffer.shift();
      } else {
        // Stream case, need to read
        buf = (this._stream as ReadableStream).read(this._partSize);

        if (!buf) {
          // stream needs to read more, retry later
          // NOTE(review): this keeps polling until data arrives; a stream that
          // ends with fewer bytes than `size` would presumably poll forever.
          setImmediate(() => this._getNextChunk(callback));
          return;
        } else if (buf.length > this._partSize) {
          // stream is done reading and had extra data, buffer the remainder of the file
          for (let i = 0; i < buf.length; i += this._partSize) {
            this._streamBuffer.push(buf.slice(i, i + this._partSize));
          }
          buf = this._streamBuffer.shift();
        }
      }

      // Parts are hashed in the order they are cut, which is file order.
      this._fileHash.update(buf);
      let chunk = new Chunk(
        this._client,
        this._sessionID,
        buf,
        this._position,
        this._size,
        this._options
      );
      this._position += buf.length;
      callback(chunk);
    }

    /**
     * Upload a chunk, then fetch and upload the next one (or commit) when it
     * finishes. A chunk that reports an error stays counted as in flight, so
     * the upload does not commit after a chunk error.
     * @param {Chunk} chunk The chunk to upload
     * @returns {void}
     * @emits ChunkedUploader#chunkError
     * @emits ChunkedUploader#chunkUploaded
     */
    _uploadChunk(chunk: any /* FIXME */) {
      this._numChunksInFlight += 1;
      // Register the chunk before starting it so _commit() sees every part, in
      // file order, even if the upload completes synchronously.
      this._chunks.push(chunk);

      chunk.on('error', (err: any /* FIXME */) => this.emit('chunkError', err));
      chunk.on('uploaded', (data: any /* FIXME */) => {
        this._numChunksInFlight -= 1;

        this.emit('chunkUploaded', data);
        this._getNextChunk((nextChunk: any /* FIXME */) =>
          nextChunk ? this._uploadChunk(nextChunk) : this._commit()
        );
      });
      chunk.upload();
    }

    /**
     * Commit the upload, finalizing it. Does nothing unless the upload is
     * started and no chunk is in flight; clearing the started flag makes sure
     * the commit request is issued only once.
     * @returns {void}
     * @emits ChunkedUploader#uploadComplete
     * @emits ChunkedUploader#error
     */
    _commit() {
      if (!this._isStarted || this._numChunksInFlight > 0) {
        return;
      }

      let hash = this._fileHash.digest('base64');
      this._isStarted = false;
      let options = Object.assign(
        {
          parts: this._chunks.map((c) => c.getData()),
        },
        this._options.fileAttributes
      );

      this._client.files.commitUploadSession(this._sessionID, hash, options, (
        err: any /* FIXME */,
        file: any /* FIXME */
      ) => {
        // It's not clear what the SDK can do here, so we just return the error and session info
        // so users can retry if they wish
        if (err) {
          // Settle the promise first so a throwing listener cannot leave it pending.
          this._reject!(err);
          // Emitting 'error' with no listener throws; promise-only callers
          // already receive the failure through the rejection.
          if (this.listenerCount('error') > 0) {
            this.emit('error', {
              uploadSession: this._uploadSessionInfo,
              error: err,
            });
          }
          return;
        }

        this._resolve!(file);
        this.emit('uploadComplete', file);
      });
    }
  }
  393. export = ChunkedUploader;