/*
 * Source-browser listing header (not part of the program):
 *   path    : EdgeGateway_FSU/DevicePortGet/Mqtt_src/MqttClient.c
 *   size    : 288 lines, 12 KiB, language C
 *   history : 2024-03-15 17:25:04 +08:00
 */
/***************************************************************
Copyright © huijue Network Co., Ltd. 1998-2129. All rights reserved.
Copyright © 1998-2129. All rights reserved.
File        : MqttClient.c
Author      : kooloo
Version     : V1.0
Description : /FSU
Platform    : iMX6ULL
Kernel      : linux-imx-4.1.15-2.1.0-g3dc0a4b-v2.7
Toolchain   : gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf
History     : V1.0 2023/7/15 kooloo
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "MQTTClient.h" //包含MQTT客户端库头文件
#include <errno.h>
#include "public.h" //公共函数头文件
#include <sys/mman.h> //posix 内存共享 消息队列
#include <sys/stat.h>
#include "mslog.h"
#include "SystemTimeFunc.h"
#include "ProcessisRunning.h"
#include <time.h>
#include "cJSON.h" //
#include "MqttClient.h"
#include <FuncConfigTypeID_Public.h>
static int msgarrvd(void *context, char *topicName, int topicLen,
MQTTClient_message *message)
{
if (!strcmp(topicName, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay)) //校验消息的主题 kooloo add 202312
{
if(message->payloadlen<C_SRCToDST_MQ_MSGSIZE)
{
cJSON *json_rt = cJSON_Parse(message->payload);
if (NULL == json_rt) // 接收到其它数据 不做处理
{
ms_info1("非json:%s\n", message->payload);
}
else //if(0) //json 解析正常 判断
{
cJSON *json_FsuID = cJSON_GetObjectItem(json_rt, C_FSUCODENAME); //获取表中ID 并做判断
if (!strcmp(g_DeviceCfg.devConf.DeviceNumber, json_FsuID->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略
{
cJSON *json_Type = cJSON_GetObjectItem(json_rt, C_GATEWAY_DATATYPE); //获取表中ID 并做判断
if (!strcmp(C_SERVER_READDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥测
{
int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_READDATA,message->payloadlen, message->payload,len);
}
else if (!strcmp(C_SERVER_WRITEDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥调
{
int tyid;
tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312
//cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA));
//mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312
char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE);
int len;
memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE);
strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401
if(tFunTyID_DevCod_buf[tyid].PortID<eSDPort_Number_Max) //如果为普通串口 则将消息传递到 普通串口队列中,否则传到虚拟串口中 kooloo add 20240312
{
len = mq_send(MqdSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312
{
len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
free(tmsgbuf_spvp); //用完释放 kooloo add 202401
//mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1);
//ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf));
//int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
//ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_WRITEDATA,message->payloadlen, message->payload,len);
}
else if (!strcmp(C_SERVER_ALARMDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥信
{
int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_ALARMDATA,message->payloadlen, message->payload,len);
}
else if (!strcmp(C_SERVER_CRTLDATA, json_Type->valuestring)) //如果接收到的消息是FSUCODE 正确 则将消息转发 否则忽略 遥控
{
int tyid;
tyid=Mq_cJson_ReturnTyid(message->payload, message->payloadlen); //获取ID kooloo add 20240312
//cJSON_ReplaceItemInObject(json_rt, C_GATEWAY_DATATYPE, cJSON_CreateString(C_GATEWAY_WRITEDATA));
//mq_sendJson(json_rt); //将遥*数据发送 kooloo add 202312
char* tmsgbuf_spvp = (char*)malloc(C_SERIALPORT_MQ_MSGSIZE);
int len;
memset(tmsgbuf_spvp, 0, C_SERIALPORT_MQ_MSGSIZE);
strncpy(tmsgbuf_spvp, message->payload, C_SERIALPORT_MQ_MSGSIZE); //将数据拷贝 kooloo add 202401
if(tFunTyID_DevCod_buf[tyid].PortID<eSDPort_Number_Max) //如果为普通串口 则将消息传递到 普通串口队列中,否则传到虚拟串口中 kooloo add 20240312
{
len = mq_send(MqdSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
else if(tFunTyID_DevCod_buf[tyid].PortID<(eSDPort_Number_Max+eVirtuallyPort_Number_Max)) //虚拟串口 kooloo add 20240312
{
len = mq_send(MqdVtSerialPort, tmsgbuf_spvp, strlen(tmsgbuf_spvp), 0);
printf("Mqtt MqdVtSerialPort:type :%s jsonlen %d json:%s,sendlen:%d\n",json_Type->valuestring, strlen(tmsgbuf_spvp), tmsgbuf_spvp, len);
}
free(tmsgbuf_spvp); //用完释放 kooloo add 202401
//mq_sendRerJson_subdevfromID(tyid, oid.SimilarDevSN - 1);
//ms_info1("type :%d cjson:%s,Rxlen:%d\n",oid.SignalType,tmsgbuf, strlen(tmsgbuf));
//int len=mq_send(MqdDstToSrc, message->payload, message->payloadlen, 0);
//ms_info1("type :%s jsonlen %d json:%s sendlen:%d\n",C_SERVER_CRTLDATA,message->payloadlen, message->payload,len);
}
else
{
ms_info1("type is wrong! jsonlen %d json:%s \n",message->payloadlen, message->payload);
}
}
cJSON_Delete(json_rt); //删除
}
}
}
/* 释放占用的内存空间 */
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
/* 退出 */
return 1;
}
/*
 * MQTT "connection lost" callback: logs the event and its cause.
 *
 * context - user context registered with the callbacks (unused)
 * cause   - reason string supplied by the MQTT library; may be NULL, in
 *           which case a placeholder is logged instead of passing NULL to %s
 *
 * No reconnect is attempted here.
 */
static void connlost(void *context, char *cause)
{
    (void)context;

    ms_info1("\nConnection lost\n");
    ms_info1(" cause: %s\n", (cause != NULL) ? cause : "unknown");
}
/*
 * MQTT client process entry point.
 *
 * Start-up: initialise logging, the shared IPC resources and the device
 * configuration, create the MQTT client, register the callbacks, connect
 * with a retained last-will message, publish a retained "Online" status on
 * the will topic and subscribe to the server command topic.
 *
 * Run loop: receive messages from the MqdSrcToDst queue and publish each one
 * on g_DeviceCfg.MqttConf.GateWayPublicTopic_Server. A failed publish is
 * retried once; if the retry also fails the process tears the client down
 * and exits.
 *
 * The loop never ends normally; every exit path returns status -1 through
 * exit().
 */
int main(int argc, char *argv[])
{
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_message Mqmsg = MQTTClient_message_initializer;
    char *Mqmsgbuf = NULL;
    int Mqmsglen;
    int rc;

    (void)argc;
    (void)argv;

    /*
     * Logging set-up. Original note on the arguments: log-level/feature
     * flags, log directory, log file name, and a list of tags
     * ("LOG_API_TEST|TAG_TEST2") whose entries are not printed.
     */
    mslog_api_init(C_MSLOG_FLAG_MC, S_MSLOGDIR_PATH, S_MSLOGFILE_NAME_MC, "LOG_API_TEST|TAG_TEST2");
    printf("Mqtt App\n"); /* start-up banner to tell this process apart */

    ms_info1("Mqtt SystemIpc_Init\n");
    SystemIpc_Init(); /* shared memory and message queues */
    ms_info1("Mqtt sqTable_init\n");
    sqTable_init(C_SQTABLE_INIT_getDeviceCfg); /* database / device configuration */

    /* Process information and build time. */
    ms_info1("MqttClientApp process!\n");
    ms_info1("Make time is: %s %s\n", __DATE__, __TIME__);

    /* Create the MQTT client object. */
    rc = MQTTClient_create(&client, g_DeviceCfg.MqttConf.BrokerAddress_Port,
                           g_DeviceCfg.MqttConf.MqttClientID,
                           MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        ms_fatal1("Failed to create client, return code %d\n", rc);
        exit(-1);
    }

    /* Register the connection-lost and message-arrived callbacks. */
    rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        ms_fatal1("Failed to set callbacks, return code %d\n", rc);
        goto fail_destroy;
    }

    /* Connect to the broker. */
    will_opts.topicName = g_DeviceCfg.MqttConf.MqttWillToplic_Server; /* will topic */
    will_opts.message = "Unexpected disconnection";                   /* will message */
    will_opts.retained = 1;                                           /* retained */
    will_opts.qos = 0;                                                /* QoS 0 */
    conn_opts.will = &will_opts;
    conn_opts.keepAliveInterval = 30; /* keep-alive interval */
    conn_opts.cleansession = 0;       /* cleanSession flag */
    conn_opts.username = g_DeviceCfg.MqttConf.MqttUserName;
    conn_opts.password = g_DeviceCfg.MqttConf.MqttPassWord;
    rc = MQTTClient_connect(client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS) {
        ms_fatal1("Failed to connect, return code %d\n", rc);
        goto fail_destroy;
    }
    ms_info1("MQTT服务器连接成功!\n");

    /* Publish the retained "Online" status on the will topic; retry once. */
    pubmsg.payload = "Online";
    pubmsg.payloadlen = 6;
    pubmsg.qos = 0;
    pubmsg.retained = 1;
    rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        ms_info1("重新发一次!\n");
        rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.MqttWillToplic_Server, &pubmsg, NULL);
        if (rc != MQTTCLIENT_SUCCESS) {
            ms_fatal1("Failed to publish message, return code %d\n", rc);
            goto fail_disconnect;
        }
    }

    /* Subscribe to the server command topic. */
    rc = MQTTClient_subscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay, 0);
    if (rc != MQTTCLIENT_SUCCESS) {
        ms_fatal1("Failed to subscribe, return code %d\n", rc);
        goto fail_disconnect;
    }
    ms_info1("subscribe ServerPublicTopic_GateWay ok > \n");

    /* Zero-filled receive buffer for the outbound queue. */
    Mqmsgbuf = calloc(1, C_SRCToDST_MQ_MSGSIZE);
    if (Mqmsgbuf == NULL) {
        ms_fatal1("Failed to allocate %d byte message buffer\n", (int)C_SRCToDST_MQ_MSGSIZE);
        goto fail_disconnect;
    }

    while (1) {
        Mqmsglen = mq_receive(MqdSrcToDst, Mqmsgbuf, C_SRCToDST_MQ_MSGSIZE, NULL);
        if (Mqmsglen > 0) {
            ms_info1("Mqtt %.*s %d\n", Mqmsglen, Mqmsgbuf, Mqmsglen);

            /* Publish the queued message as a retained QoS 0 message. */
            Mqmsg.payload = Mqmsgbuf;
            Mqmsg.payloadlen = Mqmsglen;
            Mqmsg.qos = 0;
            Mqmsg.retained = 1;
            rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL);
            if (rc != MQTTCLIENT_SUCCESS) {
                ms_info1("重新发一次!\n");
                sleep(1); /* wait 1 s before the single retry */
                rc = MQTTClient_publishMessage(client, g_DeviceCfg.MqttConf.GateWayPublicTopic_Server, &Mqmsg, NULL);
                if (rc != MQTTCLIENT_SUCCESS) {
                    ms_fatal1("Failed to publish message, return code %d payloadlen %d\n", rc, Mqmsg.payloadlen);
                    rc = MQTTClient_unsubscribe(client, g_DeviceCfg.MqttConf.ServerPublicTopic_GateWay);
                    if (rc != MQTTCLIENT_SUCCESS) {
                        ms_fatal1("Failed to unsubscribe, return code %d\n", rc);
                    }
                    goto fail_disconnect;
                }
            }
        }
        sleep(1); /* throttle: every loop iteration pauses for one second */
    }

    /* Failure teardown; the labels fall through from most to least set up. */
fail_disconnect:
    rc = MQTTClient_disconnect(client, 10000);
    if (rc != MQTTCLIENT_SUCCESS) {
        ms_fatal1("Failed to disconnect, return code %d\n", rc);
    }
fail_destroy:
    free(Mqmsgbuf);
    MQTTClient_destroy(&client);
    exit(-1);
}