消费订阅数据(EnOS Edge 用户)


数据订阅任务启动运行之后,可以使用数据订阅 SDK 开发应用,消费已订阅的数据。


本文介绍使用数据订阅 SDK 的安装和消费订阅数据的代码示例。

EnOS Edge 支持以下数据订阅 SDK:

  • Java SDK

  • Go SDK

  • C SDK

对 EnOS SDK 的详细介绍、最新版本及下载地址,访问 EnOS SDKs 和工具

使用 Java SDK

数据订阅 Java SDK 的安装和消费订阅数据的代码示例如下:

安装数据订阅 Java SDK

  1. 访问数据订阅 Java SDK 的 Maven 仓库,获取依赖信息 EnOS Edge Maven 仓库。

  2. 在 Java 开发项目文件中添加如下 Maven 依赖,安装数据订阅 SDK:

    <dependency>
        <groupId>com.envisioniot</groupId>
        <artifactId>subscription-client</artifactId>
        <version>5.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.envisioniot</groupId>
        <artifactId>edge-subscribe-impl</artifactId>
        <version>1.0.5</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.0</version>
    </dependency>
    

消费实时数据代码示例

以下示例为使用指定的 Consumer Group 消费订阅的资产实时数据。如果订阅的数据量较大,可使用同一 Consumer Group 的2个 Consumer Client 同时消费订阅数据,提高消费数据的效率。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;

public class DataServiceDemo {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        // 订阅分组,相关概念见订阅SDK参考
        String consumerGroup = "consumer_group";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);

        // 根据订阅类型获取相应的service,本示例获取实时数据service
        IDataService dataService = eosClient.getDataService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IDataHandler dataHandler = new IDataHandler(){
            public void dataRead(StreamMessage message) {
                System.out.println(message);
            }
        };

        // 调用subscribe函数创建订阅连接,调用后订阅连接被创建
        // 同时指定订阅分组,关于订阅分组的概念见订阅SDK参考说明
        dataService.subscribe(dataHandler, subId, consumerGroup);
    }
}

备注

  • 在以上示例中, hostport 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。

  • 每个 Topic Partition 数量为2,即同一个订阅Topic最多只支持2个 Consumer Client 同时消费数据。

  • 一个 Consumer 实例只能消费一个 Topic。

  • 数据在 Topic 中的存储时长默认为3天。

消费告警数据代码示例

以下示例为不指定 Consumer Group 消费订阅的告警引擎数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;

public class AlertServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);

        // 根据订阅类型获取相应的service,本示例获取告警service
        IAlertService alertService = eosClient.getAlertService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IAlertHandler alertHandler = new IAlertHandler (){
            @Override
            public void alertRead(Alert alert) {
                System.out.println(alert);
            }
        };

        // 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
        alertService.subscribe(alertHandler, subId);

    }
}

备注

除以上代码示例为,可参考 消费告警数据代码字段说明,编写消费告警引擎数据的代码。

消费事件数据代码示例

以下示例为不指定 Consumer Group 消费订阅的资产事件数据。

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;

public class EventServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // 订阅服务地址,根据EnOS环境填写
        String host = "subscription_server";
        // 订阅服务端口,根据EnOS环境填写
        int port = 9001;
        // 应用身份验证,在应用创建时生成
        String accessKey = "access_key";
        // 应用身份验证,在应用创建时生成
        String secretKey = "secret_key";

        // 订阅ID,创建订阅任务时指定或生成
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);

        // 根据订阅类型获取相应的service,本示例获取事件service
        IEventService eventService = eosClient.getEventService();

        // 在启动订阅之前需要创建订阅数据处理函数
        IEventHandler eventHandler = new IEventHandler (){
            @Override
            public void eventRead(String event) {
                System.out.println(event);
            }
        };

        // 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
        eventService.subscribe(eventHandler, subId);

    }
}

使用 Go SDK

数据订阅 Java SDK 的安装和消费订阅数据的代码示例如下,更多信息参见 数据订阅 Go SDK

配置数据订阅 Go SDK

  1. 获取数据订阅 Go SDK 源码

    git clone https://github.com/EnvisionIot/edge-subscription-service-sdk-go
    
  2. 调用 NewSubscriber() 函数初始化创建一个订阅者

    import (
        "edge-subscription-go/subscribe"
    )
    
    func main(){
        edgeServiceIp := "127.0.0.1"
        edgeServicePort := 9150
        subTopicName := "DATASVC.SUB.group"
        subChannelName := "real_time_point"
        accessKey := "a-b-c"
        appSecret := "a-d-f"
        subId := "subTopicId"
        f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
        if err != nil{
            //init failed, return
            return
        }
    }
    
  3. 根据订阅类型,定义相应的消息处理回调函数。

    import (
        "edge-subscription-go/subscribe/record"
        "fmt"
    )
    
    func main(){
        ...
        functionRealTimePoint := func(records []record.SSRecordPoint) {
                for _, point := range records{
                    fmt.Printf("real time point value = %s\n", point.GetValue())
                }
        }
        ...
    }
    

消费实时数据代码示例

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/record"
    "fmt"
)

func main(){
    //初始化创建一个订阅者
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
    subChannelName := "real_time_point"
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //定义实时数据处理回调函数
    functionRealTimePoint := func(records []record.SSRecordPoint) {
        for _, point := range records{
            fmt.Printf("real time point value = %s\n", point.GetValue())
        }
    }
    //开始订阅接受数据
    _ = f.SubRealTimePoint(functionRealTimePoint)
}

消费告警数据代码示例

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/alarm"
    "fmt"
)

func main(){
    //初始化创建一个订阅者
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
    subChannelName := "alarm_data"
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //定义告警数据处理回调函数
    functionAlarmData := func(records []alarm.AlarmRecord) {
        for _, point := range records{
            fmt.Printf("alarm data value = %s\n", point.GetValue())
        }
    }
    //开始订阅接受数据
    _ = f.SubAlarmData(functionAlarmData)
}

备注

除以上代码示例为,可参考 消费告警数据代码字段说明,编写消费告警引擎数据的代码。

消费设备异步控制的返回代码示例

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/control"
    "fmt"
)

func main(){
    //初始化创建一个订阅者
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
    subChannelName := "alarm_data"
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //定义控制反较处理回调函数
    functionControlResponse := func(records []control.ControlResponsePoint) {
        for _, point := range records{
            fmt.Printf("control response value = %s\n", point.GetInputData())
        }
    }
    //开始订阅接受数据
    _ = f.SubControlResponse(functionControlResponse)
}

消费设备异步设点的返回结果示例

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/control"
    "fmt"
)

func main(){
    //初始化创建一个订阅者
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
    subChannelName := "alarm_data"
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //定义设备设点反较处理回调函数
    functionSetPointResponse := func(records []setpoint.SetMeasurepointResponsePoint) {
        for _, point := range records{
            fmt.Printf("set measurePoint input value = %s\n", point.GetInputData())
        }
    }
    //开始订阅接受数据
    _ = f.SubSetMeasurePointResponse(functionSetPointResponse)
}

使用 C SDK

数据订阅 C SDK 的安装和消费订阅数据的代码示例如下,更多信息参见 数据订阅 C SDK

配置数据订阅 C SDK

  1. 使用数据订阅 C SDK 需要先安装 CMake

  2. 编译

    • 调试版本

      cmake -D PLATFORM_DEFINE=linux_x86_normal_64_local -D DEBUG_MODE=ON .
      make //使用 "make VERBOSE=1" 获取更多信息
      
    • 发布版本

      cmake -D PLATFORM_DEFINE=linux_x86_normal_64_local -D DEBUG_MODE=OFF .
      make //使用 "make VERBOSE=1" 获取更多信息
      

消费实时数据代码示例

以下示例为使用 C SDK 消费订阅的资产实时数据代码示例:

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    //注意用完之后要释放
    delete_log_info_box(log_info);
}

void print_data_subscribe_msg(struct DataSubscribeStruct *dss_ptr) {
    printf("---------------------->\n");
    if (dss_ptr == NULL) {
        printf("dss_ptr == NULL\n");
        return;
    }
    printf("dss_ptr->point_count=%d\n", dss_ptr->point_count);
    int ii = 0;
    for (ii = 0; ii < dss_ptr->point_count; ii++) {
        //调用方自定义数据处理,具体结构参照以下实时数据订阅结构
    }
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        //注意用完后要释放
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_DATA_SUBSCRIBE:
        case TOPIC_TYPE_DATA_SUBSCRIBE_ALL:
            print_data_subscribe_msg((struct DataSubscribeStruct *) (msg->msg));
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    //注意用完后要释放
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    //需要连接的ip
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    //订阅的topic类型,实时数据订阅可以选TOPIC_TYPE_DATA_SUBSCRIBE或TOPIC_TYPE_AUTO
    int topic_type = TOPIC_TYPE_DATA_SUBSCRIBE;
    //端口
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    //topic名字,实时数据订阅的topic都是以DATASVC.SUB.开头
    snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.SUB.APP.SUBTEST");
    //consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    //1.init_edge_service_ctx;初始化全局变量,非线程安全
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

    //2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
    set_log_level(EDGE_LOG_INFO);
    //3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
    set_log_writer(my_log_writer, user_ctx);

    //4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //运行60s后退出
    edge_sleep(60000);

    //不使用时记得stop和delete

    //ip_box不使用时是需要释放的
    delete_ip_box(ip_list);

    //6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
    data_service_ctx_stop(ctx);
    //7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
    delete_data_service_ctx(ctx);
    //8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}

实时数据订阅结构

struct DataSubscribeStruct {
    int point_count;//points中数组元素的个数
    struct DataSubscribeSubStruct *points;//struct DataSubscribeSubStruct的一个数组,points是这个数组的头指针
};

struct DataSubscribeSubStruct {
    char *orgid;//组织id
    char *modelid;//模型id
    char *assetid;//资产id
    char *collectdeviceid;//设备id(一个设备在edge内部的id)
    char *pointid;//模型点id
    int64_t time;//时间戳(utc毫秒)
    char *value;//值的字符串表示,如"3.3", "[1,2,3]", "[\"a\", \"b\", \"c\"]", "test"
    int32_t quality;//质量位
    int64_t dq;//edge流式计算使用的质量位
    char *modelpath;//模型id的全路径,如/parent/child
    char *policytype;//表示tsdb的存储类型,参考POLICY_TYPE
    char *signaltype;//表示数据点的类型,参考SIGNAL_TYPE
    int hasquality;//该模型点是否定义了质量位
    char *datatype;//value字段的数据类型,参考DATATYPE定义
    char *subdatatype;//value数组元素的数据类型,只有当datatype为ARRAY时才有效,参考SUB_DATATYPE
    char *attr;//保留字段
    int32_t usingoem;//time字段是否使用的是oem时标,0时使用edge系统时标,1时使用设备上送的oem时间戳
    int64_t oemtime;//设备上送的oem时间戳(utc毫秒)
    int32_t pointtype;//模型点类型,参考POINT_TYPE
};

消费设备异步控制的返回结果示例

以下示例为使用 C SDK 消费订阅的异步控制返回结果代码示例:

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    //注意用完之后要释放
    delete_log_info_box(log_info);
}

void print_control_response_msg(struct ControlResponseStruct *crs_ptr) {
    printf("---------------------->\n");
    if (crs_ptr == NULL) {
        printf("crs_ptr == NULL\n");
        return;
    }
    printf("crs_ptr->point_count=%d\n", crs_ptr->point_count);
    int ii = 0;
    for (ii = 0; ii < crs_ptr->point_count; ii++) {
        //调用方自定义数据处理,具体结构参照以下设备异步控制返回结果数据订阅结构
    }
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        //注意用完后要释放
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_CONTROL_RESPONSE:
            print_control_response_msg((struct ControlResponseStruct *) (msg->msg));
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    //注意用完后要释放
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    //需要连接的ip
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    //订阅的topic类型,控制反较数据订阅可以选TOPIC_TYPE_CONTROL_RESPONSE或TOPIC_TYPE_AUTO
    int topic_type = TOPIC_TYPE_CONTROL_RESPONSE;
    //端口
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    //topic名字,控制反较数据订阅的topic都是以DATASVC.CONTROL.开头
    snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.CONTROL.APP.SUBTEST");
    //consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    //1.init_edge_service_ctx;初始化全局变量,非线程安全
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

    //2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
    set_log_level(EDGE_LOG_INFO);
    //3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
    set_log_writer(my_log_writer, user_ctx);

    //4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //运行60s后退出
    edge_sleep(60000);

    //不使用时记得stop和delete

    //ip_box不使用时是需要释放的
    delete_ip_box(ip_list);

    //6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
    data_service_ctx_stop(ctx);
    //7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
    delete_data_service_ctx(ctx);
    //8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}

设备异步控制的返回结果数据订阅结构

struct ControlResponseStruct {
    int point_count;//points中数组元素的个数
    struct ControlResponseSubStruct *points;//struct ControlResponseSubStruct 的一个数组,points是这个数组的头指针
};

struct ControlResponseSubStruct {
    char *requestid;//控制请求的id
    char *messageid;//目前总是为空串
    char *requestmethod;//目前总是为空串
    char *calltype;//是同步还是异步,参考CALL_TYPE
    char *controlchannelid;//控制反较通道id
    char *productkey;//资产的productKey,控制请求时填了productKey这边才会有
    char *devicekey;//资产的deviceKey,控制请求时填了deviceKey这边才会有
    char *assetid;//资产的assetId
    char *servicename;//目前总是为空串
    char *serviceid;//控制的模型点id
    char *callbackurl;//目前总是为空串
    char *inputdata;//控制时的请求值
    char *outputdata;//目前总是为空串
    int64_t status;//控制结果状态码,0表示成功,其他表示失败
    char *msg;//控制结果状态码的一些解释
    char *submsg;//控制结果状态码的一些详细解释
    int64_t timeout;//控制超时时间,单位毫秒
    int64_t gmtservicerequest;//这个控制请求时的时间(utc毫秒)
    int64_t gmtservicereply;//这个控制反较时的时间(utc毫秒)
    int64_t gmtdevicereply;//这个控制设备反较时的时间,设备没有反较时为-1(utc毫秒)
    char *attr;//保留字段
};

消费设备异步设点的返回结果示例

以下示例为使用 C SDK 消费订阅的异步设点返回结果代码示例:

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    //注意用完之后要释放
    delete_log_info_box(log_info);
}

void print_set_measurepoint_response_msg(struct SetMeasurepointResponseStruct *smrs_ptr) {
    printf("---------------------->\n");
    if (smrs_ptr == NULL) {
        printf("smrs_ptr == NULL\n");
        return;
    }
    printf("smrs_ptr->point_count=%d\n", smrs_ptr->point_count);
    int ii = 0;
    for (ii = 0; ii < smrs_ptr->point_count; ii++) {
        //调用方自定义数据处理,具体结构参照以下实时数据订阅结构
    }
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        //注意用完后要释放
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE:
            print_set_measurepoint_response_msg((struct SetMeasurepointResponseStruct *) (msg->msg));
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    //注意用完后要释放
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    //需要连接的ip
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    //订阅的topic类型,实时数据订阅可以选TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE或TOPIC_TYPE_AUTO
    int topic_type = TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE;
    //端口
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    //topic名字,写值反较数据订阅的topic都是以DATASVC.SET.开头
    snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.SET.APP.SUBTEST");
    //consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    //1.init_edge_service_ctx;初始化全局变量,非线程安全
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

    //2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
    set_log_level(EDGE_LOG_INFO);
    //3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
    set_log_writer(my_log_writer, user_ctx);

    //4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //运行60s后退出
    edge_sleep(60000);

    //不使用时记得stop和delete

    //ip_box不使用时是需要释放的
    delete_ip_box(ip_list);

    //6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
    data_service_ctx_stop(ctx);
    //7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
    delete_data_service_ctx(ctx);
    //8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}

设备异步设点的返回结果数据订阅结构

struct SetMeasurepointResponseStruct {
    int point_count;//points中数组元素的个数
    struct SetMeasurepointResponseSubStruct *points;//struct SetMeasurepointResponseSubStruct 的一个数组,points是这个数组的头指针
};

struct SetMeasurepointResponseSubStruct {
    char *requestid;//控制请求的id
    char *orgid;//组织id
    char *calltype;//是同步还是异步,参考CALL_TYPE
    char *setmeasurepointchannelid;//写值反较通道id
    char *productkey;//资产的productKey,控制请求时填了productKey这边才会有
    char *devicekey;//资产的deviceKey,控制请求时填了deviceKey这边才会有
    char *assetid;//资产的assetId
    char *measurepointid;//写值的模型点id
    char *callbackurl;//目前总是为空串
    char *inputdata;//写值时的请求值
    int64_t status;//写值结果状态码,0表示成功,其他表示失败
    char *msg;//写值结果状态码的一些解释
    char *submsg;//写值结果状态码的一些详细解释
    int64_t timeout;//写值超时时间,单位毫秒
    int64_t gmtsetmeasurepointrequest;//写值请求时填的gmtsetmeasurepointrequest,如果没填,则会填请求时的edge系统时间,这个值会做为写值点的time字段(utc毫秒)
    int64_t gmtsetmeasurepointreply;//这个写值反较时的时间(utc毫秒)
    char *attr;//保留字段
};

消费自定义订阅数据示例

以下示例为使用 C SDK 自定义消费订阅的数据代码示例:

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    //注意用完之后要释放
    delete_log_info_box(log_info);
}

void print_custom_msg(char *custom_ptr, int len) {
    printf("---------------------->\n");
    if (custom_ptr == NULL) {
        printf("custom_ptr == NULL\n");
        return;
    }

    int ii = 0;
    printf("hex:");
    for (ii = 0; ii < len; ii++) {
        printf("%hhx ", custom_ptr[ii]);
    }
    printf("\n");
    printf("str:%s\n", custom_ptr);
    printf("<----------------------\n\n");
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        //注意用完后要释放
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_CUSTOM:
            print_custom_msg((char *) (msg->msg), msg->msg_len);
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    //注意用完后要释放
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    //需要连接的ip
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    //订阅的topic类型,实时数据订阅可以选TOPIC_TYPE_CUSTOM或TOPIC_TYPE_AUTO,custom类型不会解析订阅到的数据,会原样返回给调用方
    int topic_type = TOPIC_TYPE_CUSTOM;
    //端口
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    //topic名字,custom订阅方式时,可以订阅任意topic,包括实时数据、控制反较、写值反较、其他数据
    snprintf(topic_name, sizeof(topic_name), "%s", "custom_topic");
    //consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    //1.init_edge_service_ctx;初始化全局变量,非线程安全
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

    //2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
    set_log_level(EDGE_LOG_INFO);
    //3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
    set_log_writer(my_log_writer, user_ctx);

    //4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    //运行60s后退出
    edge_sleep(60000);

    //不使用时记得stop和delete

    //ip_box不使用时是需要释放的
    delete_ip_box(ip_list);

    //6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
    data_service_ctx_stop(ctx);
    //7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
    delete_data_service_ctx(ctx);
    //8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}