#include #include #include #include #include #include #include #include #include #include #define LOCKFILE "/var/lock/beehub.lock" using namespace std; int lockfile(int fd) { struct flock fl; fl.l_type = F_WRLCK; fl.l_start = 0; fl.l_whence = SEEK_SET; fl.l_len = 0; return fcntl(fd, F_SETLK, &fl); } int already_running(void) { int fd; char buf[16]; fd = open(LOCKFILE, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if(fd < 0) { fprintf(stderr, "FAILED TO OPEN FILE" LOCKFILE); exit(1); } if(lockfile(fd) < 0) { if(EACCES == errno || EAGAIN == errno) { close(fd); return 1; } fprintf(stderr, "FAILED TO LOCK FILE" LOCKFILE); exit(1); } ftruncate(fd, 0); sprintf(buf, "%ld", (long)getpid()); write(fd, buf, strlen(buf) + 1); return 0; } std::string readFirstLine(const std::string &filename) { std::ifstream file(filename); if (!file.is_open()) { printf("Could not open file %s.\n", filename.c_str()); return "0000000000"; } std::string firstLine; std::getline(file, firstLine); file.close(); return firstLine; } int main(int argc, char *argv[]) { if(already_running()) { printf("PROGRAM ALREADY RUNNING...\n"); exit(0); } if (argc < 2) { cout << "Usage: " << argv[0] << " " << endl; cout << "Example: " << argv[0] << " 139.196.5.21:44242 10" << endl; return 1; } #ifdef DEBUG std::ofstream log_file("/var/log/beehub.log", std::ios::app); #endif std::string filename = "/etc/devid"; std::string devid; do { try { devid = readFirstLine(filename); std::cout << "First line of the file: " << devid << std::endl; } catch (const std::exception &e) { std::cerr << "Error: " << e.what() << std::endl; return 1; } #ifdef DEBUG log_file << "#######################" << std::endl; log_file << "Device ID: " << devid << std::endl; #endif std::string ip = argv[1]; int message_count = 3; if (argc == 3) message_count = atoi(argv[2]); const string endpoint = "tcp://" + ip; #ifdef DEBUG log_file << "Endpoint: " << endpoint << std::endl; #endif void *context = zmq_ctx_new(); void *socket = zmq_socket(context, ZMQ_REQ); // Set send timeout int sendTimeout = 5000; // Timeout in milliseconds zmq_setsockopt(socket, ZMQ_SNDTIMEO, &sendTimeout, sizeof(sendTimeout)); // Set receive timeout int receiveTimeout = 5000; // Timeout in milliseconds zmq_setsockopt(socket, ZMQ_RCVTIMEO, &receiveTimeout, sizeof(receiveTimeout)); int rc = zmq_connect(socket, endpoint.c_str()); #ifdef DEBUG log_file << "Connecting to endpoint: " << endpoint << std::endl; #endif if (rc != 0) { printf("Failed to connect to server: %s\n", zmq_strerror(errno)); return 1; } #ifdef DEBUG log_file << "Connected to endpoint: " << endpoint << std::endl; #endif int request_nbr; char buffer[100] = {0}; for (request_nbr = 0; request_nbr != message_count; request_nbr++) { std::ostringstream oss; printf("Sending %s #%d…\n", devid.c_str(), request_nbr); errno = 0; oss << devid << "#" << request_nbr; int sendResult = zmq_send(socket, oss.str().c_str(), oss.str().size(), 0); if (sendResult == -1 && errno == EAGAIN) { #ifdef DEBUG log_file << "Send operation timed out. Exiting." << std::endl; #endif continue; } else if (sendResult == -1) { #ifdef DEBUG log_file << "Error occurred during send: " << zmq_strerror(errno) << std::endl; #endif continue; } #ifdef DEBUG log_file << "Sent message: " << devid << std::endl; #endif int size = zmq_recv(socket, buffer, sizeof(buffer) - 1, 0); if (size > 0) { buffer[size] = 0; // Null-terminate the string printf("Received reply: %s\n", buffer); } else { printf("No response received within timeout\n"); } #ifdef DEBUG log_file << "Received reply: " << buffer << std::endl; #endif } // 将LINGER设置为0 int linger = 0; zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); printf("closing socket\n"); zmq_close(socket); printf("destroying context\n"); zmq_ctx_destroy(context); printf("waiting for 3 minute before sending again\n"); std::this_thread::sleep_for(std::chrono::seconds(180)); #ifdef DEBUG log_file << "Waiting for 1 minute before sending again" << std::endl; log_file << "************************" << std::endl; #endif } while (true); printf("exiting\n"); return 0; }