#include "eventhandler.h"

// NOTE(review): the original file had seven system includes whose header names
// were lost (only bare `#include` tokens remained). The list below is the set
// of standard headers the visible code needs. If the project headers do not
// already provide the third-party declarations used here (hio_*/hlog*,
// Z_OK/Z_DEFAULT_COMPRESSION), restore those includes as well.
#include <cassert>
#include <cstddef>
#include <cstring>
#include <exception>
#include <fstream>
#include <memory>
#include <string>

#include "kdefine.h"
#include "frame_define.h"
#include "openjson.h"
#include "iconv-utils.h"
#include "kutilities.h"
#include "opmysql.h"
#include "mqtt_msg.h"

namespace {

/// Returns true when `declared_len` payload bytes, starting `header_len` bytes
/// into a received buffer of `readbytes` bytes, lie entirely inside the buffer.
bool payloadFitsBuffer(std::size_t header_len, std::size_t declared_len, int readbytes)
{
    if( readbytes < 0 )
    {
        return false;
    }
    const std::size_t received = static_cast<std::size_t>(readbytes);
    return received >= header_len && declared_len <= received - header_len;
}

} // namespace

EventHandler::EventHandler()
{
}

EventHandler::~EventHandler()
{
}

/// Entry point for data received on a connection.
///
/// Interprets `buf` as a MessageFrame, validates its size fields and dispatches
/// on `frame_type`. Every handled request is answered by writing a MessageFrame
/// back on `io`; validation failures are answered with an error frame.
///
/// @param io        connection the data arrived on; responses are written to it
/// @param buf       received bytes, reinterpreted as a MessageFrame
/// @param readbytes number of valid bytes in `buf`
void EventHandler::onRecvHandler(hio_t* io, void* buf, int readbytes)
{
    __USING_NAMESPACE_HJ__;

    char localaddrstr[SOCKADDR_STRLEN] = { 0 };
    char peeraddrstr[SOCKADDR_STRLEN] = { 0 };
    hlogi("### 1 ### on_recv fd=%d readbytes=%d [%s] <==== [%s]",
          hio_fd(io), readbytes,
          SOCKADDR_STR(hio_localaddr(io), localaddrstr),
          SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    MessageFrame respFrame;

    // Upper bound on a single read (original comment: limit of 1 Mb bytes).
    if( readbytes > FRAME_MAX_LENGTH - 1 )
    {
        hloge("too large data buffer to process: %d", readbytes);
        respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
        hio_write(io, &respFrame, sizeof respFrame);
        return;
    }

    hlogi("<=== decode OK [\n%s\n]", printHex(buf, MIN(readbytes,512)).c_str());

    // The header fields (frame_len / frame_type) are read below, so the buffer
    // must at least reach the start of frame_content before it is dereferenced.
    // NOTE(review): assumes the header fields precede frame_content in
    // MessageFrame - confirm against frame_define.h.
    const std::size_t frameHeaderLen = offsetof(MessageFrame, frame_content);
    if( !payloadFitsBuffer(frameHeaderLen, 0, readbytes) )
    {
        hloge("received data is shorter than a frame header: readbytes=%d, header=%d",
              readbytes, static_cast<int>(frameHeaderLen));
        respFrame.setErrorFrame(ERR_INVALID_BUF_LEN);
        hio_write(io, &respFrame, sizeof respFrame);
        return;
    }

    MessageFrame* pReadFrame = static_cast<MessageFrame*>(buf);
    hlogi("on_recv fd=%d frame_len=%d [0x%x] ", hio_fd(io), pReadFrame->frame_len, pReadFrame->frame_len);

    if( pReadFrame->frame_len > FRAME_MAX_LENGTH - 1 )
    {
        // Report the limit that is actually enforced (the message used to print 0xFFFF).
        hloge("too big string buffer to process: %d, it should be less than %d",
              pReadFrame->frame_len, static_cast<int>(FRAME_MAX_LENGTH));
        respFrame.setErrorFrame(ERR_INVALID_LEN);
        hio_write(io, &respFrame, sizeof respFrame);
        return;
    }

    switch (pReadFrame->frame_type)
    {
    case Frame_DeviceData_Request:
    {
        // handleGatherData writes its own response frames.
        handleGatherData(io, buf, readbytes);
        break;
    }
    case Frame_Echo_Request:
    {
        hlogd("<== recieve test echo request and response signal back!");
        hio_write(io, &respFrame, sizeof respFrame);
        break;
    }
    case Frame_Configure_DB_Request:
    {
        hlogd("<== recieve configuration file upload request!");
        const int ret = handleConfigureData(io, buf, readbytes);
        if( ret != 0 )
        {
            respFrame.setErrorFrame(static_cast<ErrorCode>(ret));
        }
        hio_write(io, &respFrame, sizeof respFrame);
        break;
    }
    default:
    {
        // The frame type comes from the peer, so an unknown value is bad input
        // rather than a programming error: log and drop it instead of asserting.
        // No response is written, which matches what a release build (assert
        // compiled out) did before.
        hloge("unsupported frame type: %d", static_cast<int>(pReadFrame->frame_type));
        break;
    }
    }
}

/// Handles a device-data request frame.
///
/// The frame content is a MessageData header followed by a JSON text (see the
/// MessageData definition in mqtt_msg.h). The JSON is converted to UTF-8 when
/// it is detected as GBK/unknown, decoded, acknowledged on `io`, compressed and
/// stored through OpDatabase::InsertMessage for the recognised message types.
///
/// @param io        connection used for the response frames
/// @param buf       received bytes, reinterpreted as a MessageFrame (must not be null)
/// @param readbytes number of valid bytes in `buf`
void EventHandler::handleGatherData(hio_t* io, void* buf, int readbytes)
{
    assert(buf);
    __USING_NAMESPACE_HJ__;

    MessageFrame respFrame;
    MessageFrame* pReadFrame = static_cast<MessageFrame*>(buf);
    hlogi("<=== reveive device data request");

    // The declared content length is supplied by the peer. It must fit inside
    // the bytes actually received and be large enough to hold the MessageData
    // header, otherwise the reads below run past the buffer or the
    // `frame_len - MSG_HEADER_LENGTH` subtraction underflows.
    const std::size_t declaredLen = static_cast<std::size_t>(pReadFrame->frame_len);
    const std::size_t msgHeaderLen = static_cast<std::size_t>(MSG_HEADER_LENGTH);
    if( !payloadFitsBuffer(offsetof(MessageFrame, frame_content), declaredLen, readbytes)
        || declaredLen < msgHeaderLen )
    {
        hloge("invalid frame_len %d for device data request, readbytes=%d",
              static_cast<int>(declaredLen), readbytes);
        respFrame.setErrorFrame(ERR_INVALID_LEN);
        hio_write(io, &respFrame, sizeof respFrame);
        return;
    }

    OpenJson json;

    // Detect whether the content is UTF-8.
    CODING buf_code = GetCoding(reinterpret_cast<unsigned char*>(pReadFrame->frame_content), pReadFrame->frame_len);
    hlogi("<=== recieve buffer coding is [%s]", buf_code == GBK ? "GBK" : (buf_code == UTF8 ? "UTF8" : "UNKNOWN CODING"));

    MessageData* pData = reinterpret_cast<MessageData*>(pReadFrame->frame_content);
#ifdef _DEBUG
    hlogd("<=== MessageData structure [\n%s\n]", printHex(pData, pReadFrame->frame_len).c_str());
#endif

    // Turn the inner-frame content that follows the MessageData header into a
    // string; it is expected to be JSON text (see MessageData in mqtt_msg.h).
    std::string msg(reinterpret_cast<char*>(pReadFrame->frame_content) + msgHeaderLen,
                    declaredLen - msgHeaderLen);
#ifdef _DEBUG
    hlogd("<=== json content [\n%s\n]", msg.c_str());
#endif

    if( buf_code == CODING::GBK || buf_code == CODING::UNKOWN )
    {
        // Convert to UTF-8.
        std::string str_result;
        if( !GBKToUTF8(msg, str_result) )
        {
            hloge("Failed to transfer code from GBK to UTF-8");
            respFrame.setErrorFrame(ERR_INVALID_UTF8);
            hio_write(io, &respFrame, sizeof respFrame);
            return;
        }
        hlogi("Successfuly transfer code from GBK to UTF-8!");
        msg = str_result;
    }

#ifdef _DEBUG
    // Original author's note: the data is still intact at this point.
    hlogd("<=== recieve !!VALID!! mqtt pack len =[%d] data=[%s]", static_cast<int>(msg.length()), msg.c_str());
#endif

    // Private copy of the UTF-8 text that is handed to the compressor.
    // unique_ptr<char[]> releases it with delete[]; the previous shared_ptr
    // would have used plain delete on an array allocation.
    unsigned int len = static_cast<unsigned int>(msg.length());
    std::unique_ptr<char[]> payload(new char[len]);
    std::memcpy(payload.get(), msg.c_str(), len);
    char* pTmp = payload.get();

    // Set once the success frame has been written, so that the generic
    // exception handler below does not answer the same request twice.
    bool acknowledged = false;

    try
    {
        if( !json.decode(msg) )
        {
            hloge("Failed to decode json string pack , length=%d", readbytes);
            respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
            hio_write(io, &respFrame, sizeof respFrame);
            return;
        }

        std::string fsucode = json["FsuCode"].s();
        std::string msg_type = json["type"].s();
        // The local receive time is stored instead of the message's own
        // "TimeStamp" field (the original kept json["TimeStamp"].s() commented out).
        std::string timestamp = get_current_timestamp();

        if( fsucode.length() == 0 )
        {
            hlogw("!!empty fsucode recieved!");
            respFrame.setErrorFrame(ERR_INVALID_FSUCODE);
            hio_write(io, &respFrame, sizeof respFrame);
            return;
        }

        hlogi("<=== decode OK, recieve fsucode=[%s] type=[%s] ts=[%s]", fsucode.c_str(), msg_type.c_str(), timestamp.c_str());

        // Acknowledge the request before compressing and storing it.
        hio_write(io, &respFrame, sizeof respFrame);
        acknowledged = true;

#ifdef _DEBUG
        hlogd("<<<>>> \n[\n%s\n]\n", printHex(pTmp, len).c_str());
#endif

        std::string out_compress;
        int zip_ret = 0;
        if( (zip_ret = CompressString(pTmp, len, out_compress, Z_DEFAULT_COMPRESSION)) != Z_OK )
        {
            hloge("Failed to compress source data, zip return value %d", zip_ret);
            return;
        }

        // NOTE(review): the source had an unmatched #endif after this dump; the
        // guard is restored here to mirror the uncompressed dump above.
#ifdef _DEBUG
        hlogd("<<<>>> \n[\n%s\n]\n", printHex(out_compress.c_str(), out_compress.size()).c_str());
#endif

        if( msg_type == "gateway-data" || msg_type == "gateway-alarmdata" || msg_type == "gateway-writedata"
            || msg_type == "gateway-readdata" || msg_type == "web-write" || msg_type == "web-alarm" )
        {
            auto& IdCodeContent = json["IdCodeContent"];
            if( IdCodeContent.size() <= 0 )
            {
                hloge("invalid IdCodeContent's size: %d", static_cast<int>(IdCodeContent.size()));
                return;
            }

            // Only the first node is parsed. The value is not used afterwards;
            // the access is kept so the JSON is touched exactly as before.
            auto& pNode = IdCodeContent[0];
            std::string oid = pNode["OID"].s();
            (void)oid;

            OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress,
                                                     static_cast<int>(pData->mqtt_topic),
                                                     static_cast<int>(pData->device_id));
        }

        if( msg_type == "web-read" )
        {
            auto& IdCodeContent = json["IdCodes"];
            if( IdCodeContent.size() <= 0 )
            {
                hloge("invalid IdCodes's size: %d", static_cast<int>(IdCodeContent.size()));
                return;
            }

            // Only the first node is parsed (value unused, access kept as before).
            auto& pNode = IdCodeContent[0];
            std::string oid = pNode.s();
            (void)oid;

            OpDatabase::getInstance()->InsertMessage(timestamp, msg_type, fsucode, out_compress,
                                                     static_cast<int>(pData->mqtt_topic),
                                                     static_cast<int>(pData->device_id));
        }
    }
    catch( const char* errMsg )
    {
        hloge("Failed to decode json string pack , catch error:%s", errMsg);
        respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
        hio_write(io, &respFrame, sizeof respFrame);
        return;
    }
    catch( const std::exception& ex )
    {
        // Standard exceptions (allocation failure, length errors, ...) must not
        // escape the receive callback.
        hloge("Unexpected exception while handling device data: %s", ex.what());
        if( !acknowledged )
        {
            respFrame.setErrorFrame(ERR_INVALID_JSON_FMT);
            hio_write(io, &respFrame, sizeof respFrame);
        }
        return;
    }
}

/// Handles a configuration upload frame by saving the frame content to disk.
///
/// @param io        connection the request arrived on (not used here; the
///                  caller writes the response frame)
/// @param buf       received bytes, reinterpreted as a MessageFrame (must not be null)
/// @param readbytes number of valid bytes in `buf`
/// @return ErrorCode::ERR_OK on success, ERR_INVALID_LEN when the declared
///         content length exceeds the received data, ERR_PERMISSION_DENIED when
///         the file cannot be created, ERR_RETRANS_CONTENT when writing fails.
int EventHandler::handleConfigureData(hio_t* io, void* buf, int readbytes)
{
    assert(buf);
    __USING_NAMESPACE_HJ__;
    (void)io;

    MessageFrame* pReadFrame = static_cast<MessageFrame*>(buf);

    // Never write more bytes than were actually received.
    const std::size_t declaredLen = static_cast<std::size_t>(pReadFrame->frame_len);
    if( !payloadFitsBuffer(offsetof(MessageFrame, frame_content), declaredLen, readbytes) )
    {
        hloge("invalid frame_len %d for configuration upload, readbytes=%d",
              static_cast<int>(declaredLen), readbytes);
        return ERR_INVALID_LEN;
    }

    // Save the upload to a file.
    // NOTE(review): the destination is a hard-coded path, so every upload
    // overwrites the same file - consider making it configurable.
    std::string filename("d://test_1.zip");
    std::ofstream file(filename, std::ios::binary);
    if( !file )
    {
        hloge("failed to create file: %s",filename.c_str());
        return ERR_PERMISSION_DENIED;
    }

    // Write the frame content to the file. Flush before checking the stream
    // state so that a buffered write failure is reported here rather than
    // being lost when the stream is destroyed.
    file.write(reinterpret_cast<const char*>(pReadFrame->frame_content),
               static_cast<std::streamsize>(declaredLen));
    file.flush();
    if( !file )
    {
        hloge("failed to write to file: %s", filename.c_str());
        return ERR_RETRANS_CONTENT;
    }

    hlogd("successfuly to write to file: %s", filename.c_str());
    return ErrorCode::ERR_OK;
}