消费订阅数据(EnOS Edge 用户)¶
数据订阅任务启动运行之后,可以使用数据订阅 SDK 开发应用,消费已订阅的数据。
本文介绍使用数据订阅 SDK 的安装和消费订阅数据的代码示例。
EnOS Edge 支持以下数据订阅 SDK:
Java SDK
Go SDK
C SDK
对 EnOS SDK 的详细介绍、最新版本及下载地址,访问 EnOS SDKs 和工具。
使用 Java SDK¶
数据订阅 Java SDK 的安装和消费订阅数据的代码示例如下:
安装数据订阅 Java SDK¶
访问数据订阅 Java SDK 的 Maven 仓库,获取依赖信息 EnOS Edge Maven 仓库。
在 Java 开发项目文件中添加如下 Maven 依赖,安装数据订阅 SDK:
<dependency> <groupId>com.envisioniot</groupId> <artifactId>subscription-client</artifactId> <version>5.0.2</version> </dependency> <dependency> <groupId>com.envisioniot</groupId> <artifactId>edge-subscribe-impl</artifactId> <version>1.0.5</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.0</version> </dependency>
消费实时数据代码示例¶
以下示例为使用指定的 Consumer Group 消费订阅的资产实时数据。如果订阅的数据量较大,可使用同一 Consumer Group 的2个 Consumer Client 同时消费订阅数据,提高消费数据的效率。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;
public class DataServiceDemo {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
// 订阅分组,相关概念见订阅SDK参考
String consumerGroup = "consumer_group";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);
// 根据订阅类型获取相应的service,本示例获取实时数据service
IDataService dataService = eosClient.getDataService();
// 在启动订阅之前需要创建订阅数据处理函数
IDataHandler dataHandler = new IDataHandler(){
public void dataRead(StreamMessage message) {
System.out.println(message);
}
};
// 调用subscribe函数创建订阅连接,调用后订阅连接被创建
// 同时指定订阅分组,关于订阅分组的概念见订阅SDK参考说明
dataService.subscribe(dataHandler, subId, consumerGroup);
}
}
备注
在以上示例中, host 和 port 指订阅服务的地址和端口号。不同的云服务和实例的服务地址和端口号不同。请登录 EnOS 管理控制台,点击页面右上角 帮助 > 环境信息 获取对应环境的订阅服务地址和端口信息。
每个 Topic Partition 数量为2,即同一个订阅Topic最多只支持2个 Consumer Client 同时消费数据。
一个 Consumer 实例只能消费一个 Topic。
数据在 Topic 中的存储时长默认为3天。
消费告警数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的告警引擎数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;
public class AlertServiceDemo1 {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);
// 根据订阅类型获取相应的service,本示例获取告警service
IAlertService alertService = eosClient.getAlertService();
// 在启动订阅之前需要创建订阅数据处理函数
IAlertHandler alertHandler = new IAlertHandler (){
@Override
public void alertRead(Alert alert) {
System.out.println(alert);
}
};
// 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
alertService.subscribe(alertHandler, subId);
}
}
备注
除以上代码示例为,可参考 消费告警数据代码字段说明,编写消费告警引擎数据的代码。
消费事件数据代码示例¶
以下示例为不指定 Consumer Group 消费订阅的资产事件数据。
import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;
public class EventServiceDemo1 {
public static void main(String[] args) throws Exception {
// 订阅服务地址,根据EnOS环境填写
String host = "subscription_server";
// 订阅服务端口,根据EnOS环境填写
int port = 9001;
// 应用身份验证,在应用创建时生成
String accessKey = "access_key";
// 应用身份验证,在应用创建时生成
String secretKey = "secret_key";
// 订阅ID,创建订阅任务时指定或生成
String subId = "subscription_id";
EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);
// 根据订阅类型获取相应的service,本示例获取事件service
IEventService eventService = eosClient.getEventService();
// 在启动订阅之前需要创建订阅数据处理函数
IEventHandler eventHandler = new IEventHandler (){
@Override
public void eventRead(String event) {
System.out.println(event);
}
};
// 需要调用subscribe函数创建订阅连接,调用后订阅连接被创建
eventService.subscribe(eventHandler, subId);
}
}
使用 Go SDK¶
数据订阅 Java SDK 的安装和消费订阅数据的代码示例如下,更多信息参见 数据订阅 Go SDK:
配置数据订阅 Go SDK¶
获取数据订阅 Go SDK 源码
git clone https://github.com/EnvisionIot/edge-subscription-service-sdk-go
调用 NewSubscriber() 函数初始化创建一个订阅者
import ( "edge-subscription-go/subscribe" ) func main(){ edgeServiceIp := "127.0.0.1" edgeServicePort := 9150 subTopicName := "DATASVC.SUB.group" subChannelName := "real_time_point" accessKey := "a-b-c" appSecret := "a-d-f" subId := "subTopicId" f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId) if err != nil{ //init failed, return return } }
根据订阅类型,定义相应的消息处理回调函数。
import ( "edge-subscription-go/subscribe/record" "fmt" ) func main(){ ... functionRealTimePoint := func(records []record.SSRecordPoint) { for _, point := range records{ fmt.Printf("real time point value = %s\n", point.GetValue()) } } ... }
消费实时数据代码示例¶
import (
"edge-subscription-go/subscribe"
"edge-subscription-go/subscribe/record"
"fmt"
)
func main(){
//初始化创建一个订阅者
edgeServiceIp := "127.0.0.1"
edgeServicePort := 9150
subTopicName := "DATASVC.SUB.group"
subChannelName := "real_time_point"
accessKey := "accessKey"
appSecret := "appSecret"
subId := "subTopicId"
f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
if err != nil{
return
}
//定义实时数据处理回调函数
functionRealTimePoint := func(records []record.SSRecordPoint) {
for _, point := range records{
fmt.Printf("real time point value = %s\n", point.GetValue())
}
}
//开始订阅接受数据
_ = f.SubRealTimePoint(functionRealTimePoint)
}
消费告警数据代码示例¶
import (
"edge-subscription-go/subscribe"
"edge-subscription-go/subscribe/alarm"
"fmt"
)
func main(){
//初始化创建一个订阅者
edgeServiceIp := "127.0.0.1"
edgeServicePort := 9150
subTopicName := "DATASVC.SUB.group"
subChannelName := "alarm_data"
accessKey := "accessKey"
appSecret := "appSecret"
subId := "subTopicId"
f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
if err != nil{
return
}
//定义告警数据处理回调函数
functionAlarmData := func(records []alarm.AlarmRecord) {
for _, point := range records{
fmt.Printf("alarm data value = %s\n", point.GetValue())
}
}
//开始订阅接受数据
_ = f.SubAlarmData(functionAlarmData)
}
备注
除以上代码示例为,可参考 消费告警数据代码字段说明,编写消费告警引擎数据的代码。
消费设备异步控制的返回代码示例¶
import (
"edge-subscription-go/subscribe"
"edge-subscription-go/subscribe/control"
"fmt"
)
func main(){
//初始化创建一个订阅者
edgeServiceIp := "127.0.0.1"
edgeServicePort := 9150
subTopicName := "DATASVC.SUB.group"
subChannelName := "alarm_data"
accessKey := "accessKey"
appSecret := "appSecret"
subId := "subTopicId"
f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
if err != nil{
return
}
//定义控制反较处理回调函数
functionControlResponse := func(records []control.ControlResponsePoint) {
for _, point := range records{
fmt.Printf("control response value = %s\n", point.GetInputData())
}
}
//开始订阅接受数据
_ = f.SubControlResponse(functionControlResponse)
}
消费设备异步设点的返回结果示例¶
import (
"edge-subscription-go/subscribe"
"edge-subscription-go/subscribe/control"
"fmt"
)
func main(){
//初始化创建一个订阅者
edgeServiceIp := "127.0.0.1"
edgeServicePort := 9150
subTopicName := "DATASVC.SUB.group"
subChannelName := "alarm_data"
accessKey := "accessKey"
appSecret := "appSecret"
subId := "subTopicId"
f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
if err != nil{
return
}
//定义设备设点反较处理回调函数
functionSetPointResponse := func(records []setpoint.SetMeasurepointResponsePoint) {
for _, point := range records{
fmt.Printf("set measurePoint input value = %s\n", point.GetInputData())
}
}
//开始订阅接受数据
_ = f.SubSetMeasurePointResponse(functionSetPointResponse)
}
使用 C SDK¶
数据订阅 C SDK 的安装和消费订阅数据的代码示例如下,更多信息参见 数据订阅 C SDK:
配置数据订阅 C SDK¶
使用数据订阅 C SDK 需要先安装 CMake。
编译
调试版本
cmake -D PLATFORM_DEFINE=linux_x86_normal_64_local -D DEBUG_MODE=ON . make //使用 "make VERBOSE=1" 获取更多信息
发布版本
cmake -D PLATFORM_DEFINE=linux_x86_normal_64_local -D DEBUG_MODE=OFF . make //使用 "make VERBOSE=1" 获取更多信息
消费实时数据代码示例¶
以下示例为使用 C SDK 消费订阅的资产实时数据代码示例:
#include "edge_service_c_api/edge_service_c_api.h"
struct my_ctx {
int aa;
char bb[128];
};
void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
if (log_info == NULL) {
return;
}
printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
log_info->file_path,
log_info->func_name, log_info->line_num);
//注意用完之后要释放
delete_log_info_box(log_info);
}
void print_data_subscribe_msg(struct DataSubscribeStruct *dss_ptr) {
printf("---------------------->\n");
if (dss_ptr == NULL) {
printf("dss_ptr == NULL\n");
return;
}
printf("dss_ptr->point_count=%d\n", dss_ptr->point_count);
int ii = 0;
for (ii = 0; ii < dss_ptr->point_count; ii++) {
//调用方自定义数据处理,具体结构参照以下实时数据订阅结构
}
}
void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
__FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
switch (msg->topic_type) {
case TOPIC_TYPE_DATA_SUBSCRIBE:
case TOPIC_TYPE_DATA_SUBSCRIBE_ALL:
print_data_subscribe_msg((struct DataSubscribeStruct *) (msg->msg));
break;
default:
printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
break;
}
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
int main(int argc, char *argv[]) {
int ret = 0;
struct IPBox *ip_list = NULL;
char topic_name[60];
memset(topic_name, 0, sizeof(topic_name));
char consumerGroup[60];
memset(consumerGroup, 0, sizeof(consumerGroup));
//需要连接的ip
ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
//订阅的topic类型,实时数据订阅可以选TOPIC_TYPE_DATA_SUBSCRIBE或TOPIC_TYPE_AUTO
int topic_type = TOPIC_TYPE_DATA_SUBSCRIBE;
//端口
int port = EDGE_DATASERVICE_DEFAULT_PORT;
//topic名字,实时数据订阅的topic都是以DATASVC.SUB.开头
snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.SUB.APP.SUBTEST");
//consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");
struct DataServiceCtx *ctx = NULL;
//1.init_edge_service_ctx;初始化全局变量,非线程安全
ret = init_edge_service_ctx();
if (ret < 0) {
printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
return -1;
}
struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
if (user_ctx == NULL) {
printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
uninit_edge_service_ctx();
return -1;
}
memset(user_ctx, 0, sizeof(struct my_ctx));
user_ctx->aa = 10;
snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");
//2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
set_log_level(EDGE_LOG_INFO);
//3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
set_log_writer(my_log_writer, user_ctx);
//4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
ctx = new_data_service_ctx(
ip_list,
port,
"your accessKey",
"your secretKey",
topic_name,
consumerGroup,
topic_type,
1,
user_ctx,
connect_callback,
close_callback,
msg_callback
);
if (ctx == NULL) {
printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
ret = data_service_ctx_start(ctx);
if (ret < 0) {
printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
delete_data_service_ctx(ctx);
ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//运行60s后退出
edge_sleep(60000);
//不使用时记得stop和delete
//ip_box不使用时是需要释放的
delete_ip_box(ip_list);
//6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
data_service_ctx_stop(ctx);
//7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
delete_data_service_ctx(ctx);
//8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
uninit_edge_service_ctx();
free(user_ctx);
return 0;
}
实时数据订阅结构
struct DataSubscribeStruct {
int point_count;//points中数组元素的个数
struct DataSubscribeSubStruct *points;//struct DataSubscribeSubStruct的一个数组,points是这个数组的头指针
};
struct DataSubscribeSubStruct {
char *orgid;//组织id
char *modelid;//模型id
char *assetid;//资产id
char *collectdeviceid;//设备id(一个设备在edge内部的id)
char *pointid;//模型点id
int64_t time;//时间戳(utc毫秒)
char *value;//值的字符串表示,如"3.3", "[1,2,3]", "[\"a\", \"b\", \"c\"]", "test"
int32_t quality;//质量位
int64_t dq;//edge流式计算使用的质量位
char *modelpath;//模型id的全路径,如/parent/child
char *policytype;//表示tsdb的存储类型,参考POLICY_TYPE
char *signaltype;//表示数据点的类型,参考SIGNAL_TYPE
int hasquality;//该模型点是否定义了质量位
char *datatype;//value字段的数据类型,参考DATATYPE定义
char *subdatatype;//value数组元素的数据类型,只有当datatype为ARRAY时才有效,参考SUB_DATATYPE
char *attr;//保留字段
int32_t usingoem;//time字段是否使用的是oem时标,0时使用edge系统时标,1时使用设备上送的oem时间戳
int64_t oemtime;//设备上送的oem时间戳(utc毫秒)
int32_t pointtype;//模型点类型,参考POINT_TYPE
};
消费设备异步控制的返回结果示例¶
以下示例为使用 C SDK 消费订阅的异步控制返回结果代码示例:
#include "edge_service_c_api/edge_service_c_api.h"
struct my_ctx {
int aa;
char bb[128];
};
void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
if (log_info == NULL) {
return;
}
printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
log_info->file_path,
log_info->func_name, log_info->line_num);
//注意用完之后要释放
delete_log_info_box(log_info);
}
void print_control_response_msg(struct ControlResponseStruct *crs_ptr) {
printf("---------------------->\n");
if (crs_ptr == NULL) {
printf("crs_ptr == NULL\n");
return;
}
printf("crs_ptr->point_count=%d\n", crs_ptr->point_count);
int ii = 0;
for (ii = 0; ii < crs_ptr->point_count; ii++) {
//调用方自定义数据处理,具体结构参照以下设备异步控制返回结果数据订阅结构
}
}
void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
__FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
switch (msg->topic_type) {
case TOPIC_TYPE_CONTROL_RESPONSE:
print_control_response_msg((struct ControlResponseStruct *) (msg->msg));
break;
default:
printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
break;
}
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
int main(int argc, char *argv[]) {
int ret = 0;
struct IPBox *ip_list = NULL;
char topic_name[60];
memset(topic_name, 0, sizeof(topic_name));
char consumerGroup[60];
memset(consumerGroup, 0, sizeof(consumerGroup));
//需要连接的ip
ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
//订阅的topic类型,控制反较数据订阅可以选TOPIC_TYPE_CONTROL_RESPONSE或TOPIC_TYPE_AUTO
int topic_type = TOPIC_TYPE_CONTROL_RESPONSE;
//端口
int port = EDGE_DATASERVICE_DEFAULT_PORT;
//topic名字,控制反较数据订阅的topic都是以DATASVC.CONTROL.开头
snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.CONTROL.APP.SUBTEST");
//consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");
struct DataServiceCtx *ctx = NULL;
//1.init_edge_service_ctx;初始化全局变量,非线程安全
ret = init_edge_service_ctx();
if (ret < 0) {
printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
return -1;
}
struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
if (user_ctx == NULL) {
printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
uninit_edge_service_ctx();
return -1;
}
memset(user_ctx, 0, sizeof(struct my_ctx));
user_ctx->aa = 10;
snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");
//2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
set_log_level(EDGE_LOG_INFO);
//3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
set_log_writer(my_log_writer, user_ctx);
//4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
ctx = new_data_service_ctx(
ip_list,
port,
"your accessKey",
"your secretKey",
topic_name,
consumerGroup,
topic_type,
1,
user_ctx,
connect_callback,
close_callback,
msg_callback
);
if (ctx == NULL) {
printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
ret = data_service_ctx_start(ctx);
if (ret < 0) {
printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
delete_data_service_ctx(ctx);
ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//运行60s后退出
edge_sleep(60000);
//不使用时记得stop和delete
//ip_box不使用时是需要释放的
delete_ip_box(ip_list);
//6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
data_service_ctx_stop(ctx);
//7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
delete_data_service_ctx(ctx);
//8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
uninit_edge_service_ctx();
free(user_ctx);
return 0;
}
设备异步控制的返回结果数据订阅结构
struct ControlResponseStruct {
int point_count;//points中数组元素的个数
struct ControlResponseSubStruct *points;//struct ControlResponseSubStruct 的一个数组,points是这个数组的头指针
};
struct ControlResponseSubStruct {
char *requestid;//控制请求的id
char *messageid;//目前总是为空串
char *requestmethod;//目前总是为空串
char *calltype;//是同步还是异步,参考CALL_TYPE
char *controlchannelid;//控制反较通道id
char *productkey;//资产的productKey,控制请求时填了productKey这边才会有
char *devicekey;//资产的deviceKey,控制请求时填了deviceKey这边才会有
char *assetid;//资产的assetId
char *servicename;//目前总是为空串
char *serviceid;//控制的模型点id
char *callbackurl;//目前总是为空串
char *inputdata;//控制时的请求值
char *outputdata;//目前总是为空串
int64_t status;//控制结果状态码,0表示成功,其他表示失败
char *msg;//控制结果状态码的一些解释
char *submsg;//控制结果状态码的一些详细解释
int64_t timeout;//控制超时时间,单位毫秒
int64_t gmtservicerequest;//这个控制请求时的时间(utc毫秒)
int64_t gmtservicereply;//这个控制反较时的时间(utc毫秒)
int64_t gmtdevicereply;//这个控制设备反较时的时间,设备没有反较时为-1(utc毫秒)
char *attr;//保留字段
};
消费设备异步设点的返回结果示例¶
以下示例为使用 C SDK 消费订阅的异步设点返回结果代码示例:
#include "edge_service_c_api/edge_service_c_api.h"
struct my_ctx {
int aa;
char bb[128];
};
void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
if (log_info == NULL) {
return;
}
printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
log_info->file_path,
log_info->func_name, log_info->line_num);
//注意用完之后要释放
delete_log_info_box(log_info);
}
void print_set_measurepoint_response_msg(struct SetMeasurepointResponseStruct *smrs_ptr) {
printf("---------------------->\n");
if (smrs_ptr == NULL) {
printf("smrs_ptr == NULL\n");
return;
}
printf("smrs_ptr->point_count=%d\n", smrs_ptr->point_count);
int ii = 0;
for (ii = 0; ii < smrs_ptr->point_count; ii++) {
//调用方自定义数据处理,具体结构参照以下实时数据订阅结构
}
}
void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
__FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
switch (msg->topic_type) {
case TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE:
print_set_measurepoint_response_msg((struct SetMeasurepointResponseStruct *) (msg->msg));
break;
default:
printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
break;
}
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
int main(int argc, char *argv[]) {
int ret = 0;
struct IPBox *ip_list = NULL;
char topic_name[60];
memset(topic_name, 0, sizeof(topic_name));
char consumerGroup[60];
memset(consumerGroup, 0, sizeof(consumerGroup));
//需要连接的ip
ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
//订阅的topic类型,实时数据订阅可以选TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE或TOPIC_TYPE_AUTO
int topic_type = TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE;
//端口
int port = EDGE_DATASERVICE_DEFAULT_PORT;
//topic名字,写值反较数据订阅的topic都是以DATASVC.SET.开头
snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.SET.APP.SUBTEST");
//consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");
struct DataServiceCtx *ctx = NULL;
//1.init_edge_service_ctx;初始化全局变量,非线程安全
ret = init_edge_service_ctx();
if (ret < 0) {
printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
return -1;
}
struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
if (user_ctx == NULL) {
printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
uninit_edge_service_ctx();
return -1;
}
memset(user_ctx, 0, sizeof(struct my_ctx));
user_ctx->aa = 10;
snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");
//2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
set_log_level(EDGE_LOG_INFO);
//3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
set_log_writer(my_log_writer, user_ctx);
//4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
ctx = new_data_service_ctx(
ip_list,
port,
"your accessKey",
"your secretKey",
topic_name,
consumerGroup,
topic_type,
1,
user_ctx,
connect_callback,
close_callback,
msg_callback
);
if (ctx == NULL) {
printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
ret = data_service_ctx_start(ctx);
if (ret < 0) {
printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
delete_data_service_ctx(ctx);
ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//运行60s后退出
edge_sleep(60000);
//不使用时记得stop和delete
//ip_box不使用时是需要释放的
delete_ip_box(ip_list);
//6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
data_service_ctx_stop(ctx);
//7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
delete_data_service_ctx(ctx);
//8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
uninit_edge_service_ctx();
free(user_ctx);
return 0;
}
设备异步设点的返回结果数据订阅结构
struct SetMeasurepointResponseStruct {
int point_count;//points中数组元素的个数
struct SetMeasurepointResponseSubStruct *points;//struct SetMeasurepointResponseSubStruct 的一个数组,points是这个数组的头指针
};
struct SetMeasurepointResponseSubStruct {
char *requestid;//控制请求的id
char *orgid;//组织id
char *calltype;//是同步还是异步,参考CALL_TYPE
char *setmeasurepointchannelid;//写值反较通道id
char *productkey;//资产的productKey,控制请求时填了productKey这边才会有
char *devicekey;//资产的deviceKey,控制请求时填了deviceKey这边才会有
char *assetid;//资产的assetId
char *measurepointid;//写值的模型点id
char *callbackurl;//目前总是为空串
char *inputdata;//写值时的请求值
int64_t status;//写值结果状态码,0表示成功,其他表示失败
char *msg;//写值结果状态码的一些解释
char *submsg;//写值结果状态码的一些详细解释
int64_t timeout;//写值超时时间,单位毫秒
int64_t gmtsetmeasurepointrequest;//写值请求时填的gmtsetmeasurepointrequest,如果没填,则会填请求时的edge系统时间,这个值会做为写值点的time字段(utc毫秒)
int64_t gmtsetmeasurepointreply;//这个写值反较时的时间(utc毫秒)
char *attr;//保留字段
};
消费自定义订阅数据示例¶
以下示例为使用 C SDK 自定义消费订阅的数据代码示例:
#include "edge_service_c_api/edge_service_c_api.h"
struct my_ctx {
int aa;
char bb[128];
};
void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
if (log_info == NULL) {
return;
}
printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
log_info->file_path,
log_info->func_name, log_info->line_num);
//注意用完之后要释放
delete_log_info_box(log_info);
}
void print_custom_msg(char *custom_ptr, int len) {
printf("---------------------->\n");
if (custom_ptr == NULL) {
printf("custom_ptr == NULL\n");
return;
}
int ii = 0;
printf("hex:");
for (ii = 0; ii < len; ii++) {
printf("%hhx ", custom_ptr[ii]);
}
printf("\n");
printf("str:%s\n", custom_ptr);
printf("<----------------------\n\n");
}
void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
__FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
return;
}
void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
channel_id, __FILE__, __FUNCTION__, __LINE__);
struct my_ctx *x = (struct my_ctx *) (user_ctx);
if (x == NULL) {
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
switch (msg->topic_type) {
case TOPIC_TYPE_CUSTOM:
print_custom_msg((char *) (msg->msg), msg->msg_len);
break;
default:
printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
break;
}
//注意用完后要释放
delete_data_service_msg(msg);
return;
}
int main(int argc, char *argv[]) {
int ret = 0;
struct IPBox *ip_list = NULL;
char topic_name[60];
memset(topic_name, 0, sizeof(topic_name));
char consumerGroup[60];
memset(consumerGroup, 0, sizeof(consumerGroup));
//需要连接的ip
ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
//订阅的topic类型,实时数据订阅可以选TOPIC_TYPE_CUSTOM或TOPIC_TYPE_AUTO,custom类型不会解析订阅到的数据,会原样返回给调用方
int topic_type = TOPIC_TYPE_CUSTOM;
//端口
int port = EDGE_DATASERVICE_DEFAULT_PORT;
//topic名字,custom订阅方式时,可以订阅任意topic,包括实时数据、控制反较、写值反较、其他数据
snprintf(topic_name, sizeof(topic_name), "%s", "custom_topic");
//consumerGroup,非必填,如果consumerGroup是NULL,则默认为default,详细说明参考new_data_service_ctx函数注释
snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");
struct DataServiceCtx *ctx = NULL;
//1.init_edge_service_ctx;初始化全局变量,非线程安全
ret = init_edge_service_ctx();
if (ret < 0) {
printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
return -1;
}
struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
if (user_ctx == NULL) {
printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
uninit_edge_service_ctx();
return -1;
}
memset(user_ctx, 0, sizeof(struct my_ctx));
user_ctx->aa = 10;
snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");
//2.set_log_level(EDGE_LOG_INFO);设置日志等级,不设置默认为EDGE_LOG_INFO,线程安全,可以随时调用这个函数,实时生效
set_log_level(EDGE_LOG_INFO);
//3.set_log_writer(my_log_writer, user_ctx);设置打印函数,sdk内部需要打印时会调用这个打印函数打印,如果不设置,默认打印到命令行终端,打印函数中注意数据用完后需要delete_log_info_box释放,非线程安全,一开始设置一次就可以了
set_log_writer(my_log_writer, user_ctx);
//4.调用new_data_service_ctx函数,初始化必要的上下文,线程安全,注意当只填一个IP时,无论这个IP是主还是备,都会去向这个IP订阅数据,填多个IP时,会根据主备情况自动切换
ctx = new_data_service_ctx(
ip_list,
port,
"your accessKey",
"your secretKey",
topic_name,
consumerGroup,
topic_type,
1,
user_ctx,
connect_callback,
close_callback,
msg_callback
);
if (ctx == NULL) {
printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
__LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//5.调用data_service_ctx_start函数,启动相关模块,开始从服务端接收数据,非线程安全
ret = data_service_ctx_start(ctx);
if (ret < 0) {
printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
__FUNCTION__, __LINE__);
delete_ip_box(ip_list);
ip_list = NULL;
free(user_ctx);
user_ctx = NULL;
delete_data_service_ctx(ctx);
ctx = NULL;
uninit_edge_service_ctx();
return -1;
}
//运行60s后退出
edge_sleep(60000);
//不使用时记得stop和delete
//ip_box不使用时是需要释放的
delete_ip_box(ip_list);
//6.可以调用data_service_ctx_stop函数暂停接收数据,start和stop的调用要成对,不能在没有调用start的情况下调用stop,也不能再已经start的情况下调用start,非线程安全
data_service_ctx_stop(ctx);
//7.delete_data_service_ctx;释放new_data_service_ctx占用的资源,退出时需要调用,需要在调用stop之后调用该函数,非线程安全
delete_data_service_ctx(ctx);
//8.uninit_edge_service_ctx;释放init_edge_service_ctx占用的资源,退出时需要调用,需要在调用delete_data_service_ctx之后调用该函数,非线程安全
uninit_edge_service_ctx();
free(user_ctx);
return 0;
}