#!/usr/bin/python # -*- coding: UTF-8 -*- # __author__ = 'HKC' import io import logging from logging.handlers import RotatingFileHandler import json import datetime import time import sys import os from paho.mqtt import client as mqtt_client import random import getopt # 日志文件路径 LOG_FILE_PATH = "./gwsimulator.log" CFG_FILE_PATH = "./config.json" # create logger logger = logging.getLogger(__name__) DAEMON_MODE = 0 def log_setup(): if len(logger.handlers) == 0: level = logging.DEBUG if os.path.exists(LOG_FILE_PATH): file_size = os.path.getsize(LOG_FILE_PATH) if file_size > 15000000: timestamp = time.mktime(datetime.datetime.now().timetuple()) time_fmt = datetime.datetime.fromtimestamp(timestamp).strftime("%Y%m%d_%H%M") new_name = LOG_FILE_PATH + "." + time_fmt os.rename(LOG_FILE_PATH, new_name) time.sleep(1) handler = RotatingFileHandler(LOG_FILE_PATH, mode='a', maxBytes=30 * 1024 * 1024, backupCount=10, encoding=None, delay=False) formatter = logging.Formatter( "[%(asctime)s] [%(module)s.%(funcName)s][%(lineno)s] [%(levelname)s] : %(message)s") handler.setFormatter(formatter) if DAEMON_MODE == 0: console_handler = logging.StreamHandler() formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] : %(message)s", "%Y-%m-%d %H:%M:%S") console_handler.setFormatter(formatter) logger.addHandler(console_handler) logger.addHandler(handler) logger.setLevel(level) logger.info("===== log start, Welcome to the Earth ====") # 保存异常信息 def dump_exception_msg(e, ip="127.0.0.1"): exc_type, exc_value, exc_traceback = sys.exc_info() msg = "[%s] %s %s %s" % (ip, sys._getframe(1).f_code.co_name, exc_type, exc_value) logger.error( msg ) logger.exception(e) def rand(lower_limit=None, upper_limit=None): return random.randint(lower_limit, upper_limit) class MqttConfigure: def __init__(self): self.broker_ip = '127.0.0.1' self.port = 8764 self.username = 'username' self.passwd = 'passwd' self.publish_topic = 'public' self.publish_interval = 5000 self.publish_qos = 0 self.subscribe_qos = 0 self.subscriber_topic = '' self.client_id = f'python-mqtt-{random.randint(0, 1000)}' self.max_publish_count = 0 self.fsuCodes = [] self.FsuCodePrefix = 1111111000 self.FsuCodeStart = 1 self.FsuCodeEnd = 999 class PublishData: def __init__(self): self.publish_data = {} def read_configure(mqtt_conf=None, publish_data=None): abspath = os.path.abspath(__file__) json_path = os.path.dirname(abspath) + "/" + CFG_FILE_PATH # '/config.json' try: with io.open(json_path, 'r') as f: json_data = json.load(f) mqtt_conf.broker_ip = json_data['broker_ip'] mqtt_conf.port = json_data['port'] mqtt_conf.username = json_data['username'] mqtt_conf.passwd = json_data['passwd'] mqtt_conf.publish_topic = json_data['publish_topic'] mqtt_conf.publish_interval = json_data['publish_interval'] mqtt_conf.publish_qos = json_data['publish_qos'] mqtt_conf.subscribe_qos = json_data['subscribe_qos'] mqtt_conf.subscriber_topic = json_data['subcriber_topic'] mqtt_conf.client_id = json_data['client_id'] # 2025.03.27 增加读取多个FsuCode,用于模拟多个不同的设备 mqtt_conf.FsuCodePrefix = json_data['FsuCodePrefix'] mqtt_conf.FsuCodeStart = json_data['FsuCodeStart'] mqtt_conf.FsuCodeEnd = json_data['FsuCodeEnd'] mqtt_conf.fsuCodes = json_data['FsuCodes'] publish_data.publish_data = json_data['publish_data'] except Exception as e: dump_exception_msg(e) def pub_connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: logger.info("Connected to MQTT Broker!") else: logger.error("Failed to connect, return code %d", rc) def on_disconnect(client, userdata, rc): """断开连接回调""" logger.info(f"[{time.ctime()}] 连接断开") if rc != 0: logger.info("尝试重新连接...") reconnect() def on_publish(client, userdata, mid): """消息发布回调""" logger.info(f"[{time.ctime()}] 消息已确认送达 (MID: {mid})") def reconnect(): """自动重连机制""" reconnect_delay = 5 try: logger.info(f"等待{reconnect_delay}秒后尝试重连...") time.sleep(reconnect_delay) client.reconnect() except Exception as e: logger.info(f"重连失败:{str(e)}") client = mqtt_client.Client(callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1, protocol=4, client_id=mqtt_conf.client_id, reconnect_on_failure=True) client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_publish = on_publish client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd) client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60) time.sleep(1) return client def sub_connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: logger.info("Connected to MQTT Broker!") else: logger.error("Failed to connect, return code %d", rc) def on_message(client, userdata, msg): logger.info(msg.topic + " " + str(msg.payload)) client = mqtt_client.Client(protocol=3,client_id=mqtt_conf.client_id) client.on_connect = on_connect client.on_message = on_message client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd) client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60) time.sleep(1) return client # 根据配置信息构建发布的报文消息 # 2025.03.11 增加函数参数,处理制定的fsucode def build_publish_msg(fsu_code=None): pd = publish_data.publish_data msg = {"FsuCode": fsu_code, "type": pd["type"], "IdCodeContent": [], "TimeStamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} for one in pd["IdCodeContent"]: idcode = one["OID"] factor = one["Factor"] t = one["type"] value = one["Value"] if t == "random_number": lo = one["RND"][0] up = one["RND"][1] value = rand(lo, up) msg['IdCodeContent'].append({ "OID": idcode, "Factor": factor, "Value": value }) json_data = json.dumps(msg, ensure_ascii=False) return json_data def publish(client): msg_count = 0 # 2025.03.11 增加循环处理多个FsuCode,用于模拟多个不同的设备 fsu_codes_result = [] for num in range(mqtt_conf.FsuCodeStart, mqtt_conf.FsuCodeEnd+1): suffix = f"{num:04d}" fsu_codes_result.append(mqtt_conf.FsuCodePrefix + suffix) while True: if mqtt_conf.max_publish_count != 0 and msg_count > mqtt_conf.max_publish_count: break # for fsu_code in mqtt_conf.fsuCodes: for fsu_code in fsu_codes_result: logger.info("Processing fsuCode: " + fsu_code) msg = build_publish_msg(fsu_code) msg_info = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos) # msg_info : [0, 1] status = msg_info[0] mid = msg_info[1] msg_info.wait_for_publish() try: b = msg_info.is_published() if b: logger.info(f">> `#{msg_count}` Message #{mid} is published!") else: logger.error(f">> `#{msg_count}` Message failed to be published!") except Exception as e: dump_exception_msg(e) msg_count += 1 if status == mqtt_client.MQTT_ERR_SUCCESS: # logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}`: `{msg}` ") logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}` successfully") else: logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}") time.sleep(mqtt_conf.publish_interval/1000) if __name__ == '__main__': log_setup() try: running_mode = "pub" argv = sys.argv if len(argv) == 1: print('Usage: gwsimulator.py -m -c ') print(' or: gwsimulator.py --mode=pub|sub -c ') sys.exit(2) opts, args = getopt.getopt(argv[1:], "m:c:", ["mode=", "config="]) for opt, arg in opts: if opt in ("-m", "--mode"): running_mode = arg elif opt in("-c", "--config"): CFG_FILE_PATH = arg else: print('Usage: gwsimulator.py -m -c ') print(' or: gwsimulator.py --mode=pub|sub -c ') sys.exit(2) logger.info(f"Running in {running_mode} mode") mqtt_conf = MqttConfigure() publish_data = PublishData() if sys.version_info >= (3, 0): read_configure(mqtt_conf, publish_data) else: logger.error("I need python version > 3.0.0") sys.exit(2) if running_mode.lower() == "pub": client = pub_connect_mqtt() client.loop_start() publish(client) else: client = sub_connect_mqtt() client.subscribe(topic=mqtt_conf.subscriber_topic, qos=mqtt_conf.subscribe_qos) client.loop_forever() except KeyboardInterrupt: logger.error("Disconnected from broker and EXIT") client.disconnect() logger.info("===== log end, I will be back ====")