JavaScript*¶
支持运行原生的 JavaScript 脚本,实现自定义的业务逻辑。与旧数据格式不兼容,即无法按 ModelId::PointId
进行数据过滤。
配置详情¶
该算子的配置包括 General,Basic,Input/Output,和 JavaScript 的详细信息,各字段的配置如下:
General¶
名称 |
是否必须 |
描述 |
---|---|---|
Name |
Yes |
算子名称 |
Description |
No |
算子描述 |
Stage Library |
Yes |
算子所属的库 |
Required Fields |
No |
数据必须包含的字段,如果未包含指定字段,则 record 将被过滤掉 |
Preconditions |
No |
数据必须满足的前提条件,如果不满足指定条件,则 record 将被过滤掉。例如: |
On Record Error |
Yes |
对错误数据的处理方式,可选:
|
Basic¶
名称 |
是否必须 |
描述 |
---|---|---|
IO Mapping Method |
Yes |
选择数据输入点和数据输出点的对应关系。可选:
|
Quality Filter |
No |
根据数据质量过滤处理数据,只有符合质量条件的 record 才会进行此次处理 |
Input/Output¶
名称 |
是否必须 |
描述 |
---|---|---|
Input Measurement |
Yes |
数据输入点,输入数据的 MeasurementId 必须匹配输入点,才能够进入后续计算。 |
Output Measurement |
No |
数据输出点,经过 JavaScript 脚本后的输出数据的 MeasurementId 必须匹配输出点,才能够作为真正的输出 record。 |
JavaScript¶
名称 |
是否必须 |
描述 |
---|---|---|
Script |
Yes |
编写自定义 JavaScript 脚本。其中 records 代表所有经过选中的点并经过质量控制后,流入的数据。 |
输出结果¶
运行自定义 JavaScript 脚本后,该算子的输出结果包含在 attr
结构体中。
脚本开发指南¶
Java Script 中,若涉及 Long 类型数值的计算,如: 加减乘除/比较/…,需先使用 parseFloat()
将其转换成 Float 类型,再进行操作。示例:
record.value['time'] = parseInt(parseFloat(record.value['time'])/60000) * 60000;
/**
* Available constants:
* They are to assign a type to a field with a value null.
* NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG
* NULL_FLOATNULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL
* NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP
*
* Available Objects:
*
* records: an array of records to process, depending on the JavaScript processor
* processing mode it may have 1 record or all the records in the batch.
*
* state: a dict that is preserved between invocations of this script.
* Useful for caching bits of data e.g. counters.
*
* log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout.
* loglevel is any log4j level: e.g. info, error, warn, trace.
*
* output.write(record): writes a record to processor output
*
* error.write(record, message): sends a record to error
*
* sdcFunctions.getFieldNull(Record, 'field path'): Receive a constant defined above
* to check if the field is typed field with value null
* sdcFunctions.createRecord(String recordId): Creates a new record.
* Pass a recordId to uniquely identify the record and include enough information to track down the record source.
* sdcFunctions.createMap(boolean listMap): Create a map for use as a field in a record.
* Pass true to this function to create a list map (ordered map)
*
* sdcFunctions.createEvent(String type, int version): Creates a new event.
* Create new empty event with standard headers.
* sdcFunctions.toEvent(Record): Send event to event stream
* Only events created with sdcFunctions.createEvent are supported.
* sdcFunctions.isPreview(): Determine if pipeline is in preview mode.
*
* sdcFunctions.dateTimeHelper.convertZonedDateTimeToTimestampInMilliseconds(String zonedDateTime, String
* dateTimeFormatterPattern): Convert zoned
* date time to timestamp in milliseconds.
* Example:
* var timestamp = sdcFunctions.dateTimeHelper.convertZonedDateTimeToTimestampInMilliseconds("2021-09-03
* 05:00:00.000
* America/Chicago","yyy-MM-ddHH:mm:ss.SSS [VV]");
* // expected timestamp = 1599127200000
*
* sdcFunctions.dateTimeHelper.convertTimestampInMillisecondsToZonedDateTime(long timestamp, String
* dateTimeFormatterPattern, String timezoneId):
* Convert timestamp in milliseconds to zoned date time.
* Example:
* // var timestamp =
* sdcFunctions.dateTimeHelper.convertTimestampInMillisecondsToZonedDateTime(1599127200000, 'yyy-MM-dd
* HH:mm:ss.SSS [VV]',
* 'America/Chicago');
* // expected dateTime = '2020-09-03 05:00:00.000 America/Chicago'
*
* enosFunc.mcMin(Record,String measurementID): Function for getting the minimum value of the measuring point;
* enosFunc.mcMax(Record,String measurementID): Function for getting the maximum value of the measuring point;
* enosFunc.mcSum(Record,String measurementID): Function for getting the sum of the values of the measuring point;
* enosFunc.mcMean(Record,String measurementID): Function for getting the average of the values of the measuring point;
* enosFunc.mcCov(Record,String measurementID): Function for getting the dispersion rate of the values of the measuring point;
* enosFunc.input(Record,String measurementID): Function for getting the value of the measuring point (supported data types are int、long、short、decimal、boolean、float、double);
*
* Available Record Header Variables:n *
* record.attributes: a map of record header attributes.
*
* record.<header name>: get the value of 'header name'.
*/
// Sample JavaScript code
function generateOutput(record, outputMeasurementId, outputValue) {
var output_record = sdcFunctions.createRecord(true);
output_record.value = record.value;
output_record.value['pointId'] = record.value['pointId'];
output_record.value['value'] = outputValue;
output_record.value['attr'] = record.value['attr'];
output_record.value['assetId'] = record.value['assetId']
output_record.value['modelIdPath'] = record.value['modelIdPath']
output_record.value['measurementId'] = outputMeasurementId
output.write(output_record);
}
for(var i = 0; i < records.length; i++) {
try {
// Change record root field value to a STRING value
//records[i].value = 'Hello ' + i;
// Change record root field value to a MAP value and create an entry
//records[i].value = { V : 'Hello' };
// Access a MAP entry
//records[i].value.X = records[i].value['V'] + ' World';
// Modify a MAP entry
//records[i].value.V = 5;
// Create an ARRAY entry
//records[i].value.A = ['Element 1', 'Element 2'];
// Access a Array entry
//records[i].value.B = records[i].value['A'][0];
// Modify an existing ARRAY entry
//records[i].value.A[0] = 100;
// Assign a integer type to a field and value null
// records[i].value.null_int = NULL_INTEGER
// Check if the field is NULL_INTEGER. If so, assign a value
// if(sdcFunctions.getFieldNull(records[i], '/null_int') == NULL_INTEGER)
// records[i].value.null_int = 123
// Create a new record with map field
// var newRecord = sdcFunctions.createRecord(records[i].sourceId + ':newRecordId');
// newRecord.value = {'field1' : 'val1', 'field2' : 'val2'};
// output.write(newRecord);
// Create a new map and add it to the original record
// var newMap = sdcFunctions.createMap(true);
// newMap['key'] = 'value';
// records[i].value['b'] = newMap;
//Applies if the source uses WHOLE_FILE as data format
//var input_stream = record.value['fileRef'].getInputStream();
//try {
//input_stream.read(); //Process the input stream
//} finally{
//input_stream.close()
//}
// Modify a header attribute entry
// records[i].attributes['name'] = records[i].attributes['first_name'] + ' ' + records[i].attributes['last_name'] //
// Get a record header with field names ex. get sourceId and errorCode
// var sourceId = records[i].sourceId
// var errorCode = ''
// if(records[i].errorCode) {
// errorCode = records[i].errorCode
// }
// var record = records[i];
// var targetMeasurementId = 'measurementId';
// var minValue = enosFunc.mcMin(record, targetMeasurementId);
// var maxValue = enosFunc.mcMax(record, targetMeasurementId);
// var sumValue = enosFunc.mcSum(record, targetMeasurementId);
// var avgValue = enosFunc.mcMean(record, targetMeasurementId);
// var covValue = enosFunc.mcCov(record, targetMeasurementId);
// var inputValue = enosFunc.input(record, targetMeasurementId);
// Write record to processor output
output.write(records[i]);
}
catch (e) {
// trace the exception
error.trace(e);
// Send record to error
error.write(records[i], e);
}
}