Unit 4. Developing a Stream Data Processing Job

The EnOS Stream Analytics service provides a user-friendly UI for designing stream data processing jobs (pipelines) with StreamSets operators. Developers can quickly configure a pipeline by adding operators (stages) to it, completing data ingestion, filtering, processing, and storage tasks without programming.


In this unit, we will develop a stream data processing job with StreamSets operators to calculate the daily energy production of wind turbines, the daily energy production of wind farms, and the carbon reduction data of wind farms. The detailed scenario of this lab is as follows:

  1. Simulate the real-time energy production meter reading data of each wind turbine.

  2. Calculate the daily energy production of each wind turbine using the simulated meter reading data.

  3. Calculate the daily energy production of each wind farm using the daily energy production of all wind turbines.

  4. Calculate the daily carbon reduction of each wind farm using the daily energy production of the wind farm.
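
To make the arithmetic concrete before building the pipeline, the following sketch walks through the four steps with made-up numbers (the division by a carbon reduction parameter mirrors the Python Evaluator 3 script later in this unit):

# Illustrative walk-through of the calculation chain with made-up numbers.

# 1. Simulated cumulative meter readings (kWh) of one wind turbine.
readings = [1000.0, 1012.5, 1027.0, 1040.0]

# 2. Daily energy production of the turbine = sum of the deltas
#    between consecutive readings.
deltas = [cur - prev for prev, cur in zip(readings, readings[1:])]
turbine_daily = sum(deltas)                # 40.0 kWh

# 3. Daily energy production of the wind farm = sum over its turbines
#    (pretend the farm has two more turbines).
farm_daily = turbine_daily + 35.0 + 42.5   # 117.5 kWh

# 4. Daily carbon reduction of the wind farm, derived from its daily
#    production and a per-asset carbon reduction parameter.
carbon_reduction_param = 3.0
carbon_reduction = farm_daily / carbon_reduction_param

print(carbon_reduction)                    # 39.1666...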


To meet the requirements of the above business scenario, we need the following StreamSets operators:


  • EDH Kafka Consumer User (Data Source): Getting complete data records from Kafka.

  • Point Selector: Specifying data records of the eos_turbine::ammeter measuring point as the input data.

  • Last Record Appender: Appending the last record of the eos_turbine::ammeter measuring point of a wind turbine to the attr field of the current record.

  • Cumulant Decomposer: Getting the delta value between the current record and the last record in the attr field, for calculating the energy production in each time interval.

  • Fixed Time Window Aggregator: Calculating the daily energy production of a wind turbine by summing up the energy production in all time intervals of the day. The output results are used for further calculation and also sent to Kafka.

  • TSL Parent Asset Lookup: Querying the parent node (wind farm) of a wind turbine on the asset tree.

  • Record Generator: Generating a new record at a 1-minute frequency to trigger the calculation of wind farm energy production.

  • Python Evaluator 1: Appending parent node information to the generated triggering point and tagging the triggering point with a Python script.

  • Latest Record Merger: Merging the triggering point and the daily energy production data of wind turbines by parent node information.

  • Python Evaluator 2: Calculating the daily energy production of a wind farm by summing up the daily energy production of all its wind turbines with a Python script. The output results are used for further calculation and also sent to Kafka.

  • Internal HTTP Client: Getting the carbon reduction parameter by calling the EnOS Get Asset API.

  • Python Evaluator 3: Calculating the daily carbon reduction of a wind farm with a Python script. The output results are sent to Kafka.

  • EDH Kafka Producer User (Data Destination): Sending all the output results to Kafka.


The business scenario is depicted in the following figure:


../../_images/scenario.png


For detailed information about the StreamSets operators, see Calculator Library 0.1.0 Documentation.

Creating a StreamSets pipeline

Take the following steps to create a StreamSets pipeline:

  1. Download the StreamSets pipeline configuration template from https://support.envisioniot.com/docs/data-asset/zh_CN/2.1.0/_static/streamsets_pipeline_demo.json (right-click the link and save the streamsets_pipeline_demo.json file to a local directory).

  2. Enter the name and description of the stream processing job.

  3. From the Template drop-down list, select Origin Pipeline.

  4. From the Operator Version drop-down list, select the installed StreamSets calculator library version.

  5. For Message Channel, select the source of data to be processed. For this tutorial, select Real-Time.

  6. Click OK to create the stream processing job with the basic settings above. See the following example:


../../_images/creating_streamsets_pipeline1.png

Adding operators to the pipeline

Now we can add the needed operators to the pipeline and connect the operators with arrows to form the pipeline.

  1. Select the arrow between the EDH Kafka Consumer User and EDH Kafka Producer User operators and click the Delete icon to remove the connection.

    ../../_images/disconnecting_source_destination.png
  2. Click the Stage Library icon in the upper right corner of the page, and then click the Point Selector operator to add it to the pipeline canvas.

  3. Connect the output point of the EDH Kafka Consumer User operator to the input point of the Point Selector operator.

    ../../_images/connecting_source_selector.png
  4. Repeat steps 2 and 3 to add the remaining operators to the pipeline and connect them in the order shown in the following figure.

    ../../_images/added_all_operators.png
  5. Click the Auto Arrange icon to align the display of the operators in the pipeline.

  6. Click Save in the toolbar to save the changes. Save the changes every time you update the pipeline configuration, until the pipeline is ready to be published.

Configuring operator parameters

After the pipeline is created, we can now configure the parameters for the added operators. Select one of the operators and complete the configuration of each tab. For the General and Basic tabs, use the default configuration. For the other tabs, follow the instructions below for each operator.

Point Selector

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_turbine::ammeter
    Getting the ammeter point data from Kafka as the input.


See the following example:


../../_images/point_selector_config.png
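
Conceptually, the Point Selector forwards only the records whose model and measuring point match the configured identifier (modelId::pointId). Here is a minimal sketch of that filter, using the record layout that the evaluator scripts later in this unit rely on (the helper function itself is hypothetical):

# Hypothetical filter equivalent to a Point Selector configured with the
# input point eos_turbine::ammeter (modelId::pointId).
def select_points(records, model_id, point_id):
    for record in records:
        if record.get('modelId') == model_id and record.get('pointId') == point_id:
            yield record

sample = [
    {'modelId': 'eos_turbine', 'pointId': 'ammeter', 'value': 1012.5},
    {'modelId': 'eos_turbine', 'pointId': 'temperature', 'value': 55.0},
]
print(list(select_points(sample, 'eos_turbine', 'ammeter')))  # keeps the ammeter record only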

Last Record Appender

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_turbine::ammeter
    Receiving the wind turbine energy production data as input.

  • Conditions: *
    Always replacing the current record with the last record.

  • Output Point: eos_turbine::ammeter_withLast
    Receiving the output result, which contains the current and last record of wind turbine energy production data.


See the following example:


../../_images/last_record_appender_config.png
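
The exact field names of the appended record are defined by the operator, but based on how later stages consume the attr field, the output can be pictured like this (an illustrative structure; the lastRecord key is a placeholder, not the operator's documented name):

# Illustrative shape of an eos_turbine::ammeter_withLast record: the current
# meter reading plus the previous one carried along in the attr field.
record = {
    'assetId': 'turbine-001',
    'pointId': 'ammeter_withLast',
    'time': 1566800460000,        # current reading timestamp (ms)
    'value': 1027.0,              # current cumulative meter reading
    'attr': {
        'lastRecord': {           # placeholder key for the appended last record
            'time': 1566800400000,
            'value': 1012.5,
        },
    },
}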


Cumulant Decomposer

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_turbine::ammeter_withLast
    Receiving the current and last record of wind turbine energy production data as input.

  • Scale Type: 1
    Using a fixed scale type for the electric energy meter.

  • Scale: 100
    Specifying the scale value of the electric energy meter.

  • Slope Type: 1
    Using a fixed slope threshold.

  • Min Slope: 0
    Specifying the lower limit of the slope threshold.

  • Max Slope: 100000
    Specifying the upper limit of the slope threshold.

  • Output Point: eos_turbine::delta_production
    Receiving the output result, which contains the calculated energy production data of wind turbines in the time interval.


See the following example:


../../_images/cumulant_decomposer_config.png
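
In effect, the operator subtracts the last meter reading from the current one and keeps only deltas whose slope falls inside the [Min Slope, Max Slope] band, so meter resets and outliers are dropped. A minimal sketch under those assumptions (the slope check is simplified to the raw delta; the real operator may also apply the configured scale and normalize by time):

# Minimal sketch of the cumulant decomposition: delta between consecutive
# cumulative readings, filtered by a fixed slope threshold.
MIN_SLOPE, MAX_SLOPE = 0, 100000

def decompose(last_value, current_value):
    delta = current_value - last_value
    if MIN_SLOPE <= delta <= MAX_SLOPE:
        return delta
    return None  # out-of-band jumps (e.g. a meter reset) are discarded

print(decompose(1012.5, 1027.0))  # 14.5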

Fixed Time Window Aggregator

Complete the configuration of TriggerConfig with the following settings:


  • Latency (Minute): 0
    Disabling the data latency setting.

  • Early Trigger: Enable
    Enabling early output of intermediate results before the time window is closed.

  • Early Trigger Type: By Fixed Frequency
    Generating early output of intermediate results at a fixed frequency.

  • Early Trigger Frequency (Minute): 1
    Generating early output at a 1-minute frequency.


See the following example:


../../_images/fixed_time_window_aggregator_config_1.png


Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_turbine::delta_production
    Receiving the calculated energy production data of wind turbines in all time intervals as input.

  • Fixed Window Size: 1
    Specifying the duration for the fixed time window.

  • Fixed Window Unit: day
    Selecting the unit for the fixed time window. For this tutorial, we will calculate the daily energy production data of wind turbines.

  • Aggregator Policy: sum
    Summing up the energy production data of wind turbines in all time intervals of 1 day.

  • Output Point: eos_turbine::production_daily
    Receiving the output result, which contains the calculated daily energy production data of wind turbines. The output results will be used for further calculation and sent to Kafka as well.


See the following example:


../../_images/fixed_time_window_aggregator_config_2.png
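
The aggregation itself is a keyed daily sum: each delta is bucketed into the calendar day it belongs to, and the early trigger emits the running total every minute instead of waiting for the window to close. A sketch assuming millisecond timestamps and UTC day boundaries:

# Sketch of a fixed daily window with early trigger: deltas are bucketed by
# day, and the running total is available before the window closes.
from collections import defaultdict

DAY_MS = 24 * 60 * 60 * 1000
daily_totals = defaultdict(float)

def add_delta(asset_id, time_ms, delta):
    day_start = time_ms // DAY_MS * DAY_MS        # floor the timestamp to the day
    daily_totals[(asset_id, day_start)] += delta
    return daily_totals[(asset_id, day_start)]    # early-trigger (intermediate) output

add_delta('turbine-001', 1566800460000, 14.5)
print(add_delta('turbine-001', 1566800520000, 13.0))  # 27.5 so far for the day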

TSL Parent Asset Lookup

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_turbine::production_daily
    Receiving the calculated daily energy production data of wind turbines as input.

  • Output Point: eos_turbine::production_daily_withparent
    Receiving the output result, which contains the calculated daily energy production data of wind turbines and the wind farm information.


See the following example:


../../_images/tsl_parent_asset_lookup_config_1.png


Complete the configuration of Criteria with the following settings:


  • Tree Tag: eos_tree::true
    Specifying the tag of the eos_tree asset tree that is created in Unit 1.

  • Attribute: All
    Querying wind farm metadata by all asset attribute keys.

  • Tag: None
    Not querying parent asset metadata by asset tags.

  • Extra: None
    Not querying parent asset metadata by extra information.


See the following example:


../../_images/tsl_parent_asset_lookup_config_2.png
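
Downstream stages rely on the lookup result being attached under the attr field: the Latest Record Merger's expression later reads /attr/tslParentLookup[0]/id, and Python Evaluator 1 imitates exactly this structure. The enriched record can therefore be pictured like this (illustrative; a real lookup may attach more parent metadata):

# Illustrative shape of an eos_turbine::production_daily_withparent record:
# the parent wind farm found on the eos_tree asset tree is attached in attr.
record = {
    'assetId': 'turbine-001',
    'modelId': 'eos_turbine',
    'pointId': 'production_daily_withparent',
    'value': 40.0,                   # daily production so far
    'attr': {
        'tslParentLookup': [
            {'id': 'windfarm-001'},  # parent asset ID, used as the merge tag later
        ],
    },
}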

Record Generator

Complete the configuration of Basic with the following settings:


  • Trigger Type: By Fixed Frequency
    Generating new records at a fixed frequency.

  • Query Frequency: 1 Minute
    Generating new records at a 1-minute frequency.


See the following example:


../../_images/record_generator_config_1.png


Complete the configuration of Input/Output with the following settings:


  • Generate Type: ByModelIDs
    Generating new records by model IDs.

  • Output Point: eos_site::production_day_trigger
    Receiving the output result, which contains the calculated daily energy production data of wind turbines and the generated data record for triggering calculation of wind farm energy production data.


See the following example:


../../_images/record_generator_config_2.png


Complete the configuration of Record Generate with the following settings:


  • Data Source: Generate New Point
    Creating a point and generating data for the point by the specified methods.

  • Date Format: SimpleDateFormat
    Using the simple date format for the start time.

  • StartTime: 2019-08-26T00:00:00+08:00
    Specifying the timestamp for the start time of the new point.

  • Time Interval: 1 Minute
    Specifying the time interval for generating new point data.

  • Value Generator: Quickly Random Number(Double)
    Generating point data randomly.


See the following example:


../../_images/record_generator_config_3.png
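
Putting the three tabs together: every minute the operator emits one eos_site::production_day_trigger record per asset of the selected model, with a random placeholder value; only its arrival matters, not its value. As consumed by Python Evaluator 1 below, such a record can be pictured like this (illustrative):

# Illustrative trigger record generated every minute per wind farm asset.
trigger_record = {
    'assetId': 'windfarm-001',
    'modelId': 'eos_site',
    'pointId': 'production_day_trigger',
    'time': 1566800460000,  # generation timestamp (ms)
    'value': 0.42,          # random placeholder; the record only triggers calculation
}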

Python Evaluator 1

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_site::production_day_trigger
    Receiving the calculated daily energy production data of wind turbines and the generated data record for triggering calculation of wind farm energy production data as input.

  • Output Point: eos_site::production_day_trigger_withtag
    Receiving the output result, which contains the calculated daily energy production data of wind turbines and the triggering points appended with wind farm information and tags.


See the following example:


../../_images/python_evaluator1_config.png


Under the Script tab, enter the following script in the Python Script field for appending wind farm information to the generated triggering point and tagging the triggering point:

import time

for record in records:
  try:
    # Wrap the asset ID in the same structure that the TSL Parent Asset
    # Lookup operator produces, so downstream merging works uniformly
    parentRecord = {'id': record.value['assetId']}
    record.value['attr'] = dict()
    record.value['attr']['tslParentLookup'] = []
    record.value['attr']['tslParentLookup'].append(parentRecord)
    # Tag the record so Python Evaluator 2 can recognize it as the trigger
    record.value['attr']['triggerTag'] = True
    # Refresh the timestamp to the current time in milliseconds
    record.value['time'] = long(round(time.time() * 1000))
    record.value['pointId'] = 'production_day_trigger_withtag'
    output.write(record)
  except Exception as e:
    # Send record to error
    error.write(record, str(e))

Latest Record Merger

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_turbine::production_daily_withparent
    Receiving the calculated daily energy production data of wind turbines and the wind farm information as input.

  • Input Point: eos_site::production_day_trigger_withtag
    Receiving the calculated daily energy production data of wind turbines and the triggering points appended with wind farm information and tags as input. Specifying this point as the trigger of processing.

  • Output Point: eos_site::turbine_merger
    Receiving the output result, which contains the merged data records of energy production data of wind turbines according to wind farm attribute.


See the following example:


../../_images/latest_record_merger_config_1.png


Complete the configuration of MergeConfig with the following settings:


  • Merged By Expression: ${record:value('/attr/tslParentLookup[0]/id')}
    Specifying the expression for generating the tags that are used to merge the latest records.

  • Cache Expire Time (Minute): 1440
    Specifying how long the cache is kept after the tags stop being updated.


See the following example:


../../_images/latest_record_merger_config_2.png
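
The merger keeps the latest record per merge tag (here, the parent wind farm's asset ID) and hands the whole group downstream whenever the trigger point arrives. Based on how Python Evaluator 2 reads the result, the merged output can be pictured like this (illustrative values; the modelIdPath is a placeholder):

# Illustrative shape of an eos_site::turbine_merger record: the latest record
# per merge tag, grouped under attr.latestRecordMerger.mergedRecords.
merged = {
    'pointId': 'turbine_merger',
    'attr': {
        'latestRecordMerger': {
            'mergedRecords': [
                # the 1-minute trigger record, tagged by Python Evaluator 1
                {'assetId': 'windfarm-001', 'modelId': 'eos_site',
                 'modelIdPath': '/eos_site', 'time': 1566800460000,
                 'value': 0.42, 'attr': {'triggerTag': True}},
                # latest daily production per turbine of the same wind farm
                {'assetId': 'turbine-001', 'value': 40.0, 'attr': {}},
                {'assetId': 'turbine-002', 'value': 35.0, 'attr': {}},
            ],
        },
    },
}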

Python Evaluator 2

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_site::turbine_merger
    Receiving the merged data records of energy production data of wind turbines according to wind farm attribute as input.

  • Output Point: eos_site::production_day
    Receiving the output result, which contains the calculated daily energy production data of wind farms. The output results will be used for further calculation and sent to Kafka as well.


See the following example:


../../_images/python_evaluator2_config.png


Under the Script tab, enter the following script in the Python Script field for calculating the daily energy production data of wind farms:

from decimal import Decimal


for record in records:
    try:
        # Records grouped by the Latest Record Merger under the same wind farm
        mergedRecords = record.value['attr']['latestRecordMerger']['mergedRecords']
        triggerRecords = []
        calculateRecords = []
        for mergedRecord in mergedRecords:
            # The trigger record was tagged by Python Evaluator 1
            isTriggerPoint = mergedRecord['attr'].has_key('triggerTag') and mergedRecord['attr']['triggerTag']
            if isTriggerPoint:
                triggerRecords.append(mergedRecord)
            else:
                calculateRecords.append(mergedRecord)

        if len(triggerRecords) == 1:
            # Sum the turbine values with Decimal to avoid floating-point drift
            v_sum = '0.0'
            for calculateRecord in calculateRecords:
                v_sum = str(Decimal(str(calculateRecord['value'])) + Decimal(v_sum))
            record.value['value'] = float(v_sum)

            # Floor the trigger timestamp to the whole minute and take the
            # wind farm identity from the trigger record
            record.value['time'] = triggerRecords[0]['time']/60000*60000
            record.value['assetId'] = triggerRecords[0]['assetId']
            record.value['modelId'] = triggerRecords[0]['modelId']
            record.value['modelIdPath'] = triggerRecords[0]['modelIdPath']
            record.value['pointId'] = 'production_day'
            record.value['attr'] = dict()
            output.write(record)
        else:
            error.write(record, "invalid record")
    except Exception as e:
        # Send record to error
        error.write(record, str(e))

Internal HTTP Client

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_site::production_day
    Receiving the daily energy production data of wind farms as input.

  • Output Point: eos_site::with_carbonReduction
    Receiving the output result, which contains the daily energy production data of wind farms and the queried carbon reduction parameter.


See the following example:


../../_images/internal_http_client_config_1.png


Complete the configuration of Request with the following settings:


  • Request Method: Get
    Selecting the API request method. For the Get Asset API, select the Get method.

  • Request URL: https://{enos-api-gateway}/asset-service/v2.1/assets?action=get
    Specifying the API request URL. To get the API gateway URL, go to EnOS Management Console > Help > Environment Information > API Gateway.

  • Param Key: assetId
    Specifying the following values for the assetId parameter:

      • Position: URL Query

      • URLParam Type: ByField

      • Param Value: /assetId

  • Param Key: orgId
    Specifying the following values for the orgId parameter:

      • Position: URL Query

      • URLParam Type: ByField

      • Param Value: /orgId

  • Access Key: access_key
    Specifying the access key of the application, which is used for API request authentication. For detailed information, see Registering and Managing Applications.

  • Secret Key: YourSecretKey
    Specifying the secret key of the application.


See the following example:


../../_images/internal_http_client_config_2.png
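
For reference, the operator ends up issuing an authenticated GET request against the Get Asset API, with assetId and orgId filled in from the corresponding fields of the incoming record. A hedged sketch of the request shape (the gateway placeholder comes from the table above; request signing with the access/secret key pair is handled by the operator and omitted here):

# Sketch of the request URL the operator builds; authentication with the
# access key / secret key pair is handled by the operator itself.
try:
    from urllib import urlencode           # Python 2 / Jython
except ImportError:
    from urllib.parse import urlencode     # Python 3

params = {'action': 'get', 'assetId': 'windfarm-001', 'orgId': 'yourOrgId'}
url = 'https://{enos-api-gateway}/asset-service/v2.1/assets?' + urlencode(params)
print(url)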

Python Evaluator 3

Complete the configuration of Input/Output with the following settings:


  • Input Point: eos_site::with_carbonReduction
    Receiving the daily energy production data of wind farms and the queried carbon reduction parameter as input.

  • Output Point: eos_site::carbon.reduction.daily
    Receiving the output result, which contains the calculated daily carbon reduction data of wind farms. The output results will be sent to Kafka.


See the following example:


../../_images/python_evaluator3_config.png


Under the Script tab, enter the following script in the Python Script field for calculating the daily carbon reduction data of wind farms:

import json

for record in records:
  try:
    record.value['pointId'] = 'carbon.reduction.daily'
    # Daily energy production of the wind farm, computed by Python Evaluator 2
    production = record.value['value']
    # Parse the Get Asset API response attached by the Internal HTTP Client
    json_response = json.loads(record.value['attr']['httpClient']['response'])
    carbon = json_response['data']['attributes']['carbon.reduction.param']
    # Derive the daily carbon reduction from the production and the parameter
    record.value['value'] = production/carbon

    output.write(record)
  except Exception as e:
    # Send record to error
    error.write(record, str(e))
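
With illustrative numbers: if a wind farm's production_day value is 2400 and its carbon.reduction.param attribute is 3, the script writes 800 as the day's carbon.reduction.daily value. The division mirrors the script above; the parameter itself is an attribute defined on the wind farm asset and returned by the Get Asset API.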

Validating and running the pipeline

When the operators are configured, we can validate the configuration and start running the pipeline.

  1. Save the configuration of the pipeline.

  2. Click the Validate icon in the toolbar to verify the configuration of all the operators.

    ../../_images/validating_pipeline.png
  3. If the validation fails, update the configuration of the operators accordingly.

  4. If the validation is successful, click the Release icon in the toolbar to publish the pipeline.

  5. Run the pipeline and monitor its running status and results on the Stream Operation page. For detailed steps, see Maintaining Stream Processing Jobs.

Next Unit

Viewing Stored Data with Data Insights