EdgeGateway_FSU/DevicePortGet/Mqtt_src/MqttClient.c

288 lines
12 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/***************************************************************
Copyright © huijue Network Co., Ltd. 1998-2129. All rights reserved.
Copyright © 上海汇珏网络通信设备股份有限公司 1998-2129. All rights reserved.
文件名 : MqttClient.c
作者 : kooloo
版本 : V1.0
描述 : 动环监控/边缘网关FSU主进程入口 支持设置调试输出口
硬件平台 : iMX6ULL
内核版本 : linux-imx-4.1.15-2.1.0-g3dc0a4b-v2.7
编译器版本 gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf
日志 : 初版V1.0 2023/7/15 kooloo创建
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "MQTTClient.h" //包含MQTT客户端库头文件
#include <errno.h>
#include "public.h" //公共函数头文件
#include <sys/mman.h> //posix 内存共享 消息队列
#include <sys/stat.h>
#include "mslog.h"
#include "SystemTimeFunc.h"
#include "ProcessisRunning.h"
#include <time.h>
#include "cJSON.h" //
#include "MqttClient.h"
#include <FuncConfigTypeID_Public.h>
static int msgarrvd(void *context, char *topicName, int topicLen,
MQTTClient_message *message)
{
if (!strcmp(topicName, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay)) //校验消息的主题 kooloo add 202312
{
if(message->payloadlen<C_SRCToDST_MQ_MSGSIZE)
{
cJSON *json_rt = cJSON_Parse(message->payload);
if (NULL == json_rt) // 接收到其它数据 不做处理
{
ms_info1("非json:%s\n", message->payload);
}
else //if(0) //json 解析正常 判断
{
cJSON *json_FsuID = cJSON_GetObjectItem(json_rt, C_FSUCODENAME); //获取表中ID 并做判断
if (!strcmp(g_DeviceCfg.devConf.DeviceNumber, json_FsuID->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略
{
cJSON *json_Type = cJSON_GetObjectItem(json_rt, C_GATEWAY_DATATYPE); //获取表中ID 并做判断
if (!strcmp(C_SERVER_READDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥测
{
int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_READDATA,message->payloadlen, message->payload,len);
}
else if (!strcmp(C_SERVER_WRITEDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥调
{
int tyid;
tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312
//cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA));
//mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312
char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE);
int len;
memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE);
strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401
if(tFunTyID_DevCod_buf[tyid].PortID<eSDPort_Number_Max) //如果为普通串口 则将消息传递到 普通串口队列中,否则传到虚拟串口中 kooloo add 20240312
{
len = mq_send(MqdSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312
{
len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
free(tmsgbuf_spvp); //用完释放 kooloo add 202401
//mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1);
//ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf));
//int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
//ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_WRITEDATA,message->payloadlen, message->payload,len);
}
else if (!strcmp(C_SERVER_ALARMDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥信
{
int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_ALARMDATA,message->payloadlen, message->payload,len);
}
else if (!strcmp(C_SERVER_CRTLDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥控
{
int tyid;
tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312
//cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA));
//mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312
char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE);
int len;
memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE);
strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401
if(tFunTyID_DevCod_buf[tyid].PortID<eSDPort_Number_Max) //如果为普通串口 则将消息传递到 普通串口队列中,否则传到虚拟串口中 kooloo add 20240312
{
len = mq_send(MqdSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312
{
len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
free(tmsgbuf_spvp); //用完释放 kooloo add 202401
//mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1);
//ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf));
//int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
//ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_CRTLDATA,message->payloadlen, message->payload,len);
}
else
{
ms_info1("type is wrong! jsonlen %d json:%s \n",message->payloadlen, message->payload);
}
}
cJSON_Delete(json_rt); //删除
}
}
}
/* 释放占用的内存空间 */
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
/* 退出 */
return 1;
}
static void connlost(void *context, char *cause)
{
ms_info1("\nConnection lost\n");
ms_info1(" cause: %s\n", cause);
}
int main(int argc, char *argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
int rc;
int Temp_Counter;
int ret,fd; //临时变量
//设置日志级别为more打开标准输出、打印所在行数和函数名、文件日志功能
//设置日志目录为:/tmp/mslog;
//设置日志文件为mslog_sample.txt;
//FLAG为LOG_API_TEST或TAG_TEST2的日志不进行打印;
// mslog_api_init((mslog_level_warn|mslog_enable_stdprint|mslog_enable_linefunc|mslog_enable_filelog),
// "/tmp/mslog","mslog_fsu.txt","LOG_API_TEST|TAG_TEST2");
mslog_api_init(C_MSLOG_FLAG_MC,S_MSLOGDIR_PATH,S_MSLOGFILE_NAME_MC,"LOG_API_TEST|TAG_TEST2");
printf("Mqtt App\n"); //开机 程序打印信息 做区分使用
ms_info1("Mqtt SystemIpc_Init\n"); //数据库初始化 kooloo add 202309
SystemIpc_Init(); //申请共享内存和队列 kooloo add 202311
ms_info1("Mqtt sqTable_init\n"); //数据库初始化 kooloo add 202309
sqTable_init(C_SQTABLE_INIT_getDeviceCfg); //数据库初始化
/* 打印进程信息及编译时间 */
ms_info1("MqttClientApp process!\n");
ms_info1("Make time is: %s %s\n", __DATE__, __TIME__);
/* 创建mqtt客户端对象 */
if (MQTTCLIENT_SUCCESS !=
(rc = MQTTClient_create(&client, g_DeviceCfg.MqttConf.BrokerAddress_Port, g_DeviceCfg.MqttConf.MqttClientID,MQTTCLIENT_PERSISTENCE_NONE, NULL)))
{
ms_fatal1("Failed to create client, return code %d\n", rc);
exit(-1);
}
/* 设置回调 */
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_setCallbacks(client, NULL, connlost,msgarrvd, NULL)))
{
ms_fatal1("Failed to set callbacks, return code %d\n", rc);
MQTTClient_destroy(&client); //回调设置失败,将空间释放
exit(-1);
}
/* 连接MQTT服务器 */
will_opts.topicName = g_DeviceCfg.MqttConf.MqttWillToplic_Server; //遗嘱主题
will_opts.message = "Unexpected disconnection";//遗嘱消息
will_opts.retained = 1; //保留消息
will_opts.qos = 0; //QoS0
conn_opts.will = &will_opts;
conn_opts.keepAliveInterval = 30; //心跳包间隔时间
conn_opts.cleansession = 0; //cleanSession标志
conn_opts.username = g_DeviceCfg.MqttConf.MqttUserName; //用户名
conn_opts.password = g_DeviceCfg.MqttConf.MqttPassWord; //密码
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_connect(client, &conn_opts)))
{
ms_fatal1("Failed to connect, return code %d\n", rc);
MQTTClient_destroy(&client); //回调设置失败,将空间释放
exit(-1);
}
ms_info1("MQTT服务器连接成功!\n");
/* 发布上线消息 */
pubmsg.payload = "Online"; //消息的内容
pubmsg.payloadlen = 6; //内容的长度
pubmsg.qos = 0; //QoS等级
pubmsg.retained = 1; //保留消息
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL)))
{
ms_info1("重新发一次!\n");
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL)))
{
ms_fatal1("Failed to publish message, return code %d\n", rc);
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
{
ms_fatal1("Failed to disconnect, return code %d\n", rc);
}
MQTTClient_destroy(&client); //回调设置失败,将空间释放
exit(-1);
}
}
/* 订阅主题 ServerPublicTopic_GateWay */
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_subscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay, 0)))
{
ms_fatal1("Failed to subscribe, return code %d\n", rc);
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
{
ms_fatal1("Failed to disconnect, return code %d\n", rc);
}
MQTTClient_destroy(&client); //回调设置失败,将空间释放
exit(-1);
}
else
{
ms_info1("subscribe ServerPublicTopic_GateWay ok > \n");
}
//uint8_t Mqmsgbuf[C_SRCToDST_MQ_MSGSIZE];
char* Mqmsgbuf = (char*)malloc(C_SRCToDST_MQ_MSGSIZE);
memset(Mqmsgbuf, 0, C_SRCToDST_MQ_MSGSIZE);
int Mqmsglen;
MQTTClient_message Mqmsg = MQTTClient_message_initializer;
while(1)
{
Mqmsglen = mq_receive(MqdSrcToDst, Mqmsgbuf, C_SRCToDST_MQ_MSGSIZE, NULL);
if(Mqmsglen>0)
{
ms_info1("Mqtt %.*s %d\n",Mqmsglen, Mqmsgbuf,Mqmsglen);
/* 发布时间信息 */
Mqmsg.payload = Mqmsgbuf; //消息的内容
Mqmsg.payloadlen = Mqmsglen; //内容的长度
Mqmsg.qos = 0; //QoS等级
Mqmsg.retained = 1; //保留消息
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL)))
{
ms_info1("重新发一次!\n");
sleep(1); //延时1s重新发送
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL)))
{
ms_fatal1("Failed to publish message, return code %d payloadlen %d\n", rc,Mqmsg.payloadlen);
if(rc = MQTTClient_unsubscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay))
{
ms_fatal1("Failed to unsubscribe, return code %d\n", rc);
}
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
{
ms_fatal1("Failed to disconnect, return code %d\n", rc);
}
MQTTClient_destroy(&client); //回调设置失败,将空间释放
exit(-1);
}
}
}
sleep(1); //判断轮询 速度高 kooloo add 202312
}
MQTTClient_destroy(&client); //释放资源 kooloo add 202311
System_Exit(); //系统退出 将公共的部分释放
exit(0);
}