/*************************************************************** Copyright © huijue Network Co., Ltd. 1998-2129. All rights reserved. Copyright © 上海汇珏网络通信设备股份有限公司 1998-2129. All rights reserved. 文件名 : MqttClient.c 作者 : kooloo 版本 : V1.0 描述 : 动环监控/边缘网关FSU主进程入口 支持设置调试输出口 硬件平台 : iMX6ULL 内核版本 : linux-imx-4.1.15-2.1.0-g3dc0a4b-v2.7 编译器版本 :gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf 日志 : 初版V1.0 2023/7/15 kooloo创建 ***************************************************************/ #include #include #include #include #include #include #include #include "MQTTClient.h" //包含MQTT客户端库头文件 #include #include "public.h" //公共函数头文件 #include //posix 内存共享 消息队列 #include #include "mslog.h" #include "SystemTimeFunc.h" #include "ProcessisRunning.h" #include #include "cJSON.h" // #include "MqttClient.h" #include static int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { if (!strcmp(topicName, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay)) //校验消息的主题 kooloo add 202312 { if(message->payloadlenpayload); if (NULL == json_rt) // 接收到其它数据 不做处理 { ms_info1("非json:%s\n", message->payload); } else //if(0) //json 解析正常 判断 { cJSON *json_FsuID = cJSON_GetObjectItem(json_rt, C_FSUCODENAME); //获取表中ID 并做判断 if (!strcmp(g_DeviceCfg.devConf.DeviceNumber, json_FsuID->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 { cJSON *json_Type = cJSON_GetObjectItem(json_rt, C_GATEWAY_DATATYPE); //获取表中ID 并做判断 if (!strcmp(C_SERVER_READDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥测 { int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0); ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_READDATA,message->payloadlen, message->payload,len); } else if (!strcmp(C_SERVER_WRITEDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥调 { int tyid; tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312 //cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA)); //mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312 char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE); int len; memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE); strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401 if(tFunTyID_DevCod_buf[tyid].PortIDvaluestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len); } else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312 { len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0); printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len); } free(tmsgbuf_spvp); //用完释放 kooloo add 202401 //mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1); //ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf)); //int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0); //ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_WRITEDATA,message->payloadlen, message->payload,len); } else if (!strcmp(C_SERVER_ALARMDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥信 { int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0); ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_ALARMDATA,message->payloadlen, message->payload,len); } else if (!strcmp(C_SERVER_CRTLDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥控 { int tyid; tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312 //cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA)); //mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312 char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE); int len; memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE); strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401 if(tFunTyID_DevCod_buf[tyid].PortIDvaluestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len); } else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312 { len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0); printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len); } free(tmsgbuf_spvp); //用完释放 kooloo add 202401 //mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1); //ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf)); //int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0); //ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_CRTLDATA,message->payloadlen, message->payload,len); } else { ms_info1("type is wrong! jsonlen %d json:%s \n",message->payloadlen, message->payload); } } cJSON_Delete(json_rt); //删除 } } } /* 释放占用的内存空间 */ MQTTClient_freeMessage(&message); MQTTClient_free(topicName); /* 退出 */ return 1; } static void connlost(void *context, char *cause) { ms_info1("\nConnection lost\n"); ms_info1(" cause: %s\n", cause); } int main(int argc, char *argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; int rc; int Temp_Counter; int ret,fd; //临时变量 //设置日志级别为more;打开标准输出、打印所在行数和函数名、文件日志功能; //设置日志目录为:/tmp/mslog; //设置日志文件为:mslog_sample.txt; //FLAG为LOG_API_TEST或TAG_TEST2的日志,不进行打印; // mslog_api_init((mslog_level_warn|mslog_enable_stdprint|mslog_enable_linefunc|mslog_enable_filelog), // "/tmp/mslog","mslog_fsu.txt","LOG_API_TEST|TAG_TEST2"); mslog_api_init(C_MSLOG_FLAG_MC,S_MSLOGDIR_PATH,S_MSLOGFILE_NAME_MC,"LOG_API_TEST|TAG_TEST2"); printf("Mqtt App\n"); //开机 程序打印信息 做区分使用 ms_info1("Mqtt SystemIpc_Init\n"); //数据库初始化 kooloo add 202309 SystemIpc_Init(); //申请共享内存和队列 kooloo add 202311 ms_info1("Mqtt sqTable_init\n"); //数据库初始化 kooloo add 202309 sqTable_init(C_SQTABLE_INIT_getDeviceCfg); //数据库初始化 /* 打印进程信息及编译时间 */ ms_info1("MqttClientApp process!\n"); ms_info1("Make time is: %s %s\n", __DATE__, __TIME__); /* 创建mqtt客户端对象 */ if (MQTTCLIENT_SUCCESS != (rc = MQTTClient_create(&client, g_DeviceCfg.MqttConf.BrokerAddress_Port, g_DeviceCfg.MqttConf.MqttClientID,MQTTCLIENT_PERSISTENCE_NONE, NULL))) { ms_fatal1("Failed to create client, return code %d\n", rc); exit(-1); } /* 设置回调 */ if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_setCallbacks(client, NULL, connlost,msgarrvd, NULL))) { ms_fatal1("Failed to set callbacks, return code %d\n", rc); MQTTClient_destroy(&client); //回调设置失败,将空间释放 exit(-1); } /* 连接MQTT服务器 */ will_opts.topicName = g_DeviceCfg.MqttConf.MqttWillToplic_Server; //遗嘱主题 will_opts.message = "Unexpected disconnection";//遗嘱消息 will_opts.retained = 1; //保留消息 will_opts.qos = 0; //QoS0 conn_opts.will = &will_opts; conn_opts.keepAliveInterval = 30; //心跳包间隔时间 conn_opts.cleansession = 0; //cleanSession标志 conn_opts.username = g_DeviceCfg.MqttConf.MqttUserName; //用户名 conn_opts.password = g_DeviceCfg.MqttConf.MqttPassWord; //密码 if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_connect(client, &conn_opts))) { ms_fatal1("Failed to connect, return code %d\n", rc); MQTTClient_destroy(&client); //回调设置失败,将空间释放 exit(-1); } ms_info1("MQTT服务器连接成功!\n"); /* 发布上线消息 */ pubmsg.payload = "Online"; //消息的内容 pubmsg.payloadlen = 6; //内容的长度 pubmsg.qos = 0; //QoS等级 pubmsg.retained = 1; //保留消息 if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL))) { ms_info1("重新发一次!\n"); if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL))) { ms_fatal1("Failed to publish message, return code %d\n", rc); if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000))) { ms_fatal1("Failed to disconnect, return code %d\n", rc); } MQTTClient_destroy(&client); //回调设置失败,将空间释放 exit(-1); } } /* 订阅主题 ServerPublicTopic_GateWay */ if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_subscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay, 0))) { ms_fatal1("Failed to subscribe, return code %d\n", rc); if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000))) { ms_fatal1("Failed to disconnect, return code %d\n", rc); } MQTTClient_destroy(&client); //回调设置失败,将空间释放 exit(-1); } else { ms_info1("subscribe ServerPublicTopic_GateWay ok > \n"); } //uint8_t Mqmsgbuf[C_SRCToDST_MQ_MSGSIZE]; char* Mqmsgbuf = (char*)malloc(C_SRCToDST_MQ_MSGSIZE); memset(Mqmsgbuf, 0, C_SRCToDST_MQ_MSGSIZE); int Mqmsglen; MQTTClient_message Mqmsg = MQTTClient_message_initializer; while(1) { Mqmsglen = mq_receive(MqdSrcToDst, Mqmsgbuf, C_SRCToDST_MQ_MSGSIZE, NULL); if(Mqmsglen>0) { ms_info1("Mqtt %.*s %d\n",Mqmsglen, Mqmsgbuf,Mqmsglen); /* 发布时间信息 */ Mqmsg.payload = Mqmsgbuf; //消息的内容 Mqmsg.payloadlen = Mqmsglen; //内容的长度 Mqmsg.qos = 0; //QoS等级 Mqmsg.retained = 1; //保留消息 if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL))) { ms_info1("重新发一次!\n"); sleep(1); //延时1s重新发送 if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL))) { ms_fatal1("Failed to publish message, return code %d payloadlen %d\n", rc,Mqmsg.payloadlen); if(rc = MQTTClient_unsubscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay)) { ms_fatal1("Failed to unsubscribe, return code %d\n", rc); } if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000))) { ms_fatal1("Failed to disconnect, return code %d\n", rc); } MQTTClient_destroy(&client); //回调设置失败,将空间释放 exit(-1); } } } sleep(1); //判断轮询 速度高 kooloo add 202312 } MQTTClient_destroy(&client); //释放资源 kooloo add 202311 System_Exit(); //系统退出 将公共的部分释放 exit(0); }