288 lines
12 KiB
C
288 lines
12 KiB
C
/***************************************************************
Copyright © huijue Network Co., Ltd. 1998-2129. All rights reserved.
Copyright © 上海汇珏网络通信设备股份有限公司 1998-2129. All rights reserved.
文件名 : MqttClient.c
作者 : kooloo
版本 : V1.0
描述 : 动环监控/边缘网关FSU主进程入口 支持设置调试输出口
硬件平台 : iMX6ULL
内核版本 : linux-imx-4.1.15-2.1.0-g3dc0a4b-v2.7
编译器版本 :gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf
日志 : 初版V1.0 2023/7/15 kooloo创建
***************************************************************/
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <string.h>
|
||
#include <unistd.h>
|
||
#include <sys/types.h>
|
||
#include <sys/stat.h>
|
||
#include <fcntl.h>
|
||
#include "MQTTClient.h" //包含MQTT客户端库头文件
|
||
#include <errno.h>
|
||
#include "public.h" //公共函数头文件
|
||
#include <sys/mman.h> //posix 内存共享 消息队列
|
||
#include <sys/stat.h>
|
||
#include "mslog.h"
|
||
#include "SystemTimeFunc.h"
|
||
#include "ProcessisRunning.h"
|
||
#include <time.h>
|
||
#include "cJSON.h" //
|
||
#include "MqttClient.h"
|
||
#include <FuncConfigTypeID_Public.h>
|
||
|
||
static int msgarrvd(void *context, char *topicName, int topicLen,
|
||
MQTTClient_message *message)
|
||
{
|
||
if (!strcmp(topicName, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay)) //校验消息的主题 kooloo add 202312
|
||
{
|
||
if(message->payloadlen<C_SRCToDST_MQ_MSGSIZE)
|
||
{
|
||
cJSON *json_rt = cJSON_Parse(message->payload);
|
||
if (NULL == json_rt) // 接收到其它数据 不做处理
|
||
{
|
||
ms_info1("非json:%s\n", message->payload);
|
||
}
|
||
else //if(0) //json 解析正常 判断
|
||
{
|
||
|
||
cJSON *json_FsuID = cJSON_GetObjectItem(json_rt, C_FSUCODENAME); //获取表中ID 并做判断
|
||
if (!strcmp(g_DeviceCfg.devConf.DeviceNumber, json_FsuID->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略
|
||
{
|
||
cJSON *json_Type = cJSON_GetObjectItem(json_rt, C_GATEWAY_DATATYPE); //获取表中ID 并做判断
|
||
if (!strcmp(C_SERVER_READDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥测
|
||
{
|
||
int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
|
||
ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_READDATA,message->payloadlen, message->payload,len);
|
||
}
|
||
else if (!strcmp(C_SERVER_WRITEDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥调
|
||
{
|
||
int tyid;
|
||
tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312
|
||
|
||
//cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA));
|
||
//mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312
|
||
char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE);
|
||
int len;
|
||
memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE);
|
||
strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401
|
||
if(tFunTyID_DevCod_buf[tyid].PortID<eSDPort_Number_Max) //如果为普通串口 则将消息传递到 普通串口队列中,否则传到虚拟串口中 kooloo add 20240312
|
||
{
|
||
len = mq_send(MqdSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
|
||
printf("Mqtt MqdSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
|
||
}
|
||
else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312
|
||
{
|
||
len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
|
||
printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
|
||
}
|
||
free(tmsgbuf_spvp); //用完释放 kooloo add 202401
|
||
//mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1);
|
||
//ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf));
|
||
|
||
//int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
|
||
//ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_WRITEDATA,message->payloadlen, message->payload,len);
|
||
}
|
||
else if (!strcmp(C_SERVER_ALARMDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥信
|
||
{
|
||
int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
|
||
ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_ALARMDATA,message->payloadlen, message->payload,len);
|
||
}
|
||
else if (!strcmp(C_SERVER_CRTLDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥控
|
||
{
|
||
int tyid;
|
||
tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312
|
||
|
||
//cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA));
|
||
//mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312
|
||
char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE);
|
||
int len;
|
||
memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE);
|
||
strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401
|
||
if(tFunTyID_DevCod_buf[tyid].PortID<eSDPort_Number_Max) //如果为普通串口 则将消息传递到 普通串口队列中,否则传到虚拟串口中 kooloo add 20240312
|
||
{
|
||
len = mq_send(MqdSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
|
||
printf("Mqtt MqdSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
|
||
}
|
||
else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312
|
||
{
|
||
len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
|
||
printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
|
||
}
|
||
free(tmsgbuf_spvp); //用完释放 kooloo add 202401
|
||
//mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1);
|
||
//ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf));
|
||
|
||
//int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
|
||
//ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_CRTLDATA,message->payloadlen, message->payload,len);
|
||
}
|
||
else
|
||
{
|
||
ms_info1("type is wrong! jsonlen %d json:%s \n",message->payloadlen, message->payload);
|
||
}
|
||
}
|
||
cJSON_Delete(json_rt); //删除
|
||
}
|
||
}
|
||
}
|
||
|
||
/* 释放占用的内存空间 */
|
||
MQTTClient_freeMessage(&message);
|
||
MQTTClient_free(topicName);
|
||
/* 退出 */
|
||
return 1;
|
||
}
|
||
|
||
/*
 * MQTT connection-lost callback: logs that the connection dropped.
 *
 * context - unused
 * cause   - reason for the disconnection; may be NULL, in which case
 *           "unknown" is logged instead of passing NULL to %s
 */
static void connlost(void *context, char *cause)
{
    (void)context;

    ms_info1("\nConnection lost\n");
    ms_info1(" cause: %s\n", (cause != NULL) ? cause : "unknown");
}
|
||
|
||
|
||
int main(int argc, char *argv[])
|
||
{
|
||
MQTTClient client;
|
||
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
|
||
MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
|
||
MQTTClient_message pubmsg = MQTTClient_message_initializer;
|
||
int rc;
|
||
int Temp_Counter;
|
||
int ret,fd; //临时变量
|
||
|
||
//设置日志级别为more;打开标准输出、打印所在行数和函数名、文件日志功能;
|
||
//设置日志目录为:/tmp/mslog;
|
||
//设置日志文件为:mslog_sample.txt;
|
||
//FLAG为LOG_API_TEST或TAG_TEST2的日志,不进行打印;
|
||
// mslog_api_init((mslog_level_warn|mslog_enable_stdprint|mslog_enable_linefunc|mslog_enable_filelog),
|
||
// "/tmp/mslog","mslog_fsu.txt","LOG_API_TEST|TAG_TEST2");
|
||
mslog_api_init(C_MSLOG_FLAG_MC,S_MSLOGDIR_PATH,S_MSLOGFILE_NAME_MC,"LOG_API_TEST|TAG_TEST2");
|
||
|
||
printf("Mqtt App\n"); //开机 程序打印信息 做区分使用
|
||
|
||
ms_info1("Mqtt SystemIpc_Init\n"); //数据库初始化 kooloo add 202309
|
||
SystemIpc_Init(); //申请共享内存和队列 kooloo add 202311
|
||
|
||
ms_info1("Mqtt sqTable_init\n"); //数据库初始化 kooloo add 202309
|
||
sqTable_init(C_SQTABLE_INIT_getDeviceCfg); //数据库初始化
|
||
|
||
/* 打印进程信息及编译时间 */
|
||
ms_info1("MqttClientApp process!\n");
|
||
ms_info1("Make time is: %s %s\n", __DATE__, __TIME__);
|
||
|
||
/* 创建mqtt客户端对象 */
|
||
if (MQTTCLIENT_SUCCESS !=
|
||
(rc = MQTTClient_create(&client, g_DeviceCfg.MqttConf.BrokerAddress_Port, g_DeviceCfg.MqttConf.MqttClientID,MQTTCLIENT_PERSISTENCE_NONE, NULL)))
|
||
{
|
||
ms_fatal1("Failed to create client, return code %d\n", rc);
|
||
exit(-1);
|
||
}
|
||
|
||
/* 设置回调 */
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_setCallbacks(client, NULL, connlost,msgarrvd, NULL)))
|
||
{
|
||
ms_fatal1("Failed to set callbacks, return code %d\n", rc);
|
||
MQTTClient_destroy(&client); //回调设置失败,将空间释放
|
||
exit(-1);
|
||
}
|
||
|
||
/* 连接MQTT服务器 */
|
||
will_opts.topicName = g_DeviceCfg.MqttConf.MqttWillToplic_Server; //遗嘱主题
|
||
will_opts.message = "Unexpected disconnection";//遗嘱消息
|
||
will_opts.retained = 1; //保留消息
|
||
will_opts.qos = 0; //QoS0
|
||
|
||
conn_opts.will = &will_opts;
|
||
conn_opts.keepAliveInterval = 30; //心跳包间隔时间
|
||
conn_opts.cleansession = 0; //cleanSession标志
|
||
conn_opts.username = g_DeviceCfg.MqttConf.MqttUserName; //用户名
|
||
conn_opts.password = g_DeviceCfg.MqttConf.MqttPassWord; //密码
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_connect(client, &conn_opts)))
|
||
{
|
||
ms_fatal1("Failed to connect, return code %d\n", rc);
|
||
MQTTClient_destroy(&client); //回调设置失败,将空间释放
|
||
exit(-1);
|
||
}
|
||
|
||
ms_info1("MQTT服务器连接成功!\n");
|
||
|
||
/* 发布上线消息 */
|
||
pubmsg.payload = "Online"; //消息的内容
|
||
pubmsg.payloadlen = 6; //内容的长度
|
||
pubmsg.qos = 0; //QoS等级
|
||
pubmsg.retained = 1; //保留消息
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL)))
|
||
{
|
||
ms_info1("重新发一次!\n");
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL)))
|
||
{
|
||
ms_fatal1("Failed to publish message, return code %d\n", rc);
|
||
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
|
||
{
|
||
ms_fatal1("Failed to disconnect, return code %d\n", rc);
|
||
}
|
||
MQTTClient_destroy(&client); //回调设置失败,将空间释放
|
||
exit(-1);
|
||
}
|
||
}
|
||
|
||
/* 订阅主题 ServerPublicTopic_GateWay */
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_subscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay, 0)))
|
||
{
|
||
ms_fatal1("Failed to subscribe, return code %d\n", rc);
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
|
||
{
|
||
ms_fatal1("Failed to disconnect, return code %d\n", rc);
|
||
}
|
||
MQTTClient_destroy(&client); //回调设置失败,将空间释放
|
||
exit(-1);
|
||
}
|
||
else
|
||
{
|
||
ms_info1("subscribe ServerPublicTopic_GateWay ok > \n");
|
||
}
|
||
|
||
//uint8_t Mqmsgbuf[C_SRCToDST_MQ_MSGSIZE];
|
||
char* Mqmsgbuf = (char*)malloc(C_SRCToDST_MQ_MSGSIZE);
|
||
memset(Mqmsgbuf, 0, C_SRCToDST_MQ_MSGSIZE);
|
||
int Mqmsglen;
|
||
MQTTClient_message Mqmsg = MQTTClient_message_initializer;
|
||
while(1)
|
||
{
|
||
Mqmsglen = mq_receive(MqdSrcToDst, Mqmsgbuf, C_SRCToDST_MQ_MSGSIZE, NULL);
|
||
if(Mqmsglen>0)
|
||
{
|
||
ms_info1("Mqtt %.*s %d\n",Mqmsglen, Mqmsgbuf,Mqmsglen);
|
||
/* 发布时间信息 */
|
||
Mqmsg.payload = Mqmsgbuf; //消息的内容
|
||
Mqmsg.payloadlen = Mqmsglen; //内容的长度
|
||
Mqmsg.qos = 0; //QoS等级
|
||
Mqmsg.retained = 1; //保留消息
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL)))
|
||
{
|
||
ms_info1("重新发一次!\n");
|
||
sleep(1); //延时1s重新发送
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL)))
|
||
{
|
||
ms_fatal1("Failed to publish message, return code %d payloadlen %d\n", rc,Mqmsg.payloadlen);
|
||
|
||
if(rc = MQTTClient_unsubscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay))
|
||
{
|
||
ms_fatal1("Failed to unsubscribe, return code %d\n", rc);
|
||
}
|
||
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
|
||
{
|
||
ms_fatal1("Failed to disconnect, return code %d\n", rc);
|
||
}
|
||
MQTTClient_destroy(&client); //回调设置失败,将空间释放
|
||
exit(-1);
|
||
}
|
||
}
|
||
}
|
||
sleep(1); //判断轮询 速度高 kooloo add 202312
|
||
}
|
||
MQTTClient_destroy(&client); //释放资源 kooloo add 202311
|
||
System_Exit(); //系统退出 将公共的部分释放
|
||
exit(0);
|
||
}
|