emsApplication/applications/ems_datahubs/eventhandler.cpp

249 lines
8.2 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "eventhandler.h"
#include <hv/hv.h>
#include <hv/hmain.h>
#include <hv/iniparser.h>
#include <hv/hloop.h>
#include <hv/hsocket.h>
#include <hv/hssl.h>
#include "kdefine.h"
#include "frame_define.h"
#include "openjson.h"
#include "iconv-utils.h"
#include "kutilities.h"
#include "opmysql.h"
#include "mqtt_msg.h"
/// Constructs an EventHandler; no explicit initialisation is performed here.
EventHandler::EventHandler() = default;
/// Destroys the EventHandler; there is nothing to release explicitly here.
EventHandler::~EventHandler() = default;
/**
 * @brief Receive callback: validates the incoming buffer as a MessageFrame and
 *        dispatches it by frame type.
 *
 * The buffer comes straight from the peer, so every length is checked before
 * the frame is interpreted:
 *   - the buffer must be non-empty and smaller than 0xFFFF bytes,
 *   - it must be large enough to contain the fixed frame header,
 *   - the declared frame_len must be non-negative, smaller than 0xFFFF and
 *     must fit inside the bytes that were actually received.
 * On a length violation an error frame is written back to the peer and the
 * buffer is dropped.
 *
 * @param io        Connection the data was received on (also used for replies).
 * @param buf       Received bytes, interpreted as a MessageFrame.
 * @param readbytes Number of valid bytes in @p buf.
 */
void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes)
{
    __USING_NAMESPACE_HJ__;

    char localaddrstr[SOCKADDR_STRLEN] = { 0 };
    char peeraddrstr[SOCKADDR_STRLEN] = { 0 };
    hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]", hio_fd(io), readbytes,
          SOCKADDR_STR(hio_localaddr(io), localaddrstr),
          SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    MessageFrame respFrame;
    if( readbytes > 0xFFFF - 1 )
    {
        hloge("too large data buffer to process: %d", readbytes);
        respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    // Nothing to parse: reject before the buffer is touched.
    if( buf == nullptr || readbytes <= 0 )
    {
        hloge("empty data buffer, readbytes=%d", readbytes);
        respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    MessageFrame* pReadFrame = (MessageFrame*)buf;

    // Offset of the payload inside the frame == size of the fixed header.
    // Only an address is computed here; no header field is read yet.
    // NOTE(review): assumes frame_content is an array member that follows the
    // header fields of MessageFrame - confirm against frame_define.h.
    const long long headerBytes = static_cast<long long>(
        reinterpret_cast<const char*>(pReadFrame->frame_content) - static_cast<const char*>(buf));
    if( readbytes < headerBytes )
    {
        hloge("data buffer too short for a frame header: %d bytes, need %lld", readbytes, headerBytes);
        respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    hlogi("<=== decode OK [\n%s\n]", printHex(buf, readbytes).c_str());
    hlogi("on_recv fd=%d frame_len=%d [0x%x] ", hio_fd(io), pReadFrame->frame_len, pReadFrame->frame_len);
    if( pReadFrame->frame_len > 0xFFFF - 1 )
    {
        hloge("too big string buffer to process: %d, it should be less than %d", pReadFrame->frame_len, 0xFFFF);
        respFrame.setErrorFrame(ERR_INVALID_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    // The declared payload length must lie inside what was actually received,
    // otherwise the handlers below would read past the end of the buffer.
    const long long declaredLen = static_cast<long long>(pReadFrame->frame_len);
    if( declaredLen < 0 || headerBytes + declaredLen > readbytes )
    {
        hloge("frame_len %lld does not fit into the %d bytes received (header %lld bytes)",
              declaredLen, readbytes, headerBytes);
        respFrame.setErrorFrame(ERR_INVALID_LEN);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    if( Frame_DeviceData_Request == pReadFrame->frame_type )
    {
        handleGatherData(io, buf, readbytes);
    }
    else
    {
        // The frame type is controlled by the peer, so an unknown value must
        // not abort the process (it used to hit assert(false)). The frame is
        // logged and dropped without a reply.
        hloge("unsupported frame type: %d", static_cast<int>(pReadFrame->frame_type));
    }
}
/**
 * @brief Handles a device-data request frame: extracts the JSON payload,
 *        normalises it to UTF-8, acknowledges it, compresses it and stores it.
 *
 * Steps:
 *   1. The frame content starts with a MessageData header of
 *      MSG_HEADER_LENGTH bytes; the bytes after it are taken as the JSON text.
 *   2. If the content is detected as GBK (or the coding is unknown) the text
 *      is converted to UTF-8.
 *   3. The JSON is decoded; "FsuCode" and "type" are read from it. A missing
 *      FsuCode or undecodable JSON is answered with an error frame.
 *   4. The default-constructed response frame is written back to the peer
 *      (presumably the success acknowledgement - confirm in frame_define.h).
 *   5. The UTF-8 text is compressed and, for the known message types, inserted
 *      through OpDatabase together with mqtt_topic / device_id from the header.
 *      Failures after step 4 are only logged.
 *
 * @param io        Connection used for the reply.
 * @param buf       Received bytes, interpreted as a MessageFrame; must not be null.
 * @param readbytes Number of bytes received (used for diagnostics only).
 */
void EventHandler::handleGatherData(hio_t* io, void* buf, int readbytes)
{
    assert(buf);
    __USING_NAMESPACE_HJ__;

    MessageFrame respFrame;
    MessageFrame* pReadFrame = (MessageFrame*)buf;
    hlogi("<=== reveive device data request");

    // The content must at least hold the MessageData header. Without this
    // guard the payload length computed below would wrap around and the
    // string constructor would read far outside the receive buffer.
    // No dedicated error code is used by this handler for that case; an empty
    // payload could not be decoded as JSON either, so the same code is sent.
    const long long contentLen = static_cast<long long>(pReadFrame->frame_len);
    const long long headerLen = static_cast<long long>(MSG_HEADER_LENGTH);
    if( contentLen < headerLen )
    {
        hloge("frame content too short: %lld bytes, message header alone needs %lld", contentLen, headerLen);
        respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }

    OpenJson json;
    // Detect the text coding of the frame content (is it UTF-8?).
    CODING buf_code = GetCoding((unsigned char*)pReadFrame->frame_content, pReadFrame->frame_len);
    hlogi("<=== recieve buffer coding is [%s]", buf_code == GBK ? "GBK" : (buf_code == UTF8 ? "UTF8" : "UNKNOWN CODING"));

    // Header view of the content; see the MessageData structure in mqtt_msg.h.
    MessageData* pData = (MessageData*)pReadFrame->frame_content;
#ifdef _DEBUG
    hlogd("<=== MessageData structure [\n%s\n]", printHex(pData, pReadFrame->frame_len).c_str());
#endif

    // Everything after the message header is the JSON text.
    std::string msg((char*)pReadFrame->frame_content + headerLen,
                    static_cast<std::string::size_type>(contentLen - headerLen));
#ifdef _DEBUG
    hlogd("<=== json content [\n%s\n]", msg.c_str());
#endif

    if( buf_code == CODING::GBK
        || buf_code == CODING::UNKOWN )
    {
        std::string str_result;
        // Convert to UTF-8.
        if( !GBKToUTF8(msg, str_result) )
        {
            hloge("Failed to transfer code from GBK to UTF-8");
            respFrame.setErrorFrame(ERR_INVALID_UTF8);
            hio_write(io, (void*)&respFrame, sizeof respFrame);
            return;
        }
        hlogi("Successfuly transfer code from GBK to UTF-8!");
        msg = str_result;
    }
#ifdef _DEBUG
    // size_t is narrowed explicitly: the log call is variadic and expects int for %d.
    hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", static_cast<int>(msg.length()), msg.c_str());
#endif

    // Private snapshot of the UTF-8 text, taken before the JSON decoder sees
    // it; this snapshot is what gets compressed and stored. A std::string owns
    // the bytes, so they are released correctly on every return path (the
    // previous new char[] held by shared_ptr<char> was freed with scalar delete).
    std::string payload(msg);
    const unsigned int len = static_cast<unsigned int>(payload.length());
    char* pTmp = &payload[0];

    try
    {
        if( !json.decode(msg) )
        {
            hloge("Failed to decode json string pack , length=%d", readbytes);
            respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
            hio_write(io, (void*)&respFrame, sizeof respFrame);
            return;
        }

        std::string fsucode = json["FsuCode"].s();
        std::string msg_type = json["type"].s();
        // The local receive time is stored instead of the JSON "TimeStamp" field.
        std::string timestamp = get_current_timestamp();
        if( fsucode.length() == 0 )
        {
            hlogw("!!empty fsucode recieved!");
            respFrame.setErrorFrame(ERR_INVALID_FSUCODE);
            hio_write(io, (void*)&respFrame, sizeof respFrame);
            return;
        }
        hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str());

        // Reply to the peer before the (slower) compression and database work.
        hio_write(io, (void*)&respFrame, sizeof respFrame);
#ifdef _DEBUG
        hlogd("<<<<Check Mem after decode here>>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str());
#endif

        std::string out_compress;
        int zip_ret = 0;
        if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK )
        {
            hloge("Failed to compress source data, zip return value %d", zip_ret);
            return;
        }
        hlogd("<<<<Compress result: string size from original [%d] to [%d]", len, static_cast<int>(out_compress.size()));
#ifdef _DEBUG
        hlogd("<<<<Compressed string here>>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str());
#endif

        if( msg_type == "gateway-data"
            || msg_type == "gateway-alarmdata"
            || msg_type == "gateway-writedata"
            || msg_type == "gateway-readdata"
            || msg_type == "web-write"
            || msg_type == "web-alarm" )
        {
            // These message types must carry a non-empty "IdCodeContent" list.
            auto& IdCodeContent = json["IdCodeContent"];
            if( IdCodeContent.size() <= 0 )
            {
                hloge("invalid IdCodeContent's size: %d", static_cast<int>(IdCodeContent.size()));
                return;
            }
            OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id);
        }
        if( msg_type == "web-read" )
        {
            // Read requests must carry a non-empty "IdCodes" list.
            auto& IdCodeContent = json["IdCodes"];
            if( IdCodeContent.size() <= 0 )
            {
                hloge("invalid IdCodes's size: %d", static_cast<int>(IdCodeContent.size()));
                return;
            }
            OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress, (int)pData->mqtt_topic, (int)pData->device_id);
        }
    }
    catch( const char* errMsg )
    {
        hloge("Failed to decode json string pack , catch error:%s", errMsg);
        respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
        hio_write(io, (void*)&respFrame, sizeof respFrame);
        return;
    }
}