JavaScript

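Runs native JavaScript to implement custom business logic.

Configuration Details

The configuration of this operator consists of General, Basic, Input/Output, and JavaScript details. The fields are described below: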

General

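| Name | Required | Description |
| --- | --- | --- |
| Name | Yes | Operator name |
| Description | No | Operator description |
| Stage Library | Yes | Library that the operator belongs to |
| Required Fields | No | Fields that the data must contain. A record that lacks any of the specified fields is filtered out. |
| Preconditions | No | Conditions that the data must satisfy. A record that does not meet them is filtered out. Example: ${record:value('/value') > 0}. For details on EL syntax, see Expression Language. |
| On Record Error | Yes | How records that produce errors are handled. Options: Discard (drop the record), Send to Error (send the record to the error center), Stop Pipeline (stop the stream processing job). |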

Basic

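| Name | Required | Description |
| --- | --- | --- |
| IO Mapping Method | Yes | How data input points map to data output points. Options: 1:1 (each row of the Input/Output configuration maps one input point to one output point), M:1 (the Input/Output configuration accepts any number of input points and output points). |
| Quality Filter | No | Filters the data by data quality. Only records that meet the quality condition are processed by this operator. |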

Input/Output

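| Name | Required | Description |
| --- | --- | --- |
| Input Point | Yes | Data input point, in the format {modelId}::{pointId}. The modelIdPath and pointId of an incoming record must match the input point for the record to enter the subsequent computation. |
| Output Point | No | Data output point, in the format {modelId}::{pointId}. After the JavaScript runs, the modelIdPath and pointId of an output record must match the output point for the record to be emitted as an actual output record. |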
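As a rough illustration of the matching rule above, the following sketch parses a point written as {modelId}::{pointId} and checks a record's value against it. The helper names `parsePoint` and `matchesPoint`, the sample record, and the assumption that matching is exact string equality on `modelIdPath` and `pointId` are all hypothetical. The operator performs this check internally, and its actual comparison may differ.

```javascript
// Hypothetical helper: split "modelId::pointId" into its two parts.
function parsePoint(point) {
  var parts = point.split('::');
  if (parts.length !== 2 || parts[0] === '' || parts[1] === '') {
    throw new Error('Expected {modelId}::{pointId}, got: ' + point);
  }
  return { modelId: parts[0], pointId: parts[1] };
}

// Hypothetical helper: ASSUMES exact equality on both fields.
// The real operator may compare modelIdPath differently.
function matchesPoint(recordValue, point) {
  var p = parsePoint(point);
  return recordValue.modelIdPath === p.modelId &&
         recordValue.pointId === p.pointId;
}

// Made-up record value for illustration only.
var sample = { modelIdPath: 'turbineModel', pointId: 'windSpeed', value: 7.2 };
console.log(matchesPoint(sample, 'turbineModel::windSpeed')); // true
console.log(matchesPoint(sample, 'turbineModel::power'));     // false
```

A record whose `pointId` is rewritten in the script but does not correspond to any configured Output Point would, under this reading, not be emitted.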

JavaScript

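| Name | Required | Description |
| --- | --- | --- |
| Script | Yes | The custom JavaScript to run. In the script, records holds the incoming data for the selected points after quality filtering. |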

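Output Results

After the custom JavaScript runs, the output of this operator is contained in the attr struct.

Script Development Guide

In JavaScript, any operation on a Long value (addition, subtraction, multiplication, division, comparison, and so on) requires converting the value to a floating-point number with parseFloat() first. Example: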

record.value['time'] = parseInt(parseFloat(record.value['time'])/60000) * 60000;
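To see what the line above computes, the sketch below applies the same expression outside the operator. The function name `floorToMinute` and the sample timestamp are illustrative assumptions; the string input stands in for a Long value arriving from the pipeline.

```javascript
// Illustrative helper (not part of the operator): round a millisecond
// timestamp down to the start of its minute, using the same expression.
function floorToMinute(time) {
  // parseFloat() first turns the incoming value into a JavaScript number,
  // so the division and multiplication are plain numeric operations.
  return parseInt(parseFloat(time) / 60000) * 60000;
}

var floored = floorToMinute('1700000012345');
console.log(floored); // 1699999980000
```

For typical non-negative millisecond timestamps, Math.floor() gives the same result as parseInt() here and avoids the intermediate string conversion.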


/**
 * Available constants:
 *   They are to assign a type to a field with a value null.
 *   NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG
 *   NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL
 *   NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP
 *
 * Available Objects:
 *
 *  records: an array of records to process, depending on the JavaScript processor
 *           processing mode it may have 1 record or all the records in the batch.
 *
 *  state: a dict that is preserved between invocations of this script.
 *        Useful for caching bits of data e.g. counters.
 *
 *  log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout.
 *                               loglevel is any log4j level: e.g. info, error, warn, trace.
 *
 *  output.write(record): writes a record to processor output
 *
 *  error.write(record, message): sends a record to error
 *
 *  sdcFunctions.getFieldNull(Record, 'field path'): Receive a constant defined above
 *                            to check if the field is typed field with value null
 *  sdcFunctions.createRecord(String recordId): Creates a new record.
 *                            Pass a recordId to uniquely identify the record and include enough information to track down the record source.
 *  sdcFunctions.createMap(boolean listMap): Create a map for use as a field in a record.
 *                            Pass true to this function to create a list map (ordered map)
 *
 *  sdcFunctions.createEvent(String type, int version): Creates a new event.
 *                            Create new empty event with standard headers.
 *  sdcFunctions.toEvent(Record): Send event to event stream
 *                            Only events created with sdcFunctions.createEvent are supported.
 *  sdcFunctions.isPreview(): Determine if pipeline is in preview mode.
 *
 *  enosFunc.mcMin(Record, String pointID): Gets the minimum value of the measuring point.
 *  enosFunc.mcMax(Record, String pointID): Gets the maximum value of the measuring point.
 *  enosFunc.mcSum(Record, String pointID): Gets the sum of the values of the measuring point.
 *  enosFunc.mcMean(Record, String pointID): Gets the average of the values of the measuring point.
 *  enosFunc.mcCov(Record, String pointID): Gets the dispersion rate of the values of the measuring point.
 *  enosFunc.input(Record, String pointID): Gets the value of the measuring point
 *                            (supported data types: int, long, short, decimal, boolean, float, double).
 *
 * Available Record Header Variables:
 *
 *  record.attributes: a map of record header attributes.
 *
 *  record.<header name>: get the value of 'header name'.
 */

// Sample JavaScript code

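// Builds an output record for outputPointId from an input record and writes it.
function generateOutput(record, outputPointId, outputValue) {
    // createRecord expects a String recordId; derive one from the source record
    // so the new record can be traced back to its origin.
    var output_record = sdcFunctions.createRecord(record.sourceId + ':' + outputPointId);
    output_record.value = record.value;
    output_record.value['pointId'] = outputPointId;
    output_record.value['value'] = outputValue;
    output_record.value['attr'] = record.value['attr'];
    output_record.value['assetId'] = record.value['assetId'];
    output_record.value['modelIdPath'] = record.value['modelIdPath'];
    output.write(output_record);
}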

for(var i = 0; i < records.length; i++) {
  try {
    // Change record root field value to a STRING value
    //records[i].value = 'Hello ' + i;


    // Change record root field value to a MAP value and create an entry
    //records[i].value = { V : 'Hello' };

    // Access a MAP entry
    //records[i].value.X = records[i].value['V'] + ' World';

    // Modify a MAP entry
    //records[i].value.V = 5;

    // Create an ARRAY entry
    //records[i].value.A = ['Element 1', 'Element 2'];

    // Access an ARRAY entry
    //records[i].value.B = records[i].value['A'][0];

    // Modify an existing ARRAY entry
    //records[i].value.A[0] = 100;

    // Assign an integer type to a field with a null value
    // records[i].value.null_int = NULL_INTEGER

    // Check if the field is NULL_INTEGER. If so, assign a value
    // if(sdcFunctions.getFieldNull(records[i], '/null_int') == NULL_INTEGER)
    //    records[i].value.null_int = 123

    // Create a new record with map field
    // var newRecord = sdcFunctions.createRecord(records[i].sourceId + ':newRecordId');
    // newRecord.value = {'field1' : 'val1', 'field2' : 'val2'};
    // output.write(newRecord);
    // Create a new map and add it to the original record
    // var newMap = sdcFunctions.createMap(true);
    // newMap['key'] = 'value';
    // records[i].value['b'] = newMap;

    // Applies if the source uses WHOLE_FILE as data format
    // var input_stream = records[i].value['fileRef'].getInputStream();
    // try {
    //   input_stream.read(); // Process the input stream
    // } finally {
    //   input_stream.close();
    // }

    // Modify a header attribute entry
    // records[i].attributes['name'] = records[i].attributes['first_name'] + ' ' + records[i].attributes['last_name'];

    // Get a record header with field names ex. get sourceId and errorCode
    // var sourceId = records[i].sourceId
    // var errorCode = ''
    // if(records[i].errorCode) {
    //     errorCode = records[i].errorCode
    // }

    // var record = records[i];
    // var targetPoint = 'pointId';
    // var minValue = enosFunc.mcMin(record,targetPoint);
    // var maxValue = enosFunc.mcMax(record,targetPoint);
    // var sumValue = enosFunc.mcSum(record,targetPoint);
    // var avgValue = enosFunc.mcMean(record,targetPoint);
    // var covValue = enosFunc.mcCov(record,targetPoint);
    // var inputValue = enosFunc.input(record,targetPoint);


    // Write record to processor output
    output.write(records[i]);
  } catch (e) {
    // Send record to error
    error.write(records[i], e);
  }
}
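The template above only runs inside the operator, where `records`, `output`, `error`, and `sdcFunctions` are supplied by the runtime. To try the control flow locally, for example in Node.js, one option is to stub those objects. Every stub below is a hypothetical stand-in and not the real runtime API; the doubling rule, the sample records, and the point name `power_x2` are made up for the example.

```javascript
// --- Hypothetical stand-ins for objects the operator normally provides ---
var written = [];   // collects records passed to output.write
var failed = [];    // collects records passed to error.write
var output = { write: function (r) { written.push(r); } };
var error = {
  write: function (r, msg) { failed.push({ record: r, message: String(msg) }); }
};
var sdcFunctions = {
  createRecord: function (recordId) { return { sourceId: recordId, value: null }; }
};
var records = [
  { sourceId: 'r1', value: { assetId: 'a1', modelIdPath: 'm1', pointId: 'power', value: '21', attr: {} } },
  { sourceId: 'r2', value: { assetId: 'a1', modelIdPath: 'm1', pointId: 'power', value: 'n/a', attr: {} } }
];

// --- Script body, following the shape of the sample above ---
function generateOutput(record, outputPointId, outputValue) {
  var out = sdcFunctions.createRecord(record.sourceId + ':' + outputPointId);
  // Build a fresh value object so the input record is left untouched.
  out.value = {
    assetId: record.value['assetId'],
    modelIdPath: record.value['modelIdPath'],
    pointId: outputPointId,
    value: outputValue,
    attr: record.value['attr']
  };
  output.write(out);
}

for (var i = 0; i < records.length; i++) {
  try {
    var v = parseFloat(records[i].value['value']);
    if (isNaN(v)) {
      throw new Error('value is not numeric');
    }
    generateOutput(records[i], 'power_x2', v * 2);
  } catch (e) {
    // A failing record goes to the error stream instead of stopping the loop.
    error.write(records[i], e);
  }
}

console.log(written.length, failed.length); // 1 1
console.log(written[0].value.value);        // 42
```

Wrapping each record in its own try/catch, as the template does, keeps one bad record from aborting the rest of the batch.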