qm. StreamAggr
Source: qminer_aggrdoc.
Represents a stream aggregate. This class can construct any of the module:qm~StreamAggregator objects. Refer to the individual stream aggregators to see which methods each of them implements.
Methods
new StreamAggr(base, arg[, storeName])
Stream Aggregate
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: "createClean",
schema: [{
name: "People",
fields: [
{ name: "Name", type: "string" },
{ name: "Gendre", type: "string" },
]
},
{
name: "Laser",
fields: [
{ name: "Time", type: "datetime" },
{ name: "WaveLength", type: "float" }
]
}]
});
// create a new stream aggregator for the 'People' store that counts the added, updated and deleted records and remembers the last timestamp (with the function object)
var aggr = new qm.StreamAggr(base, new function () {
var numOfAdds = 0;
var numOfUpdates = 0;
var numOfDeletes = 0;
var time = "";
this.name = 'nameLength',
this.onAdd = function (rec) {
numOfAdds += 1;
};
this.onUpdate = function (rec) {
numOfUpdates += 1;
};
this.onDelete = function (rec) {
numOfDeletes += 1;
};
this.onTime = function (ts) {
time = ts;
};
this.saveJson = function (limit) {
return { adds: numOfAdds, updates: numOfUpdates, deletes: numOfDeletes, time: time };
};
}, "People");
// create a new time series window buffer stream aggregator for 'Laser' store (with the JSON object)
var wavelength = {
name: "WaveLengthLaser",
type: "timeSeriesWinBuf",
store: "Laser",
timestamp: "Time",
value: "WaveLength",
winsize: 10000
}
var sa = base.store("Laser").addStreamAggr(wavelength);
base.close();
Parameters
Name | Type | Optional | Description |
---|---|---|---|
base |
|
The base object on which it's created. |
|
arg |
(module:qm~StreamAggregator or function()) |
|
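Constructor arguments. There are two argument types: a JSON object describing a module:qm~StreamAggregator, or a function object that defines the aggregate's callbacks (both are shown in the example above).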
|
storeName |
(string or Array of string) |
Yes |
The store names where the aggregate will be registered. |
Properties
init
Returns true when the stream aggregate has enough data to initialize its internal state. Type boolean
.
name
Returns the name of the stream aggregate. Type string
.
val
Returns the JSON object of the stream aggregate. Same as the method saveJson. Type object
.
Methods
getFeatureSpace() → module:qm.FeatureSpace
Returns a feature space.
- Returns
-
module:qm.FeatureSpace
The feature space.
getFloat([str]) → (number or null)
Returns the value of the specific stream aggregator. For return values see module:qm~StreamAggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Grades',
fields: [
{ name: 'Grade', type: 'int' },
{ name: 'Procents', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a new time series stream aggregator which takes the values from the 'Procents' field
// and the timestamp from the 'Time' field. The size of the window is 1 year.
var ts = {
name: 'GradesAggr',
type: 'timeSeriesWinBuf',
store: 'Grades',
timestamp: 'Time',
value: 'Procents',
winsize: 31536000000
};
var timeSeries = base.store('Grades').addStreamAggr(ts);
// create a new moving average stream aggregator that takes the values from the
// 'GradesAggr' stream aggregator
var ma = {
name: 'AverageGrade',
type: 'ma',
store: 'Grades',
inAggr: 'GradesAggr'
}
var averageGrade = base.store('Grades').addStreamAggr(ma);
// add some grades in the 'Grades' store
base.store("Grades").push({ Grade: 7, Procents: 65, Time: '2014-11-23T10:00:00.0' });
base.store("Grades").push({ Grade: 9, Procents: 88, Time: '2014-12-20T12:00:00.0' });
base.store("Grades").push({ Grade: 8, Procents: 70, Time: '2015-02-03T10:00:00.0' });
// get the average grade percentage by using the getFloat method
var average = averageGrade.getFloat(); // returns 74 + 1/3
base.close();
Parameter
Name | Type | Optional | Description |
---|---|---|---|
str |
string |
Yes |
The string. |
- Returns
-
(number or null)
A number (stream aggregator specific), possibly null if str was provided.
getFloatAt(idx) → number
Returns the value of the vector containing the values of the stream aggregator at a specific index.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'MusicSale',
fields: [
{ name: 'NumberOfAlbums', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series containing the 'NumberOfAlbums' values and getting the timestamp from the 'Time' field.
// The window size should be 1 week.
var ts = {
name: 'Sales',
type: 'timeSeriesWinBuf',
store: 'MusicSale',
timestamp: 'Time',
value: 'NumberOfAlbums',
winsize: 604800000
};
var weekSales = base.store('MusicSale').addStreamAggr(ts);
// add some records in the store
base.store('MusicSale').push({ NumberOfAlbums: 10, Time: '2015-03-15T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 15, Time: '2015-03-18T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 30, Time: '2015-03-19T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 45, Time: '2015-03-20T00:00:00.0' });
// get the second value of the value vector
var albums = weekSales.getFloatAt(1); // returns 15
base.close();
Parameter
Name | Type | Optional | Description |
---|---|---|---|
idx |
number |
|
The index. |
- Returns
-
number
The value of the float vector at position idx.
getFloatLength() → number
Gets the length of the vector containing the values of the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'IceCreams',
fields: [
{ name: 'Type', type: 'string' },
{ name: 'Price', type: 'float' },
{ name: 'TimeOfConsumption', type: 'datetime' }
]
}]
});
// create a time series stream aggregator, that takes values from the 'Price' field and the timestamp
// from the 'TimeOfConsumption' field of the 'IceCreams' store. The window size should be 1 day.
var ts = {
name: 'IcePrice',
type: 'timeSeriesWinBuf',
store: 'IceCreams',
timestamp: 'TimeOfConsumption',
value: 'Price',
winsize: 86400000
};
var icePrice = base.store('IceCreams').addStreamAggr(ts);
// add some ice creams in the store
base.store('IceCreams').push({ Type: 'Chocholate', Price: 5, TimeOfConsumption: '2015-07-21T09:00:00.0' });
base.store('IceCreams').push({ Type: 'Blue Sky', Price: 3, TimeOfConsumption: '2015-07-21T14:13:00.0' });
base.store('IceCreams').push({ Type: 'Stracciatella', Price: 5, TimeOfConsumption: '2015-07-21T21:05:00.0' });
// get the number of ice creams consumed by using getFloatLength method
var numberOfIceCreamsEaten = icePrice.getFloatLength(); // returns 3
base.close();
- Returns
-
number
The length of the vector.
getFloatVector() → module:la.Vector
Gets the whole vector of values of the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Hospital',
fields: [
{ name: 'NumberOfPatients', type: 'float' },
{ name: 'Date', type: 'datetime' }
]
}]
});
// create a new time series stream aggregator that takes the values from the 'NumberOfPatients' field
// and the timestamp from the 'Date' field. The window size should be 1 week.
var ts = {
name: 'WeekPatients',
type: 'timeSeriesWinBuf',
store: 'Hospital',
timestamp: 'Date',
value: 'NumberOfPatients',
winsize: 604800000
};
var weekPatients = base.store('Hospital').addStreamAggr(ts);
// add some records in the store
base.store('Hospital').push({ NumberOfPatients: 50, Date: '2015-05-20T00:00:00.0' });
base.store('Hospital').push({ NumberOfPatients: 56, Date: '2015-05-21T00:00:00.0' });
base.store('Hospital').push({ NumberOfPatients: 120, Date: '2015-05-22T00:00:00.0' });
base.store('Hospital').push({ NumberOfPatients: 40, Date: '2015-05-23T00:00:00.0' });
// get the values that are in the time series window buffer as a vector
var values = weekPatients.getFloatVector(); // returns the vector [50, 56, 120, 40]
base.close();
- Returns
-
module:la.Vector
The vector containing the values of the buffer.
getInFloatVector() → module:la.Vector
Gets a vector containing the values that are entering the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Noise',
fields: [
{ name: 'Decibels', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that takes the values from the 'Decibels' field
// and timestamps from the 'Time' field. The window size should be 1 second.
var ts = {
name: 'Music',
type: 'timeSeriesWinBuf',
store: 'Noise',
timestamp: 'Time',
value: 'Decibels',
winsize: 1000
};
var music = base.store('Noise').addStreamAggr(ts);
// add some records in the store
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.0' });
base.store('Noise').push({ Decibels: 55, Time: '2015-07-21T14:43:00.200' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.800' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:01.0' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:01.2' });
// get the in vector
var vec = music.getInFloatVector();
base.close();
- Returns
-
module:la.Vector
The vector containing the values that are entering the buffer.
getInteger([str]) → (number or null)
A map from strings to integers
Parameter
Name | Type | Optional | Description |
---|---|---|---|
str |
string |
Yes |
The string. |
- Returns
-
(number or null)
A number (stream aggregator specific), possibly null if str was provided.
getInTimestampVector() → module:la.Vector
Gets a vector containing the timestamps that are entering the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Noise',
fields: [
{ name: 'Decibels', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that takes the values from the 'Decibels' field
// and timestamps from the 'Time' field. The window size should be 1 second.
var ts = {
name: 'Music',
type: 'timeSeriesWinBuf',
store: 'Noise',
timestamp: 'Time',
value: 'Decibels',
winsize: 1000
};
var music = base.store('Noise').addStreamAggr(ts);
// add some records in the store
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.0' });
base.store('Noise').push({ Decibels: 55, Time: '2015-07-21T14:43:00.200' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.800' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:01.0' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:01.2' });
// get the in timestamps
var vec = music.getInTimestampVector();
base.close();
- Returns
-
module:la.Vector
The vector containing the timestamps that are entering the buffer.
getInValueVector() → (module:la.Vector or module:la.SparseMatrix)
Gets the vector of 'just-in' values (values that have just entered the buffer). Values can be floats or sparse vectors.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Docs',
fields: [
{ name: 'Time', type: 'datetime' },
{ name: 'Text', type: 'string' }
]
}]
});
store = base.store('Docs');
// the following aggregate maintains a window buffer (1000 milliseconds) with no delay
// and contains a categorical feature extractor. The extractor maps records in the buffer
// to sparse vectors (indicator vectors for growing set of categories). Each record that
// enters the buffer updates the feature extractor (potentially introduces a new category,
// which increases the dimensionality).
var aggr = {
type: 'timeSeriesWinBufFeatureSpace',
store: 'Docs',
timestamp: 'Time',
featureSpace: {
type: "categorical",
source: "Docs",
field: "Text"
},
winsize: 1000
};
var streamAggregate = store.addStreamAggr(aggr);
store.push({ Time: '2015-06-10T14:13:32.0', Text: 'a' });
store.push({ Time: '2015-06-10T14:13:33.0', Text: 'b' });
store.push({ Time: '2015-06-10T14:13:34.0', Text: 'c' });
// we have three dimensions, where "a" -> [1,0,0], "b" -> [0,1,0], "c" -> [0,0,1]
// the first record just fell out of the buffer, the third record just entered the buffer
// and buffer currently contains the second and the third record.
// In case of the feature space based window buffer, the vectors of just-in, just-out and in-the-buffer
// values correspond to vectors of sparse vectors = sparse matrices.
streamAggregate.getInValueVector().print(); // one column, one nonzero element at index 2
// = [
// 2 0 1.000000
// ]
streamAggregate.getOutValueVector().print(); // one column, one nonzero element at index 0
// = [
// 0 0 1.000000
// ]
streamAggregate.getValueVector().print(); // two column vectors, each with one nonzero element
// = [
// 1 0 1.000000
// 2 1 1.000000
// ]
base.close();
- Returns
-
(module:la.Vector or module:la.SparseMatrix)
The vector or sparse matrix of values that just entered the buffer.
getNumberOfRecords() → number
Gets the number of records in the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'MusicSale',
fields: [
{ name: 'NumberOfAlbums', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series containing the values from the 'NumberOfAlbums' field and
// the timestamp from the 'Time' field. The window size should be 1 week.
var ts = {
name: 'Sales',
type: 'timeSeriesWinBuf',
store: 'MusicSale',
timestamp: 'Time',
value: 'NumberOfAlbums',
winsize: 604800000
};
var weekSales = base.store('MusicSale').addStreamAggr(ts);
// add some records in the store
base.store('MusicSale').push({ NumberOfAlbums: 10, Time: '2015-03-15T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 15, Time: '2015-03-18T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 30, Time: '2015-03-19T00:00:00.0' });
base.store('MusicSale').push({ NumberOfAlbums: 45, Time: '2015-03-20T00:00:00.0' });
// get the number of records in the window buffer
var num = weekSales.getNumberOfRecords(); // returns 4
base.close();
- Returns
-
number
The number of records in the buffer.
getOutFloatVector() → module:la.Vector
Gets a vector containing the values that are leaving the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'F1',
fields: [
{ name: 'Driver', type: 'string' },
{ name: 'Speed', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that gets the values from the 'Speed' field
// and the timestamp from the 'Time' field. The window size should be 5 minutes.
var ts = {
name: 'Sensor',
type: 'timeSeriesWinBuf',
store: 'F1',
timestamp: 'Time',
value: 'Speed',
winsize: 300000
};
var sensor = base.store('F1').addStreamAggr(ts);
// add some records to the store
base.store('F1').push({ Driver: 'Sebastian Vettel', Speed: 203.4, Time: '2015-07-19T09:32:01.0' });
base.store('F1').push({ Driver: 'Thomas "Tommy" Angelo', Speed: 152.8, Time: '2015-07-19T09:35:23.0' });
base.store('F1').push({ Driver: 'Mark Ham', Speed: 189.5, Time: '2015-07-19T09:38:43.0' });
base.store('F1').push({ Driver: 'Speedy Gonzales', Speed: 171.4, Time: '2015-07-19T09:40:32.0' });
// get the values that have left the window buffer.
// because the window size is 5 minutes, the last value that has left the buffer is 152.8
var left = sensor.getOutFloatVector(); // returns [152.8]
base.close();
- Returns
-
module:la.Vector
The vector containing the values that are leaving the buffer.
getOutTimestampVector() → module:la.Vector
Gets a vector containing the timestamps that are leaving the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Noise',
fields: [
{ name: 'Decibels', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that takes the values from the 'Decibels' field
// and timestamps from the 'Time' field. The window size should be 1 second.
var ts = {
name: 'Music',
type: 'timeSeriesWinBuf',
store: 'Noise',
timestamp: 'Time',
value: 'Decibels',
winsize: 1000
};
var music = base.store('Noise').addStreamAggr(ts);
// add some records in the store
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.0' });
base.store('Noise').push({ Decibels: 55, Time: '2015-07-21T14:43:00.200' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:00.400' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.600' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:00.800' });
base.store('Noise').push({ Decibels: 54, Time: '2015-07-21T14:43:01.0' });
base.store('Noise').push({ Decibels: 53, Time: '2015-07-21T14:43:01.2' });
// get the timestamps that just left the window buffer by adding the last record
var last = music.getOutTimestampVector(); // returns [13081963380000]
base.close();
- Returns
-
module:la.Vector
The vector containing the leaving timestamps.
getOutValueVector() → (module:la.Vector or module:la.SparseMatrix)
Gets the vector of 'just-out' values (values that have just fallen out of the buffer). Values can be floats or sparse vectors. For a usage example see the module:qm.StreamAggr#getInValueVector example.
- Returns
-
(module:la.Vector or module:la.SparseMatrix)
Vector of floats or a vector of sparse vectors.
getParams() → Object
Returns all the parameters of the stream aggregate
- Returns
-
Object
The parameters.
getTimestamp() → number
Returns the timestamp value of the newest record in buffer.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'GameCollection',
fields: [
{ name: 'GameName', type: 'string' },
{ name: 'Price', type: 'float' },
{ name: 'ReleaseDate', type: 'datetime' }
]
}]
});
// create a new time series stream aggregator which takes the values from the 'Price' field
// and the timestamps from the 'ReleaseDate' field. The window size should be 1 month.
var ts = {
name: 'GameSeries',
type: 'timeSeriesWinBuf',
store: 'GameCollection',
timestamp: 'ReleaseDate',
value: 'Price',
winsize: 2678400000
};
var timeSeries = base.store('GameCollection').addStreamAggr(ts);
// create a new sum stream aggregator
var sum = {
name: 'SumPrice',
type: 'winBufSum',
store: 'GameCollection',
inAggr: 'GameSeries'
};
var priceSum = base.store('GameCollection').addStreamAggr(sum);
// put some records in the store
base.store('GameCollection').push({ GameName: 'Tetris', Price: 0, ReleaseDate: '1984-06-06T00:00:00.0' });
base.store('GameCollection').push({ GameName: 'Super Mario Bros.', Price: 100, ReleaseDate: '1985-09-13T00:00:00.0' });
base.store('GameCollection').push({ GameName: 'The Legend of Zelda', Price: 90, ReleaseDate: '1986-02-21T00:00:00.0 '});
// get the timestamp of the last bought game by using getTimestamp
var date = priceSum.getTimestamp(); // returns 12153801600000 (the milliseconds since 1601-01-01T00:00:00.0)
base.close();
- Returns
-
number
The timestamp of the newest record. It represents the number of milliseconds between 1601-01-01T00:00:00.0 and the record time.
getTimestampAt(idx) → number
Gets the timestamp from the timestamp vector of the stream aggregator at the specific index.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Route66',
fields: [
{ name: 'NumberOfCars', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that takes the values from the 'NumberOfCars' field
// and the timestamps from the 'Time' field. The window size should be 1 day.
var ts = {
name: 'Traffic',
type: 'timeSeriesWinBuf',
store: 'Route66',
timestamp: 'Time',
value: 'NumberOfCars',
winsize: 86400000
};
var traffic = base.store('Route66').addStreamAggr(ts);
// add some records in the store
base.store('Route66').push({ NumberOfCars: 100, Time: '2015-06-15T06:00:00.0' });
base.store('Route66').push({ NumberOfCars: 88, Time: '2015-06-15T10:00:00.0' });
base.store('Route66').push({ NumberOfCars: 60, Time: '2015-06-15T13:00:00.0' });
base.store('Route66').push({ NumberOfCars: 90, Time: '2015-06-15T18:00:00.0' });
base.store('Route66').push({ NumberOfCars: 110, Time: '2015-06-16T00:00:00.0' });
// get the third timestamp in the buffer
var time = traffic.getTimestampAt(2); // returns 13078846800000
base.close();
Parameter
Name | Type | Optional | Description |
---|---|---|---|
idx |
number |
|
The index. |
- Returns
-
number
The timestamp of the timestamp vector at position idx.
getTimestampLength() → number
Gets the length of the timestamp vector of the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Medicine',
fields: [
{ name: 'NumberOfPills', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that takes the values from the 'NumberOfPills' field
// and the timestamp from the 'Time' field. The window size should be 1 week.
var ts = {
name: 'WeekPills',
type: 'timeSeriesWinBuf',
store: 'Medicine',
timestamp: 'Time',
value: 'NumberOfPills',
winsize: 604800000
};
var weekly = base.store('Medicine').addStreamAggr(ts);
// add some records in the store
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-21T09:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 5, Time: '2015-07-21T19:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-22T09:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 5, Time: '2015-07-22T19:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-23T09:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 6, Time: '2015-07-23T19:00:00.0' });
base.store('Medicine').push({ NumberOfPills: 4, Time: '2015-07-24T09:00:00.0' });
// get the length of the timestamp vector
var length = weekly.getTimestampLength(); // returns 7
base.close();
- Returns
-
number
The length of the timestamp vector.
getTimestampVector() → module:la.Vector
Gets the vector containing the timestamps of the stream aggregator.
Example
// import qm module
var qm = require('qminer');
// create a simple base containing one store
var base = new qm.Base({
mode: 'createClean',
schema: [{
name: 'Signals',
fields: [
{ name: 'BeepLoudness', type: 'float' },
{ name: 'Time', type: 'datetime' }
]
}]
});
// create a time series stream aggregator that gets the values from the 'BeepLoudness' field and
// the timestamp from the 'Time' field. The window size should be 10 seconds.
var ts = {
name: 'SignalBeep',
type: 'timeSeriesWinBuf',
store: 'Signals',
timestamp: 'Time',
value: 'BeepLoudness',
winsize: 10000
};
var signalBeep = base.store('Signals').addStreamAggr(ts);
// add some records to the store
base.store('Signals').push({ BeepLoudness: 10, Time: '2015-07-21T12:30:30.0' });
base.store('Signals').push({ BeepLoudness: 25, Time: '2015-07-21T12:30:31.0' });
base.store('Signals').push({ BeepLoudness: 20, Time: '2015-07-21T12:30:32.0' });
// get the timestamp vector of signalBeep
var vec = signalBeep.getTimestampVector(); // returns vector [13081955430000, 13081955431000, 13081955432000]
base.close();
- Returns
-
module:la.Vector
The vector containing the timestamps.
getValueVector() → (module:la.Vector or module:la.SparseMatrix)
Gets the vector of values in the buffer. Values can be floats or sparse vectors. For a usage example see module:qm.StreamAggr#getInValueVector.
- Returns
-
(module:la.Vector or module:la.SparseMatrix)
Vector of floats or a vector of sparse vectors.
load(fin) → module:qm.StreamAggr
Loads a stream aggregator that has been previously saved.
Parameter
Name | Type | Optional | Description |
---|---|---|---|
fin |
|
The input stream. |
- Returns
-
module:qm.StreamAggr
Self.
loadStateJson(state) → module:qm.StreamAggr
Loads the stream aggregator from the state.
Parameter
Name | Type | Optional | Description |
---|---|---|---|
state |
Object |
|
The state. |
- Returns
-
module:qm.StreamAggr
Self.
onAdd(rec[, Caller]) → module:qm.StreamAggr
Executes the function when a new record is put in store. For use example see module:qm.StreamAggr constructor example.
Parameters
Name | Type | Optional | Description |
---|---|---|---|
rec |
|
The record given to the stream aggregator. |
|
Caller |
Yes |
Caller stream aggregate. |
- Returns
-
module:qm.StreamAggr
Self. Values in the stream aggregator are changed as defined in the inner onAdd function.
onDelete(rec[, Caller]) → module:qm.StreamAggr
Executes the function when a record in the store is deleted. For use example see module:qm.StreamAggr constructor example.
Parameters
Name | Type | Optional | Description |
---|---|---|---|
rec |
|
The deleted record given to the stream aggregator. |
|
Caller |
Yes |
Caller stream aggregate. |
- Returns
-
module:qm.StreamAggr
Self. The values in the stream aggregator are changed as defined in the inner onDelete function.
onStep([Caller]) → module:qm.StreamAggr
Executes the function that updates the aggregate. For use example see module:qm.StreamAggr constructor example.
Parameter
Name | Type | Optional | Description |
---|---|---|---|
Caller |
Yes |
Caller stream aggregate. |
- Returns
-
module:qm.StreamAggr
Self. Values in the stream aggregator are changed as defined in the inner onStep function.
onTime(ts[, Caller]) → module:qm.StreamAggr
Executes the function that updates the aggregate at a given timestamp. For use example see module:qm.StreamAggr constructor example.
Parameters
Name | Type | Optional | Description |
---|---|---|---|
ts |
TmMsec |
|
Timestamp in milliseconds. |
Caller |
Yes |
Caller stream aggregate. |
- Returns
-
module:qm.StreamAggr
Self. Values in the stream aggregator are changed as defined in the inner onTime function.
onUpdate(rec[, Caller]) → module:qm.StreamAggr
Executes the function when a record in the store is updated. For use example see module:qm.StreamAggr constructor example.
Parameters
Name | Type | Optional | Description |
---|---|---|---|
rec |
module:qm.Record |
|
The updated record given to the stream aggregator. |
Caller |
Yes |
Caller stream aggregate. |
- Returns
-
module:qm.StreamAggr
Self. Values in the stream aggregator are changed as defined in the inner onUpdate function.
reset() → module:qm.StreamAggr
Resets the state of the aggregate.
Example
// import the qm module
var qm = require('qminer');
// create a base with a simple store
var base = new qm.Base({
mode: "createClean",
schema: [
{
name: "Heat",
fields: [
{ name: "Celsius", type: "float" },
{ name: "Time", type: "datetime" }
]
}]
});
// create a new time series stream aggregator for the 'Heat' store, that takes the values from the 'Celsius' field
// and the timestamp from the 'Time' field. The size of the window should be 1 day.
var timeser = {
name: 'TimeSeriesAggr',
type: 'timeSeriesWinBuf',
store: 'Heat',
timestamp: 'Time',
value: 'Celsius',
winsize: 86400000
};
var timeSeries = base.store("Heat").addStreamAggr(timeser);
// add a moving average aggregator, that is connected with the 'TimeSeriesAggr' aggregator
var ma = {
name: 'movingAverageAggr',
type: 'ma',
store: 'Heat',
inAggr: 'TimeSeriesAggr'
};
var movingAverage = base.store("Heat").addStreamAggr(ma);
// reset the moving average aggregate
movingAverage.reset();
base.close();
- Returns
-
module:qm.StreamAggr
Self. The state has been reset.
save(fout) → module:fs.FOut
Saves the current state of the stream aggregator.
Parameter
Name | Type | Optional | Description |
---|---|---|---|
fout |
|
The output stream. |
- Returns
-
module:fs.FOut
The output stream fout.
saveJson([limit]) → Object
When executed, it returns a JSON object as defined by the user. For a usage example see the module:qm.StreamAggr constructor example.
Parameter
Name | Type | Optional | Description |
---|---|---|---|
limit |
number |
Yes |
The meaning is specific to each type of stream aggregator. |
- Returns
-
Object
A JSON object as defined by the user.
saveStateJson() → Object
Returns the current state of the stream aggregate as a JSON object.
- Returns
-
Object
JSON that represents the state.
setParams(params)
Sets one or more parameters.
Parameter
Name | Type | Optional | Description |
---|---|---|---|
params |
Object |
|
JSON representation of the parameters |