消费订阅数据(EnOS Cloud 用户)


数据订阅任务启动运行之后,可以使用数据订阅 SDK 开发应用,消费已订阅的数据。


本文介绍使用数据订阅 SDK 的安装和消费订阅数据的代码示例。

EnOS Cloud 支持以下数据订阅 SDK:

  • Java SDK

  • Python SDK

  • .NET SDK

对 EnOS SDK 的详细介绍、最新版本及下载地址,访问 EnOS SDKs 和工具

使用 Java SDK

数据订阅 Java SDK 的安装和消费订阅数据的代码示例如下:

安装数据订阅 Java SDK

  1. 访问数据订阅 Java SDK 的 Maven 仓库,获取依赖信息:EnOS Cloud Maven 仓库

  2. 在 Java 开发项目文件中添加如下 Maven 依赖,安装数据订阅 SDK:

    <dependency>
      <groupId>com.envisioniot</groupId>
      <artifactId>subscription-client</artifactId>
      <version>5.0.2</version>
    </dependency>
    <dependency>
      <groupId>com.envisioniot</groupId>
      <artifactId>enos-subscribe-impl</artifactId>
      <version>5.0.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    

消费实时数据代码示例

以下示例为使用指定的 Consumer Group 消费订阅的资产实时数据。如果订阅的数据量较大,可使用同一 Consumer Group 的2个 Consumer Client 同时消费订阅数据,提高消费数据的效率。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;

public class DataServiceDemo {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        // 订阅分组,相关概念见订阅SDK参考
        String consumerGroup = "consumer_group";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // 根据订阅类型获取相应的service,本示例获取实时数据service
        IDataService dataService = eosClient.getDataService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IDataHandler dataHandler = new IDataHandler(){
            public void dataRead(StreamMessage message) {
                System.out.println(message);
            }
        };

        // 调用subscribe函数创建订阅连接,调用后订阅连接被创建
        // 同时指定订阅分组,关于订阅分组的概念见订阅SDK参考说明
        dataService.subscribe(dataHandler, subId, consumerGroup);
    }
}

备注

  • 在以上示例中, hostport 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。

  • 每个 Topic Partition 数量为2,即同一个订阅Topic最多只支持2个 Consumer Client 同时消费数据。

  • 一个 Consumer 实例只能消费一个 Topic。

  • 数据在 Topic 中的存储时长默认为3天。

消费基础告警数据代码示例

以下示例为不指定 Consumer Group 消费订阅的告警引擎数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;

public class AlertServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // 根据订阅类型获取相应的service,本示例获取告警service
        IAlertService alertService = eosClient.getAlertService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IAlertHandler alertHandler = new IAlertHandler (){
            @Override
            public void alertRead(Alert alert) {
                System.out.println(alert);
            }
        };

        // 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
        alertService.subscribe(alertHandler, subId);

    }
}

备注

除以上代码示例为,可参考 HistoryEvent 结构体字段 中的字段及说明,编写消费基础告警引擎数据的代码。

消费高级告警数据代码示例

以下示例为不指定 Consumer Group 消费订阅的告警引擎数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertHandler;
import com.envisioniot.sub.client.advancedalert.IAdvancedAlertService;
import com.envisioniot.sub.common.model.Alert;

public class AlertServiceDemo2 {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // 根据订阅类型获取相应的service,本示例获取告警service
        IAdvancedAlertService alertService = eosClient.getAdvancedAlertService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IAdvancedAlertHandler alertHandler = new IAdvancedAlertHandler(){
            @Override
            public void alertRead(Alert alert) {
                System.out.println(alert);
            }
        };

        // 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
        alertService.subscribe(alertHandler, subId);

    }
}

备注

除以上代码示例为,可参考 AlertVo 结构体字段 中的字段及说明,编写消费高级告警引擎数据的代码。

批量消费订阅数据示例

以下示例仅展示高级告警和实时数据的订阅类型的使用。

  • 高级告警:getAdvancedAlertService(boolean isBatch)

  • 实时数据:getDataService(boolean isBatch)

  • 在回调函数中增加:

    eosClient. getAdvancedAlertService(true).subscribe(new IAdvancedAlertHandler() {
        @Override
        public void advancedAlertReads(List<String> alerts) {
            System.out.println("get=>" + alerts);
        }
    }, subId);
    

备注

增加的回调方法为带有 s 的函数,并且回调方法中形参的类型为数组。

消费离线数据代码示例

以下示例为不指定 Consumer Group 消费订阅的资产离线数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;

public class DataServiceDemo {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // 根据订阅类型获取相应的service,本示例获取离线数据service
        IDataService dataService = eosClient.getOfflineDataService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IDataHandler dataHandler = new IDataHandler(){
            public void dataRead(StreamMessage message) {
                System.out.println(message);
            }
        };

        // 调用subscribe函数创建订阅连接,调用后订阅连接被创建
        dataService.subscribe(dataHandler, subId);
    }
}

消费事件数据代码示例

以下示例为不指定 Consumer Group 消费订阅的资产事件数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;

public class EventServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // 根据订阅类型获取相应的service,本示例获取事件service
        IEventService eventService = eosClient.getEventService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IEventHandler eventHandler = new IEventHandler (){
            @Override
            public void eventRead(String event) {
                System.out.println(event);
            }
        };

        // 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
        eventService.subscribe(eventHandler, subId);

    }
}

消费设备事件上报数据代码示例

以下示例为不指定 Consumer Group 消费订阅的设备事件上报数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.report.IReportHandler;
import com.envisioniot.sub.client.report.IReportService;
import com.envisioniot.sub.common.model.report.Report;

public class ReportServiceDemo {

    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey);

        // 根据订阅类型获取相应的service,本示例获取设备事件上报service
        IReportService reportService = eosClient.getReportService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IReportHandler reportHandler = new IReportHandler() {
            @Override
            public void eventRead(Report report) {
                System.out.println(report);
            }
        };

        // 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
        reportService.subscribe(reportHandler, subId);
    }
}

使用 Python SDK

数据订阅 Python SDK 的安装和消费订阅数据的代码示例如下:

安装数据订阅 Python SDK

数据订阅 Python SDK 支持 Python 2.7 和 Python 3.4 及以上版本。

  1. 安装以下依赖模块:

    • six

    • google.protobuf

    • websocket_client

  2. 访问数据订阅 Python SDK 的仓库,获取安装代码:https://github.com/EnvisionIot/enos-subscription-service-sdk-python

  3. 使用 pip 安装数据订阅 Python SDK:

pip install enos-subscribe

消费实时数据代码示例

以下示例为使用 Python SDK 消费订阅的资产实时数据代码示例:

from enos_subscribe import DataClient

if __name__ == '__main__':
    client = DataClient(host='sub-host', port='sub-port',
                        access_key='Your Access Key of this subscription',
                        access_secret='Your Access Secret of this subscription')

    client.subscribe(sub_id='Your subscription Id')

    for message in client:
        print(message)

备注

  • 在以上示例中, sub-hostsub-port 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。

  • 数据在 Topic 中的存储时长默认为3天。

消费告警数据代码示例

以下示例为使用 Python SDK 消费订阅的告警引擎数据代码示例:

from enos_subscribe import AlertClient

if __name__ == '__main__':
    client = AlertClient(host='sub-host', port='sub-port',
                        access_key='Your Access Key of this subscription',
                        access_secret='Your Access Secret of this subscription')

    client.subscribe(sub_id='Your subscription Id')

    for message in client:
        print(message)

备注

除以上代码示例为,可参考 消费告警数据代码字段说明,编写消费告警引擎数据的代码。

消费离线数据代码示例

以下示例为使用 Python SDK 消费订阅的离线数据代码示例:

from enos_subscribe import OfflineClient

if __name__ == '__main__':
    client = OfflineClient(host='sub-host', port='sub-port',
                        access_key='Your Access Key of this subscription',
                        access_secret='Your Access Secret of this subscription')

    client.subscribe(sub_id='Your subscription Id')

    for message in client:
        print(message)

消费事件数据代码示例

以下示例为使用 Python SDK 消费订阅的事件数据代码:

from enos_subscribe import EventClient

if __name__ == '__main__':
    client = EventClient(host='sub-host', port='sub-port',
                        access_key='Your Access Key of this subscription',
                        access_secret='Your Access Secret of this subscription')

    client.subscribe(sub_id='Your subscription Id')

    for message in client:
        print(message)

使用 .NET SDK

数据订阅 .NET SDK 的安装和消费订阅数据的代码示例如下:

安装数据订阅 .NET SDK

数据订阅 .NET SDK 支持 .NET Framework 4.7 及以上版本。


你可通过程序包管理器控制台或使用 SDK 源代码安装最新版的数据订阅 .NET SDK。

通过程序包管理器控制台安装

使用以下命令通过程序包管理器控制台安装数据订阅 .NET SDK:

Install-Package enos_subscription -Version 2.4.2

使用源代码安装

  1. 访问数据订阅 .NET SDK 的仓库,获取安装代码:https://github.com/EnvisionIot/enos-subscription-service-sdk-dotnet

  2. 在 Visual Studio 中,将源代码项目添加到解决方案中,并将其添加为项目的引用。

安装依赖模块

数据订阅 .NET SDK 需要以下依赖模块:

消费实时数据代码示例

以下示例为使用 .NET SDK 消费订阅的资产实时数据代码示例:

using enos_subscription.client;

using (DataClient client = new DataClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {

    client.subscribe("Your subscription Id", "Your consumer group");

    foreach (var message in client.GetMessages())
    {
       //do something with the message
    }
}

备注

  • 在以上示例中, sub-hostsub-port 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。

  • 数据在 Topic 中的存储时长默认为3天。

消费告警数据代码示例

以下示例为使用 .NET SDK 消费订阅的告警引擎数据代码示例:

using enos_subscription.client;

using (AlertClient client = new AlertClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {

    client.subscribe("Your subscription Id", "Your consumer group");

    foreach (var message in client.GetMessages())
    {
       //do something with the message
    }
}

备注

除以上代码示例为,可参考 消费告警数据代码字段说明,编写消费告警引擎数据的代码。

消费离线数据代码示例

以下示例为使用 .NET SDK 消费订阅的离线数据代码示例:

using enos_subscription.client;

using (OfflineClient client = new OfflineClient("sub-host", "sub-port", "Your Access Key of this subscription", "Your Access Secret of this subscription")) {

    client.subscribe("Your subscription Id", "Your consumer group");

    foreach (var message in client.GetMessages())
    {
       //do something with the message
    }
}

查看订阅任务的运行情况

当产生的订阅数据的数据量较大时,你可以查看订阅任务的运行情况,确保订阅数据被及时消费。

  1. 登录 EnOS 管理控制台,在订阅任务列表的 操作 一栏中,选择 更多 > 运行情况

  2. 在弹窗中查看产生订阅数据的速率和消费订阅数据的速率,判断消费数据是否有延迟。

    _images/subscription_statistics.png

    备注

    在上图示例中,Producer Rates 显示实时数据通道产生订阅数据的速率,Consumer Rates 显示各个 Consumer Group 消费订阅数据的速率。offset 线和数字代表该订阅任务的数据总量;Consumer Group 线和数字代表该 Consumer Group 消费数据的历史和未被消费的数据量。

如果出现消费数据延迟,可使用同一Consumer Group 的2个 Consumer Client 同时消费订阅数据,提高消费数据的速度。