emsApplication/applications/ems_datahubs/eventhandler.cpp

222 lines
8.1 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "eventhandler.h"
#include <cstring>
#include <memory>
#include <string>
#include <hv/hv.h>
#include <hv/hmain.h>
#include <hv/iniparser.h>
#include <hv/hloop.h>
#include <hv/hsocket.h>
#include <hv/hssl.h>
#include "kdefine.h"
#include "frame_define.h"
#include "openjson.h"
#include "iconv-utils.h"
#include "kutilities.h"
// Constructs the handler; no additional setup is performed here.
EventHandler::EventHandler() = default;
// Destroys the handler; there is nothing to release explicitly.
EventHandler::~EventHandler() = default;
/**
 * @brief Handles a chunk of bytes received on a libhv I/O handle.
 *
 * The buffer is interpreted as a MessageFrame. Oversized buffers and
 * implausible frame lengths are answered with an error frame. For a
 * Frame_DeviceData_Request the payload (frame_content minus its leading
 * 8 bytes) is converted to UTF-8 when needed, parsed as JSON, answered with
 * respFrame, and finally compressed. Every other frame type is ignored.
 *
 * @param io        I/O handle the data arrived on; replies are written to it.
 * @param buf       Received bytes, reinterpreted as a MessageFrame.
 * @param readbytes Number of bytes available in buf.
 *
 * NOTE(review): the size of the MessageFrame header is not visible from this
 * file, so a buffer shorter than the header cannot be rejected here — confirm
 * against frame_define.h and add that check.
 * NOTE(review): only `const char*` exceptions are caught around the JSON
 * handling; confirm that OpenJson cannot throw other types.
 */
void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes)
{
    __USING_NAMESPACE_HJ__;

    // Number of bytes at the start of frame_content that precede the JSON text.
    // NOTE(review): presumably the binary MessageData fields described in
    // mqtt_msg.h (e.g. mqtt_topic and device_id) — confirm.
    const int kPayloadPrefixLen = 8;

    char localaddrstr[SOCKADDR_STRLEN] = { 0 };
    char peeraddrstr[SOCKADDR_STRLEN] = { 0 };
    hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes,
          SOCKADDR_STR(hio_localaddr(io), localaddrstr),
          SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    MessageFrame respFrame;
    if( readbytes > 0xFFFF - 1 )
    {
        hloge("too large data buffer to process: %d", readbytes);
        respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }
    hlogi("<=== decode OK [\n%s\n]", printHex(buf, readbytes).c_str());

    MessageFrame* pReadFrame = (MessageFrame*)buf;
    hlogi("on_recv fd=%d frame_len=%d [0x%x] ", hio_fd(io), pReadFrame->frame_len, pReadFrame->frame_len);
    if( pReadFrame->frame_len > 0xFFFF - 1 )
    {
        hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF);
        respFrame.setErrorFrame(ERR_INVALID_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    // Only device-data requests are processed by this handler.
    if( Frame_DeviceData_Request != pReadFrame->frame_type )
    {
        return;
    }
    hlogi("<=== reveive device data request");

    // The declared content length must cover the payload prefix (otherwise
    // `frame_len - 8` would wrap around or go negative) and can never exceed
    // the number of bytes actually received (otherwise the reads below would
    // run past the receive buffer). The upper bound is necessary but not
    // sufficient because the header size is unknown here.
    const long long frameLen = static_cast<long long>(pReadFrame->frame_len);
    if( frameLen < kPayloadPrefixLen || frameLen > static_cast<long long>(readbytes) )
    {
        hloge("invalid frame length %d for %d received bytes", static_cast<int>(frameLen), readbytes);
        respFrame.setErrorFrame(ERR_INVALID_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    OpenJson json;
    // Detect whether the content is UTF-8 encoded.
    // NOTE(review): detection runs over the whole content, including the
    // 8-byte prefix that is stripped below — confirm this is intended.
    CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len);
    hlogi("<=== recieve buffer coding is [%s]", buf_code== GBK ? "GBK": (buf_code == UTF8 ? "UTF8" : "UNKNOWN CODING"));

    // Turn the inner frame content into a JSON-formatted string; see the
    // MessageData structure definition in mqtt_msg.h.
    std::string msg((char*)pReadFrame->frame_content + kPayloadPrefixLen,
                    static_cast<size_t>(frameLen - kPayloadPrefixLen));
    if( buf_code == CODING::GBK
        || buf_code == CODING::UNKOWN )
    {
        std::string str_result;
        // Convert to UTF-8.
        if( !GBKToUTF8(msg, str_result) )
        {
            hloge("Failed to transfer code from GBK to UTF-8");
            respFrame.setErrorFrame(ERR_INVALID_UTF8);
            hio_write(io, (void*)&respFrame, sizeof respFrame);
            return;
        }
        hlogi("Successfuly transfer code from GBK to UTF-8!");
        msg.swap(str_result);
    }
#ifdef _DEBUG
    // The payload is still intact at this point.
    hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", static_cast<int>(msg.length()), msg.c_str());
#endif

    // Keep a private copy of the payload, taken before JSON decoding, as the
    // input for compression. unique_ptr<char[]> releases it with delete[] on
    // every return path.
    const unsigned int len = static_cast<unsigned int>(msg.length());
    std::unique_ptr<char[]> payloadCopy(new char[len]);
    memcpy(payloadCopy.get(), msg.c_str(), len);
    char* pTmp = payloadCopy.get();

    try
    {
        if( !json.decode(msg) )
        {
            hloge("Failed to decode json string pack , length=%d", readbytes);
            respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
            hio_write(io, (void*)&respFrame, sizeof respFrame);
            return;
        }

        std::string fsucode = json["FsuCode"].s();
        std::string msg_type = json["type"].s();
        // The local receive time is used instead of the message's own
        // timestamp field: json["TimeStamp"].s()
        std::string timestamp = get_current_timestamp();
        if( fsucode.length() == 0 )
        {
            hlogw("!!empty fsucode recieved!");
            respFrame.setErrorFrame(ERR_INVALID_FSUCODE);
            hio_write(io, (void*)&respFrame, sizeof respFrame);
            return;
        }
        hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str());

        // Reply with the untouched respFrame before the remaining processing;
        // failures after this point are only logged, no further frame is sent.
        hio_write(io, (void*)&respFrame, sizeof respFrame);
#ifdef _DEBUG
        hlogd("<<<<Check Mem after decode here>>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str());
#endif

        std::string out_compress;
        int zip_ret = 0;
        if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK )
        {
            hloge("Failed to compress source data, zip return value %d", zip_ret);
            return;
        }
        hlogd("<<<<Compress result: string size from original [%d] to [%d]", len, static_cast<int>(out_compress.size()));
#ifdef _DEBUG
        hlogd("<<<<Compressed string here>>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str());
#endif

        if( msg_type == "gateway-data"
            || msg_type == "gateway-alarmdata"
            || msg_type == "gateway-writedata"
            || msg_type == "gateway-readdata"
            || msg_type == "web-write"
            || msg_type == "web-alarm" )
        {
            auto& IdCodeContent = json["IdCodeContent"];
            if( IdCodeContent.size() <= 0 )
            {
                hloge("invalid IdCodeContent's size: %d", static_cast<int>(IdCodeContent.size()));
                return;
            }
            // Only the first node is parsed.
            auto& pNode = IdCodeContent[0];
            std::string oid = pNode["OID"].s();
            // Database insert is currently disabled:
            //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id);
        }
        if( msg_type == "web-read" )
        {
            auto& IdCodeContent = json["IdCodes"];
            if( IdCodeContent.size() <= 0 )
            {
                hloge("invalid IdCodes's size: %d", static_cast<int>(IdCodeContent.size()));
                return;
            }
            // Only the first node is parsed.
            auto& pNode = IdCodeContent[0];
            std::string oid = pNode.s();
            // Database insert is currently disabled:
            //OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id);
        }
    }
    catch( const char* errMsg )
    {
        hloge("Failed to decode json string pack , catch error:%s", errMsg);
        respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }
}