Properties

new StreamAggr(base, arg[, storeName])

Stream Aggregate

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: "createClean",
   schema: [{
       name: "People",
       fields: [
           { name: "Name", type: "string" },
           { name: "Gender", type: "string" },
       ]
   },
   {
       name: "Laser",
       fields: [
           { name: "Time", type: "datetime" },
           { name: "WaveLength", type: "float" }
       ]
   }]
});

// create a new stream aggregator for the 'People' store that counts the added, updated and deleted records and stores the last timestamp (defined with a function object)
var aggr = new qm.StreamAggr(base, new function () {
   var numOfAdds = 0;
   var numOfUpdates = 0;
   var numOfDeletes = 0;
   var time = "";
   this.name = 'nameLength';
   this.onAdd = function (rec) {
       numOfAdds += 1;
   };
   this.onUpdate = function (rec) {
       numOfUpdates += 1;
   };
   this.onDelete = function (rec) {
       numOfDeletes += 1;
   };
   this.onTime = function (ts) {
       time = ts;
   };
   this.saveJson = function (limit) {
       return { adds: numOfAdds, updates: numOfUpdates, deletes: numOfDeletes, time: time };
   };
}, "People");

// create a new time series window buffer stream aggregator for 'Laser' store (with the JSON object)
var wavelength = {
    name: "WaveLengthLaser",
    type: "timeSeriesWinBuf",
    store: "Laser",
    timestamp: "Time",
    value: "WaveLength",
    winsize: 10000
}
var sa = base.store("Laser").addStreamAggr(wavelength);
base.close();

Parameters

Name Type Optional Description

base

module:qm.Base

 

The base object on which it's created.

arg

(module:qm~StreamAggregator or function())

 

Constructor arguments. There are two argument types:
1. Using the module:qm~StreamAggregator object, i.e. the object containing the schema of the stream aggregate,
2. Using a function/JavaScript class, i.e. the function object defining the operations of the stream aggregate.

storeName

(string or Array of string)

Yes

The store names where the aggregate will be registered.

Properties

init

Returns true when the stream aggregate has enough data to initialize its internal state. Type boolean.

name

Returns the name of the stream aggregate. Type string.

val

Returns the JSON object of the stream aggregate. Same as the method saveJson. Type object.

Methods

getFeatureSpace() → module:qm.FeatureSpace

Returns a feature space.

Returns

module:qm.FeatureSpace The feature space.

getFloat([str]) → (number or null)

Returns the value of the specific stream aggregator. For return values see module:qm~StreamAggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Grades',
       fields: [
           { name: 'Grade', type: 'int' },
           { name: 'Procents', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a new time series stream aggregator which takes the values from the 'Procents' field 
// and the timestamp from the 'Time' field. The size of the window is 1 year.
var ts = {
   name: 'GradesAggr',
   type: 'timeSeriesWinBuf',
   store: 'Grades',
   timestamp: 'Time',
   value: 'Procents',
   winsize: 31536000000 
};
var timeSeries = base.store('Grades').addStreamAggr(ts);
// create a new moving average stream aggregator that takes the values from the 
// 'GradesAggr' stream aggregator
var ma = {
   name: 'AverageGrade',
   type: 'ma',
   store: 'Grades',
   inAggr: 'GradesAggr'
}
var averageGrade = base.store('Grades').addStreamAggr(ma);
// add some grades in the 'Grades' store
base.store("Grades").push({ Grade: 7, Procents: 65, Time: '2014-11-23T10:00:00.0' });
base.store("Grades").push({ Grade: 9, Procents: 88, Time: '2014-12-20T12:00:00.0' });
base.store("Grades").push({ Grade: 8, Procents: 70, Time: '2015-02-03T10:00:00.0' });
// get the average grade procents by using the getFloat method
var average = averageGrade.getFloat(); // returns 74 + 1/3
base.close();

Parameter

Name Type Optional Description

str

string

Yes

The string.

Returns

(number or null) A number (stream aggregator specific), possibly null if str was provided.

getFloatAt(idx) → number

Returns the value of the vector containing the values of the stream aggregator at a specific index.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'MusicSale',
       fields: [
           { name: 'NumberOfAlbums', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series containing the 'NumberOfAlbums' values and getting the timestamp from the 'Time' field.
// The window size should be 1 week.
var ts = {
   name: 'Sales',
   type: 'timeSeriesWinBuf',
   store: 'MusicSale',
   timestamp: 'Time',
   value: 'NumberOfAlbums',
   winsize: 604800000
};
var weekSales = base.store('MusicSale').addStreamAggr(ts);
// add some records in the store
base.store('MusicSale').push({ NumberOfAlbums: 10, Time: '2015-03-15T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 15, Time: '2015-03-18T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 30, Time: '2015-03-19T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 45, Time: '2015-03-20T00:00:00.0' });
// get the second value of the value vector 
var albums = weekSales.getFloatAt(1); // returns 15
base.close();

Parameter

Name Type Optional Description

idx

number

 

The index.

Returns

number The value of the float vector at position idx.

getFloatLength() → number

Gets the length of the vector containing the values of the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'IceCreams',
       fields: [
           { name: 'Type', type: 'string' },
           { name: 'Price', type: 'float' },
           { name: 'TimeOfConsumption', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator, that takes values from the 'Price' field and the timestamp
//  from the 'TimeOfConsumption' field of the 'IceCreams' store. The window size should be 1 day.
var ts = {
   name: 'IcePrice',
   type: 'timeSeriesWinBuf',
   store: 'IceCreams',
   timestamp: 'TimeOfConsumption',
   value: 'Price',
   winsize: 86400000
};
var icePrice = base.store('IceCreams').addStreamAggr(ts);
// add some ice creams in the store
base.store('IceCreams').push({ Type: 'Chocolate', Price: 5, TimeOfConsumption: '2015-07-21T09:00:00.0' });
base.store('IceCreams').push({ Type: 'Blue Sky', Price: 3, TimeOfConsumption: '2015-07-21T14:13:00.0' });
base.store('IceCreams').push({ Type: 'Stracciatella', Price: 5, TimeOfConsumption: '2015-07-21T21:05:00.0' });
// get the number of ice creams consumed by using getFloatLength method
var numberOfIceCreamsEaten = icePrice.getFloatLength(); // returns 3
base.close();
Returns

number The length of the vector.

getFloatVector() → module:la.Vector

Gets the whole vector of values of the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Hospital',
       fields: [
           { name: 'NumberOfPatients', type: 'float' },
           { name: 'Date', type: 'datetime' }
       ]
   }]
});
// create a new time series stream aggregator that takes the values from the 'NumberOfPatients' field
// and the timestamp from the 'Date' field. The window size should be 1 week.
var ts = {
   name: 'WeekPatients',
   type: 'timeSeriesWinBuf',
   store: 'Hospital',
   timestamp: 'Date',
   value: 'NumberOfPatients',
   winsize: 604800000
};
var weekPatients = base.store('Hospital').addStreamAggr(ts);
// add some records in the store
base.store('Hospital').push({ NumberOfPatients: 50, Date: '2015-05-20T00:00:00.0' });
base.store('Hospital').push({ NumberOfPatients: 56, Date: '2015-05-21T00:00:00.0' });
base.store('Hospital').push({ NumberOfPatients: 120, Date: '2015-05-22T00:00:00.0' });
base.store('Hospital').push({ NumberOfPatients: 40, Date: '2015-05-23T00:00:00.0' });
// get the values that are in the time series window buffer as a vector
var values = weekPatients.getFloatVector(); // returns the vector [50, 56, 120, 40]
base.close();
Returns

module:la.Vector The vector containing the values of the buffer.

getInFloatVector() → module:la.Vector

Gets a vector containing the values that are entering the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Noise',
       fields: [
           { name: 'Decibels', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that takes the values from the 'Decibels' field
// and timestamps from the 'Time' fields. The window size should be 1 second.
var ts = {
   name: 'Music',
   type: 'timeSeriesWinBuf',
   store: 'Noise',
   timestamp: 'Time',
   value: 'Decibels',
   winsize: 1000
};
var music = base.store('Noise').addStreamAggr(ts);
// add some records in the store
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.0' });
base.store('Noise').push({ Decibels: 55, Time: '2015-07-21T14:43:00.200' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.800' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:01.0' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:01.2' });
// get the in vector
var vec = music.getInFloatVector();
base.close();
Returns

module:la.Vector The vector containing the values that are entering the buffer.

getInteger([str]) → (number or null)

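Returns an integer value of the stream aggregator (a map from strings to integers). The returned value is stream aggregator specific.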

Parameter

Name Type Optional Description

str

string

Yes

The string.

Returns

(number or null) A number (stream aggregator specific), possibly null if str was provided.

getInTimestampVector() → module:la.Vector

Gets a vector containing the timestamps that are entering the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Noise',
       fields: [
           { name: 'Decibels', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that takes the values from the 'Decibels' field
// and timestamps from the 'Time' fields. The window size should be 1 second.
var ts = {
   name: 'Music',
   type: 'timeSeriesWinBuf',
   store: 'Noise',
   timestamp: 'Time',
   value: 'Decibels',
   winsize: 1000
};
var music = base.store('Noise').addStreamAggr(ts);
// add some records in the store
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.0' });
base.store('Noise').push({ Decibels: 55, Time: '2015-07-21T14:43:00.200' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.800' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:01.0' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:01.2' });
// get the in timestamps
var vec = music.getInTimestampVector();
base.close();
Returns

module:la.Vector The vector containing the timestamps that are entering the buffer.

getInValueVector() → (module:la.Vector or module:la.SparseMatrix)

Gets the vector of 'just-in' values (values that have just entered the buffer). Values can be floats or sparse vectors.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
     mode: 'createClean',
     schema: [{
       name: 'Docs',
       fields: [
         { name: 'Time', type: 'datetime' },
         { name: 'Text', type: 'string' }
       ]
     }]
});
var store = base.store('Docs');
// the following aggregate maintains a window buffer (1000 milliseconds) with no delay
// and contains a categorical feature extractor. The extractor maps records in the buffer
// to sparse vectors (indicator vectors for growing set of categories). Each record that
// enters the buffer updates the feature extractor (potentially introduces a new category,
// which increases the dimensionality).
var aggr = {
     type: 'timeSeriesWinBufFeatureSpace',
     store: 'Docs',
     timestamp: 'Time',
     featureSpace: {
       type: "categorical",
       source: "Docs",
       field: "Text"
     },
     winsize: 1000
};
var streamAggregate = store.addStreamAggr(aggr);
store.push({ Time: '2015-06-10T14:13:32.0', Text: 'a' });
store.push({ Time: '2015-06-10T14:13:33.0', Text: 'b' });
store.push({ Time: '2015-06-10T14:13:34.0', Text: 'c' });
// we have three dimensions, where "a" -> [1,0,0], "b" -> [0,1,0], "c" -> [0,0,1]
// the first record just fell out of the buffer, the third record just entered the buffer
// and buffer currently contains the second and the third record.
// In case of the feature space based window buffer, the vectors of just-in, just-out and in-the-buffer
// values correspond to vectors of sparse vectors = sparse matrices.
streamAggregate.getInValueVector().print(); // one column, one nonzero element at index 2
// = [
// 2 0 1.000000
// ]
streamAggregate.getOutValueVector().print(); // one column, one nonzero element at index 0
// = [
// 0 0 1.000000
// ]
streamAggregate.getValueVector().print(); // two column vectors, each with one nonzero element
// = [
// 1 0 1.000000
// 2 1 1.000000
// ]

base.close();
Returns

(module:la.Vector or module:la.SparseMatrix) The vector or sparse matrix of values that just entered the buffer.

getNumberOfRecords() → number

Gets the number of records in the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'MusicSale',
       fields: [
           { name: 'NumberOfAlbums', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series containing the values from the 'NumberOfAlbums' field and 
// the timestamp from the 'Time' field. The window size should be 1 week.
var ts = {
   name: 'Sales',
   type: 'timeSeriesWinBuf',
   store: 'MusicSale',
   timestamp: 'Time',
   value: 'NumberOfAlbums',
   winsize: 604800000
};
var weekSales = base.store('MusicSale').addStreamAggr(ts);
// add some records in the store
base.store('MusicSale').push({ NumberOfAlbums: 10, Time: '2015-03-15T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 15, Time: '2015-03-18T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 30, Time: '2015-03-19T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 45, Time: '2015-03-20T00:00:00.0' });
// get the number of records in the window buffer
var num = weekSales.getNumberOfRecords(); // returns 4
base.close();
Returns

number The number of records in the buffer.

getOutFloatVector() → module:la.Vector

Gets a vector containing the values that are leaving the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'F1',
       fields: [
           { name: 'Driver', type: 'string' },
           { name: 'Speed', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that gets the values from the 'Speed' field
// and the timestamp from the 'Time' field. The window size should be 5 minutes.
var ts = {
   name: 'Sensor',
   type: 'timeSeriesWinBuf',
   store: 'F1',
   timestamp: 'Time',
   value: 'Speed',
   winsize: 300000
};
var sensor = base.store('F1').addStreamAggr(ts);
// add some records to the store
base.store('F1').push({ Driver: 'Sebastian Vettel', Speed: 203.4, Time: '2015-07-19T09:32:01.0' });
base.store('F1').push({ Driver: 'Thomas "Tommy" Angelo', Speed: 152.8, Time: '2015-07-19T09:35:23.0' });
base.store('F1').push({ Driver: 'Mark Ham', Speed: 189.5, Time: '2015-07-19T09:38:43.0' });
base.store('F1').push({ Driver: 'Speedy Gonzales', Speed: 171.4, Time: '2015-07-19T09:40:32.0' });
// get the values that have left the window buffer.
// because the window size is 5 minutes, the last value that has left the buffer is 152.8
var left = sensor.getOutFloatVector(); // returns [152.8]
base.close();
Returns

module:la.Vector The vector containing the values that are leaving the buffer.

getOutTimestampVector() → module:la.Vector

Gets a vector containing the timestamps that are leaving the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Noise',
       fields: [
           { name: 'Decibels', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that takes the values from the 'Decibels' field
// and timestamps from the 'Time' fields. The window size should be 1 second.
var ts = {
   name: 'Music',
   type: 'timeSeriesWinBuf',
   store: 'Noise',
   timestamp: 'Time',
   value: 'Decibels',
   winsize: 1000
};
var music = base.store('Noise').addStreamAggr(ts);
// add some records in the store
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.0' });
base.store('Noise').push({ Decibels: 55, Time: '2015-07-21T14:43:00.200' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.400' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.600' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.800' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:01.0' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:01.2' });
// get the timestamps that just left the window buffer by adding the last record
var last = music.getOutTimestampVector(); // returns [13081963380000]
base.close();
Returns

module:la.Vector The vector containing the leaving timestamps.

getOutValueVector() → (module:la.Vector or module:la.SparseMatrix)

Gets the vector of 'just-out' values (values that have just fallen out of the buffer). Values can be floats or sparse vectors. For use examples see the module:qm.StreamAggr#getInValueVector example (which calls getOutValueVector) and the module:qm.StreamAggr#getOutFloatVector example.

Returns

(module:la.Vector or module:la.SparseMatrix) Vector of floats or a vector of sparse vectors.

getParams() → Object

Returns all the parameters of the stream aggregate

Returns

Object The parameters of the stream aggregate.

getTimestamp() → number

Returns the timestamp value of the newest record in buffer.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'GameCollection',
       fields: [
           { name: 'GameName', type: 'string' },
           { name: 'Price', type: 'float' },
           { name: 'ReleaseDate', type: 'datetime' }
       ]
   }]
});
// create a new time series stream aggregator which takes the values from the 'Price' field
// and the timestamps from the 'ReleaseDate' field. The window size should be 1 month.
var ts = {
   name: 'GameSeries',
   type: 'timeSeriesWinBuf',
   store: 'GameCollection',
   timestamp: 'ReleaseDate',
   value: 'Price',
   winsize: 2678400000
};
var timeSeries = base.store('GameCollection').addStreamAggr(ts);
// create a new sum stream aggregator
var sum = {
   name: 'SumPrice',
   type: 'winBufSum',
   store: 'GameCollection',
   inAggr: 'GameSeries'
};
var priceSum = base.store('GameCollection').addStreamAggr(sum);
// put some records in the store
base.store('GameCollection').push({ GameName: 'Tetris', Price: 0, ReleaseDate: '1984-06-06T00:00:00.0' });
base.store('GameCollection').push({ GameName: 'Super Mario Bros.', Price: 100, ReleaseDate: '1985-09-13T00:00:00.0' });
base.store('GameCollection').push({ GameName: 'The Legend of Zelda', Price: 90, ReleaseDate: '1986-02-21T00:00:00.0' });
// get the timestamp of the last bought game by using getTimestamp
var date = priceSum.getTimestamp(); // returns 12153801600000 (the milliseconds since 1601-01-01T00:00:00.0)
base.close();
Returns

number The timestamp of the newest record. It represents the number of milliseconds between 1601-01-01T00:00:00.0 and the record time.

getTimestampAt(idx) → number

Gets the timestamp from the timestamp vector of the stream aggregator at the specific index.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Route66',
       fields: [
           { name: 'NumberOfCars', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that takes the values from the 'NumberOfCars' field
// and the timestamps from the 'Time' field. The window size should be 1 day.
var ts = {
   name: 'Traffic',
   type: 'timeSeriesWinBuf',
   store: 'Route66',
   timestamp: 'Time',
   value: 'NumberOfCars',
   winsize: 86400000
};
var traffic = base.store('Route66').addStreamAggr(ts);
// add some records in the store
base.store('Route66').push({ NumberOfCars: 100, Time: '2015-06-15T06:00:00.0' });
base.store('Route66').push({ NumberOfCars: 88, Time: '2015-06-15T10:00:00.0' });
base.store('Route66').push({ NumberOfCars: 60, Time: '2015-06-15T13:00:00.0' });
base.store('Route66').push({ NumberOfCars: 90, Time: '2015-06-15T18:00:00.0' });
base.store('Route66').push({ NumberOfCars: 110, Time: '2015-06-16T00:00:00.0' });
// get the third timestamp in the buffer
var time = traffic.getTimestampAt(2); // returns 13078846800000 (2015-06-15T13:00:00.0)
base.close();

Parameter

Name Type Optional Description

idx

number

 

The index.

Returns

number The timestamp of the timestamp vector at position idx.

getTimestampLength() → number

Gets the length of the timestamp vector of the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Medicine',
       fields: [
           { name: 'NumberOfPills', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that takes the values from the 'NumberOfPills' field
// and the timestamp from the 'Time' field. The window size should be 1 week.
var ts = {
   name: 'WeekPills',
   type: 'timeSeriesWinBuf',
   store: 'Medicine',
   timestamp: 'Time',
   value: 'NumberOfPills',
   winsize: 604800000
};
var weekly = base.store('Medicine').addStreamAggr(ts);
// add some records in the store
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-21T09:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 5, Time: '2015-07-21T19:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-22T09:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 5, Time: '2015-07-22T19:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-23T09:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 6, Time: '2015-07-23T19:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-24T09:00:00.0' });
// get the length of the timestamp vector
var length = weekly.getTimestampLength(); // returns 7
base.close();
Returns

number The length of the timestamp vector.

getTimestampVector() → module:la.Vector

Gets the vector containing the timestamps of the stream aggregator.

Example

// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
   mode: 'createClean',
   schema: [{
       name: 'Signals',
       fields: [
           { name: 'BeepLoudness', type: 'float' },
           { name: 'Time', type: 'datetime' }
       ]
   }]
});
// create a time series stream aggregator that gets the values from the 'BeepLoudness' field and
// the timestamp from the 'Time' field. The window size should be 10 seconds.
var ts = {
   name: 'SignalBeep',
   type: 'timeSeriesWinBuf',
   store: 'Signals',
   timestamp: 'Time',
   value: 'BeepLoudness',
   winsize: 10000
};
var signalBeep = base.store('Signals').addStreamAggr(ts);
// add some records to the store
base.store('Signals').push({ BeepLoudness: 10, Time: '2015-07-21T12:30:30.0' });
base.store('Signals').push({ BeepLoudness: 25, Time: '2015-07-21T12:30:31.0' });
base.store('Signals').push({ BeepLoudness: 20, Time: '2015-07-21T12:30:32.0' });
// get the timestamp vector of signalBeep
var vec = signalBeep.getTimestampVector(); // returns vector [13081955430000, 13081955431000, 13081955432000]
base.close();
Returns

module:la.Vector The vector containing the timestamps.

getValueVector() → (module:la.Vector or module:la.SparseMatrix)

Gets the vector of values in the buffer. Values can be floats or sparse vectors. For use example see module:qm.StreamAggr#getInValueVector.

Returns

(module:la.Vector or module:la.SparseMatrix) Vector of floats or a vector of sparse vectors

load(fin) → module:qm.StreamAggr

Loads the stream aggregator, that has been previously saved.

Parameter

Name Type Optional Description

fin

module:fs.FIn

 

The input stream.

Returns

module:qm.StreamAggr Self.

loadStateJson(state) → module:qm.StreamAggr

Loads the stream aggregator from the state.

Parameter

Name Type Optional Description

state

Object

 

The state.

Returns

module:qm.StreamAggr Self.

onAdd(rec[, Caller]) → module:qm.StreamAggr

Executes the function when a new record is put in store. For use example see module:qm.StreamAggr constructor example.

Parameters

Name Type Optional Description

rec

module:qm.Record

 

The record given to the stream aggregator.

Caller

module:qm.StreamAggr

Yes

Caller stream aggregate.

Returns

module:qm.StreamAggr Self. Values in the stream aggregator are changed as defined in the inner onAdd function.

onDelete(rec[, Caller]) → module:qm.StreamAggr

Executes the function when a record in the store is deleted. For use example see module:qm.StreamAggr constructor example.

Parameters

Name Type Optional Description

rec

module:qm.Record

 

The deleted record given to the stream aggregator.

Caller

module:qm.StreamAggr

Yes

Caller stream aggregate.

Returns

module:qm.StreamAggr Self. The values in the stream aggregator are changed as defined in the inner onDelete function.

onStep([Caller]) → module:qm.StreamAggr

Executes the function that updates the aggregate. For use example see module:qm.StreamAggr constructor example.

Parameter

Name Type Optional Description

Caller

module:qm.StreamAggr

Yes

Caller stream aggregate.

Returns

module:qm.StreamAggr Self. Values in the stream aggregator are changed as defined in the inner onStep function.

onTime(ts[, Caller]) → module:qm.StreamAggr

Executes the function that updates the aggregate at a given timestamp. For use example see module:qm.StreamAggr constructor example.

Parameters

Name Type Optional Description

ts

TmMsec

 

Timestamp in milliseconds.

Caller

module:qm.StreamAggr

Yes

Caller stream aggregate.

Returns

module:qm.StreamAggr Self. Values in the stream aggregator are changed as defined in the inner onTime function.

onUpdate(rec[, Caller]) → module:qm.StreamAggr

Executes the function when a record in the store is updated. For use example see module:qm.StreamAggr constructor example.

Parameters

Name Type Optional Description

rec

module:qm.Record

 

The updated record given to the stream aggregator.

Caller

module:qm.StreamAggr

Yes

Caller stream aggregate.

Returns

module:qm.StreamAggr Self. Values in the stream aggregator are changed as defined in the inner onUpdate function.

reset() → module:qm.StreamAggr

Resets the state of the aggregate.

Example

// import the qm module
var qm = require('qminer');
// create a base with a simple store
var base = new qm.Base({
   mode: "createClean",
   schema: [
   {
       name: "Heat",
       fields: [
           { name: "Celsius", type: "float" },
           { name: "Time", type: "datetime" }
       ]
   }]
});

// create a new time series stream aggregator for the 'Heat' store, that takes the values from the 'Celsius' field
// and the timestamp from the 'Time' field. The size of the window should be 1 day.
var timeser = {
   name: 'TimeSeriesAggr',
   type: 'timeSeriesWinBuf',
   store: 'Heat',
   timestamp: 'Time',
   value: 'Celsius',
   winsize: 86400000
};
var timeSeries = base.store("Heat").addStreamAggr(timeser);

// add a moving average aggregator, that is connected with the 'TimeSeriesAggr' aggregator
var ma = {
   name: 'movingAverageAggr',
   type: 'ma',
   store: 'Heat',
   inAggr: 'TimeSeriesAggr'
};
var movingAverage = base.store("Heat").addStreamAggr(ma);
// reset the moving average aggregate
movingAverage.reset();
base.close();
Returns

module:qm.StreamAggr Self. The state has been reset.

save(fout) → module:fs.FOut

Saves the current state of the stream aggregator.

Parameter

Name Type Optional Description

fout

module:fs.FOut

 

The output stream.

Returns

module:fs.FOut The output stream fout.

saveJson([limit]) → Object

When executed it returns a JSON object as defined by the user. For use example see module:qm.StreamAggr constructor example.

Parameter

Name Type Optional Description

limit

number

Yes

The meaning is specific to each type of stream aggregator.

Returns

Object A JSON object as defined by the user.

saveStateJson() → Object

Returns the current state of the stream aggregate as a JSON object.

Returns

Object JSON that represents the state.

setParams(params)

Sets one or more parameters.

Parameter

Name Type Optional Description

params

Object

 

JSON representation of the parameters