/**
* @fileoverview Upload manager for large file uploads
*/
import { Promise } from 'bluebird';
// -----------------------------------------------------------------------------
// Typedefs
// -----------------------------------------------------------------------------
/**
* Chunk uploaded event
* @event Chunk#uploaded
* @param {UploadPart} data The data of the uploaded chunk
* @private
*/
/**
* Chunk error event
* @event Chunk#error
* @param {Error} err The error that occurred
* @private
*/
/**
* Event for when the upload is successfully aborted
* @event ChunkedUploader#aborted
*/
/**
* Event for when the abort fails because the upload session is not destroyed.
* In general, the abort can be retried, and no new chunks will be uploaded.
* @event ChunkedUploader#abortFailed
* @param {Error} err The error that occurred
*/
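/**
* Event for when a chunk fails to upload with an API error (an error that
* carries a status code). Such a chunk is not retried automatically; only
* failures without a status code (e.g. network errors) are retried, and those
* retries do not emit this event.
* @event ChunkedUploader#chunkError
* @param {Error} err The error that occurred during chunk upload
*/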
/**
* Event for when a chunk is successfully uploaded
* @event ChunkedUploader#chunkUploaded
* @param {UploadPart} data The data for the uploaded chunk
*/
/**
* Event for when the entire upload is complete
* @event ChunkedUploader#uploadComplete
* @param {Object} file The file object for the newly-uploaded file
*/
/**
* Event for when an upload fails
* @event ChunkedUploader#error
* @param {Error} err The error that occurred
*/
/**
* Options accepted by the ChunkedUploader constructor. The merged options
* object is also handed to every Chunk the uploader creates.
*/
type ChunkedUploaderOptions = {
/** Number of ms a chunk waits before retrying a failed (non-API) upload; defaults to 1000 */
retryInterval?: number;
/** Number of chunk uploads kicked off concurrently when the upload starts; defaults to 4 */
parallelism?: number;
/** Extra attributes merged into the options sent when committing the upload session */
fileAttributes?: Record<string, any>;
};
/**
* Description of the upload session a ChunkedUploader works against. The whole
* object is also attached to the uploader's 'error' event payload.
*/
type UploadSessionInfo = {
/** ID of the upload session; used for part uploads, commit, and abort */
id: string;
/** Maximum length of each chunk sliced or read from the file */
part_size: number;
};
// -----------------------------------------------------------------------------
// Requirements
// -----------------------------------------------------------------------------
import { EventEmitter } from 'events';
import { Readable as ReadableStream } from 'stream';
import crypto from 'crypto';
import BoxClient from './box-client';
// -----------------------------------------------------------------------------
// Private
// -----------------------------------------------------------------------------
/**
* Default uploader options. The ChunkedUploader constructor copies these into
* a fresh object and then applies the caller's options on top, so the frozen
* defaults themselves are never mutated.
*/
const DEFAULT_OPTIONS = Object.freeze({
// Number of chunk uploads started concurrently
parallelism: 4,
// Delay, in ms, before a chunk retries a failed upload
retryInterval: 1000,
});
/**
 * One part of a file within an upload session. A chunk uploads itself through
 * the client and, when the failure carries no status code, schedules another
 * attempt after the configured retry interval.
 * @private
 */
class Chunk extends EventEmitter {
	client: BoxClient;
	sessionID: string;
	chunk: Buffer | string | null;
	length: number;
	offset: number;
	totalSize: number;
	options: ChunkedUploaderOptions;
	data: any /* FIXME */;
	retry: number | NodeJS.Timeout | null;
	canceled: boolean;

	/**
	 * Build a chunk for one part of the file being uploaded
	 * @param {BoxClient} client The Box SDK client used to send the part
	 * @param {string} sessionID The ID of the upload session this chunk belongs to
	 * @param {Buffer|string} chunk The content of this part, i.e. the data to upload
	 * @param {int} offset The offset within the file at which this chunk begins
	 * @param {int} totalSize The total size of the file this chunk belongs to
	 * @param {Object} options The options from the ChunkedUploader
	 * @param {int} options.retryInterval The number of ms to wait before retrying a chunk upload
	 */
	constructor(
		client: BoxClient,
		sessionID: string,
		chunk: Buffer | string,
		offset: number,
		totalSize: number,
		options: ChunkedUploaderOptions
	) {
		super();

		// Where and how the part gets uploaded
		this.client = client;
		this.sessionID = sessionID;

		// The part itself; the length is recorded now because the content is
		// dropped once it is no longer needed
		this.chunk = chunk;
		this.length = chunk.length;
		this.offset = offset;
		this.totalSize = totalSize;
		this.options = options;

		// Upload state: API response, pending retry timer, cancellation flag
		this.data = null;
		this.retry = null;
		this.canceled = false;
	}

	/**
	 * Get the part record the API returned for this chunk. Only meaningful
	 * after the chunk has emitted `uploaded`.
	 * @returns {UploadPart} The chunk object
	 */
	getData() {
		const { part } = this.data;
		return part;
	}

	/**
	 * Send this chunk to the API
	 * @returns {void}
	 * @emits Chunk#uploaded
	 * @emits Chunk#error
	 */
	upload() {
		const onResult = (err: any /* FIXME */, data: any /* FIXME */) => {
			// A canceled chunk ignores the outcome and just lets go of its content
			if (this.canceled) {
				this.chunk = null;
				return;
			}

			if (!err) {
				// Keep the response for the commit and release the content
				this.data = data;
				this.chunk = null;
				this.emit('uploaded', data);
				return;
			}

			if (err.statusCode) {
				// The API answered with an error; report it instead of retrying
				this.emit('error', err);
				return;
			}

			// No status code, so possibly a network problem: try again later
			this.retry = setTimeout(
				() => this.upload(),
				this.options.retryInterval
			);
		};

		this.client.files.uploadPart(
			this.sessionID,
			this.chunk!,
			this.offset,
			this.totalSize,
			onResult
		);
	}

	/**
	 * Stop this chunk: no further retry will fire, the content is released,
	 * and the result of any request still in flight is ignored
	 * @returns {void}
	 */
	cancel() {
		this.canceled = true;
		this.chunk = null;
		clearTimeout(this.retry as any); // number or NodeJS.Timeout
	}
}
// -----------------------------------------------------------------------------
// Public
// -----------------------------------------------------------------------------
/**
 * Manager for uploading a file in chunks.
 *
 * The file is cut into parts of the session's part size. Up to
 * `options.parallelism` parts are uploaded at a time, and once every part has
 * been uploaded the session is committed. Progress is reported both through
 * events and through the promise returned by `start()`.
 *
 * NOTE(review): a chunk that fails with an API error is reported through
 * `chunkError` but is not retried and remains counted as in flight, so the
 * upload will not commit afterwards; callers should abort in that case.
 */
class ChunkedUploader extends EventEmitter {
	_client: BoxClient;
	_sessionID: string;
	_partSize: number;
	_uploadSessionInfo: UploadSessionInfo;
	_stream!: ReadableStream | null;
	_streamBuffer!: Array<any>;
	_file!: Buffer | string | null;
	_size: number;
	_options: Required<ChunkedUploaderOptions>;
	_isStarted: boolean;
	_isAborted: boolean;
	_numChunksInFlight: number;
	_chunks: Array<any>;
	_position: number;
	_fileHash: crypto.Hash;
	_promise?: Promise<any>;
	_resolve?: Function;
	_reject?: Function;

	/**
	 * Create an upload manager
	 * @param {BoxClient} client The client to use to upload the file
	 * @param {Object} uploadSessionInfo The upload session info to use for chunked upload
	 * @param {ReadableStream|Buffer|string} file The file to upload
	 * @param {int} size The size of the file to be uploaded
	 * @param {Object} [options] Optional parameters
	 * @param {int} [options.retryInterval=1000] The number of ms to wait before retrying operations
	 * @param {int} [options.parallelism=4] The number of concurrent chunks to upload
	 * @param {Object} [options.fileAttributes] Attributes to set on the file during commit
	 * @throws {TypeError} If file is not a Stream, Buffer, or string
	 */
	constructor(
		client: BoxClient,
		uploadSessionInfo: UploadSessionInfo,
		file: ReadableStream | Buffer | string,
		size: number,
		options?: ChunkedUploaderOptions
	) {
		super();

		this._client = client;
		this._sessionID = uploadSessionInfo.id;
		this._partSize = uploadSessionInfo.part_size;
		this._uploadSessionInfo = uploadSessionInfo;

		// Always initialized so _getNextChunk can inspect it for any input kind
		this._streamBuffer = [];

		if (file instanceof ReadableStream) {
			// Pause the stream so we can read specific chunks from it
			this._stream = file.pause();
			this._file = null;
		} else if (file instanceof Buffer || typeof file === 'string') {
			// Strings are converted to their UTF-8 bytes up front: slicing a
			// string counts UTF-16 code units, which would put part boundaries
			// and offsets in the wrong place for non-ASCII text.
			this._file = typeof file === 'string' ? Buffer.from(file) : file;
			this._stream = null;
		} else {
			throw new TypeError('file must be a Stream, Buffer, or string!');
		}

		this._size = size;
		this._options = Object.assign(
			{},
			DEFAULT_OPTIONS,
			options
		) as Required<ChunkedUploaderOptions>;

		this._isStarted = false;
		this._isAborted = false;
		this._numChunksInFlight = 0;
		this._chunks = [];
		this._position = 0;
		this._fileHash = crypto.createHash('sha1');
	}

	/**
	 * Start an upload. Calling this more than once returns the promise created
	 * by the first call instead of starting over.
	 * @returns {Promise<Object>} A promise resolving to the uploaded file
	 */
	start() {
		if (this._promise) {
			return this._promise;
		}

		// The promise and the started flag must exist before any chunk is
		// requested: _getNextChunk may call back synchronously (e.g. when there
		// is nothing to upload), and _commit relies on both.
		/* eslint-disable promise/avoid-new */
		this._promise = new Promise((resolve, reject) => {
			this._resolve = resolve;
			this._reject = reject;
		});
		/* eslint-enable promise/avoid-new */
		this._isStarted = true;

		// Create the initial chunks
		for (let i = 0; i < this._options.parallelism; i++) {
			this._getNextChunk((chunk: any /* FIXME */) =>
				chunk ? this._uploadChunk(chunk) : this._commit()
			);
		}

		return this._promise;
	}

	/**
	 * Abort a running upload, which cancels all currently uploading chunks,
	 * attempts to free up held memory, and aborts the upload session. This
	 * cannot be undone or resumed.
	 * @returns {Promise} A promise resolving when the upload is aborted
	 * @emits ChunkedUploader#aborted
	 * @emits ChunkedUploader#abortFailed
	 */
	abort() {
		// Stops any pending stream poll in _getNextChunk from touching the
		// stream/file references released below
		this._isAborted = true;

		this._chunks.forEach((chunk) => chunk.removeAllListeners().cancel());
		this._chunks = [];
		this._file = null;
		this._stream = null;
		this._streamBuffer = [];

		return (
			this._client.files
				.abortUploadSession(this._sessionID)
				/* eslint-disable promise/always-return */
				.then(() => {
					this.emit('aborted');
				})
				/* eslint-enable promise/always-return */
				.catch((err: any /* FIXME */) => {
					this.emit('abortFailed', err);
					throw err;
				})
		);
	}

	/**
	 * Get the next chunk of the file to be uploaded
	 * @param {Function} callback Called with the next chunk of the file to be
	 *   uploaded, or with null once the whole file has been handed out. Not
	 *   called at all once the upload has been aborted.
	 * @returns {void}
	 * @private
	 */
	_getNextChunk(callback: Function) {
		if (this._isAborted) {
			return;
		}

		if (this._position >= this._size) {
			callback(null);
			return;
		}

		let buf;
		if (this._file) {
			// In-memory case, just get the slice we need
			buf = this._file.slice(this._position, this._position + this._partSize);
		} else if (this._streamBuffer.length > 0) {
			buf = this._streamBuffer.shift();
		} else {
			// Stream case, need to read
			buf = (this._stream as ReadableStream).read(this._partSize);

			if (!buf) {
				// stream needs to read more, retry later
				setImmediate(() => this._getNextChunk(callback));
				return;
			} else if (buf.length > this._partSize) {
				// stream is done reading and had extra data, buffer the remainder of the file
				for (let i = 0; i < buf.length; i += this._partSize) {
					this._streamBuffer.push(buf.slice(i, i + this._partSize));
				}
				buf = this._streamBuffer.shift();
			}
		}

		// Chunks are handed out in file order, so the running hash covers the
		// file content in order as well
		this._fileHash.update(buf);

		const chunk = new Chunk(
			this._client,
			this._sessionID,
			buf,
			this._position,
			this._size,
			this._options
		);
		this._position += buf.length;
		callback(chunk);
	}

	/**
	 * Upload a chunk
	 * @param {Chunk} chunk The chunk to upload
	 * @returns {void}
	 * @emits ChunkedUploader#chunkError
	 * @emits ChunkedUploader#chunkUploaded
	 */
	_uploadChunk(chunk: any /* FIXME */) {
		this._numChunksInFlight += 1;

		// Register the chunk before starting it, so that it is part of the
		// commit even if its upload completes synchronously
		this._chunks.push(chunk);

		chunk.on('error', (err: any /* FIXME */) => this.emit('chunkError', err));
		chunk.on('uploaded', (data: any /* FIXME */) => {
			this._numChunksInFlight -= 1;
			this.emit('chunkUploaded', data);
			// The finished chunk's slot is reused for the next part, or for
			// the commit when nothing is left
			this._getNextChunk((nextChunk: any /* FIXME */) =>
				nextChunk ? this._uploadChunk(nextChunk) : this._commit()
			);
		});

		chunk.upload();
	}

	/**
	 * Commit the upload, finalizing it. Does nothing unless the upload is
	 * started and no chunk is still in flight.
	 * @returns {void}
	 * @emits ChunkedUploader#uploadComplete
	 * @emits ChunkedUploader#error
	 */
	_commit() {
		if (!this._isStarted || this._numChunksInFlight > 0) {
			return;
		}

		const hash = this._fileHash.digest('base64');
		// Cleared so that the other upload slots reaching this point do not
		// commit a second time
		this._isStarted = false;

		const options = Object.assign(
			{
				parts: this._chunks.map((c) => c.getData()),
			},
			this._options.fileAttributes
		);

		this._client.files.commitUploadSession(this._sessionID, hash, options, (
			err: any /* FIXME */,
			file: any /* FIXME */
		) => {
			// It's not clear what the SDK can do here, so we just return the error and session info
			// so users can retry if they wish
			if (err) {
				// Settle the promise first so that it is rejected even if an
				// event listener throws
				this._reject!(err);

				// Emitting 'error' on an EventEmitter without a listener throws;
				// callers relying only on the promise must not be hit by that
				if (this.listenerCount('error') > 0) {
					this.emit('error', {
						uploadSession: this._uploadSessionInfo,
						error: err,
					});
				}
				return;
			}

			this._resolve!(file);
			this.emit('uploadComplete', file);
		});
	}
}
export = ChunkedUploader;