#!/usr/bin/python
# -*- coding: UTF-8 -*-
# __author__ = 'HKC'
import io
|
||
import logging
|
||
from logging.handlers import RotatingFileHandler
|
||
import json
|
||
import datetime
|
||
import time
|
||
import sys
|
||
import os
|
||
from paho.mqtt import client as mqtt_client
|
||
import random
|
||
import getopt
|
||
|
||
|
||
# Log file path (relative to the current working directory).
LOG_FILE_PATH = "./gwsimulator.log"
# Default configuration file name; the -c/--config option rebinds it.
CFG_FILE_PATH = "./config.json"

# 0 makes log_setup() attach a console handler in addition to the log file.
DAEMON_MODE = 0

# Module-wide logger; its handlers are installed by log_setup().
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def log_setup():
    """Attach the file handler (and optionally a console handler) to ``logger``.

    The function is a no-op when the logger already has handlers, so calling
    it again does not duplicate output.  On first use it:

    * renames an existing log file larger than 15,000,000 bytes to
      ``<LOG_FILE_PATH>.<YYYYmmdd_HHMM>``;
    * installs a ``RotatingFileHandler`` (30 MiB per file, 10 backups) that
      writes UTF-8, because this program logs non-ASCII (Chinese) messages;
    * installs a console ``StreamHandler`` when ``DAEMON_MODE`` is 0;
    * sets the logger level to DEBUG and logs a start banner.
    """
    if logger.handlers:
        return

    if os.path.exists(LOG_FILE_PATH) and os.path.getsize(LOG_FILE_PATH) > 15000000:
        # Archive the oversized file under a minute-resolution timestamp.
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
        # os.replace overwrites an archive created within the same minute
        # instead of failing on platforms where rename refuses to clobber.
        os.replace(LOG_FILE_PATH, LOG_FILE_PATH + "." + stamp)
        # NOTE(review): the purpose of this pause is not evident from the
        # code; it is kept unchanged from the previous implementation.
        time.sleep(1)

    file_handler = RotatingFileHandler(
        LOG_FILE_PATH,
        mode='a',
        maxBytes=30 * 1024 * 1024,
        backupCount=10,
        encoding="utf-8",
        delay=False,
    )
    file_handler.setFormatter(logging.Formatter(
        "[%(asctime)s] [%(module)s.%(funcName)s][%(lineno)s] [%(levelname)s] : %(message)s"))

    if DAEMON_MODE == 0:
        # Foreground mode: echo a shorter record format to the console too.
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter("[%(asctime)s] [%(levelname)s] : %(message)s", "%Y-%m-%d %H:%M:%S"))
        logger.addHandler(console_handler)

    logger.addHandler(file_handler)
    logger.setLevel(logging.DEBUG)

    logger.info("===== log start, Welcome to the Earth ====")
|
||
|
||
|
||
def dump_exception_msg(e, ip="127.0.0.1"):
    """Log the exception that is currently being handled.

    Two ERROR records are written to ``logger``: first a one-line summary
    containing *ip*, the name of the function that called this helper, and
    the active exception's type and value (from ``sys.exc_info()``); then
    *e* itself via ``logger.exception``, which appends the traceback.

    :param e: the exception object to log.
    :param ip: label placed at the start of the summary line.
    """
    # Frame 1 is the caller of this helper, so the summary names the function
    # in which the exception was caught.
    caller_name = sys._getframe(1).f_code.co_name
    active_type, active_value = sys.exc_info()[:2]
    summary = "[%s] %s %s %s" % (ip, caller_name, active_type, active_value)
    logger.error(summary)
    logger.exception(e)
|
||
|
||
|
||
def rand(lower_limit=None, upper_limit=None):
    """Return a random integer N such that lower_limit <= N <= upper_limit.

    Thin wrapper around ``random.randint``; both bounds are inclusive and
    must be supplied (the ``None`` defaults are rejected by ``randint``).
    """
    picked = random.randint(lower_limit, upper_limit)
    return picked
|
||
|
||
|
||
class MqttConfigure:
    """Connection and publishing settings for the MQTT simulator.

    Every attribute starts with a built-in default; read_configure() replaces
    most of them with the values found in the JSON configuration file.
    """

    def __init__(self):
        # Broker endpoint and credentials.
        self.broker_ip = '127.0.0.1'
        self.port = 8764
        self.username = 'username'
        self.passwd = 'passwd'

        # Publishing side: topic, pause between messages in milliseconds, QoS.
        self.publish_topic = 'public'
        self.publish_interval = 5000
        self.publish_qos = 0

        # Subscribing side.
        self.subscribe_qos = 0
        self.subscriber_topic = ''

        self.client_id = f'python-mqtt-{random.randint(0, 1000)}'
        # 0 means "no limit" on the number of published messages.
        self.max_publish_count = 0

        # Explicit list of FSU codes taken from the configuration file.
        self.fsuCodes = []
        # Generated FSU codes are "<prefix><4-digit number>" for every number
        # from FsuCodeStart to FsuCodeEnd inclusive.  The prefix is a string
        # because publish() concatenates it with the zero-padded suffix; an
        # int default would raise TypeError there.
        self.FsuCodePrefix = '1111111000'
        self.FsuCodeStart = 1
        self.FsuCodeEnd = 999
|
||
|
||
|
||
class PublishData:
    """Container for the ``publish_data`` section of the configuration."""

    def __init__(self):
        # Starts empty; a fresh dict is created for every instance.
        self.publish_data = dict()
|
||
|
||
|
||
def read_configure(mqtt_conf=None, publish_data=None):
    """Load the JSON configuration file into *mqtt_conf* and *publish_data*.

    The file named by ``CFG_FILE_PATH`` is looked up relative to the directory
    containing this script; an absolute ``CFG_FILE_PATH`` is used as-is.
    Values are copied one by one, so if a key is missing the attributes
    assigned before it keep the file's values and the rest keep their
    previous values.

    Any error (unreadable file, invalid JSON, missing key, ...) is logged via
    dump_exception_msg() and otherwise ignored; nothing is raised.

    :param mqtt_conf: object whose MQTT settings attributes are overwritten.
    :param publish_data: object whose ``publish_data`` attribute is set.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # os.path.join keeps an absolute CFG_FILE_PATH intact, whereas string
    # concatenation would wrongly prefix it with the script directory.
    json_path = os.path.join(script_dir, CFG_FILE_PATH)

    # (attribute on mqtt_conf, key in the JSON document), in assignment order.
    # NOTE: 'subcriber_topic' is misspelled in the configuration schema; the
    # key is kept as-is so existing configuration files continue to load.
    field_map = (
        ('broker_ip', 'broker_ip'),
        ('port', 'port'),
        ('username', 'username'),
        ('passwd', 'passwd'),
        ('publish_topic', 'publish_topic'),
        ('publish_interval', 'publish_interval'),
        ('publish_qos', 'publish_qos'),
        ('subscribe_qos', 'subscribe_qos'),
        ('subscriber_topic', 'subcriber_topic'),
        ('client_id', 'client_id'),
        # Added 2025.03.27: several FsuCodes to simulate different devices.
        ('FsuCodePrefix', 'FsuCodePrefix'),
        ('FsuCodeStart', 'FsuCodeStart'),
        ('FsuCodeEnd', 'FsuCodeEnd'),
        ('fsuCodes', 'FsuCodes'),
    )

    try:
        # JSON text is UTF-8; "utf-8-sig" additionally tolerates a leading BOM
        # and avoids depending on the platform's locale encoding.
        with io.open(json_path, 'r', encoding='utf-8-sig') as f:
            json_data = json.load(f)

        for attr_name, json_key in field_map:
            setattr(mqtt_conf, attr_name, json_data[json_key])
        publish_data.publish_data = json_data['publish_data']
    except Exception as e:
        # Best effort: report the problem and keep whatever values are set.
        dump_exception_msg(e)
|
||
|
||
|
||
def pub_connect_mqtt():
    """Create the publishing MQTT client, connect it and return it.

    Settings (client id, credentials, broker address and port) are read from
    the module-level ``mqtt_conf``.  Connect, disconnect and publish callbacks
    that log each event are registered before connecting.  The network loop
    is not started here; that is left to the caller.

    NOTE(review): the disconnect callback sleeps and reconnects from inside
    the callback while the client is also created with
    ``reconnect_on_failure=True`` -- confirm the manual retry is still needed.
    """

    def on_connect(client, userdata, flags, rc):
        # rc == 0 is the only success code.
        if rc != 0:
            logger.error("Failed to connect, return code %d", rc)
            return
        logger.info("Connected to MQTT Broker!")

    def reconnect():
        """Wait a fixed delay, then try to reconnect once; failures are logged."""
        pause_seconds = 5
        try:
            logger.info(f"等待{pause_seconds}秒后尝试重连...")
            time.sleep(pause_seconds)
            publisher.reconnect()
        except Exception as e:
            logger.info(f"重连失败:{str(e)}")

    def on_disconnect(client, userdata, rc):
        """Disconnect callback: log it and retry when rc is non-zero."""
        logger.info(f"[{time.ctime()}] 连接断开")
        if rc == 0:
            return
        logger.info("尝试重新连接...")
        reconnect()

    def on_publish(client, userdata, mid):
        """Publish callback: log the id of the confirmed message."""
        logger.info(f"[{time.ctime()}] 消息已确认送达 (MID: {mid})")

    publisher = mqtt_client.Client(
        callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1,
        protocol=4,
        client_id=mqtt_conf.client_id,
        reconnect_on_failure=True,
    )
    publisher.on_connect = on_connect
    publisher.on_disconnect = on_disconnect
    publisher.on_publish = on_publish

    publisher.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
    publisher.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
    time.sleep(1)
    return publisher
|
||
|
||
|
||
def sub_connect_mqtt():
    """Create the subscribing MQTT client, connect it and return it.

    Settings (client id, credentials, broker address and port) are read from
    the module-level ``mqtt_conf``.  The connect callback logs the result and
    the message callback logs each received topic and payload.  Subscribing
    and running the network loop are left to the caller.

    The callbacks use the version-1 callback signatures, so the client is
    created with ``CallbackAPIVersion.VERSION1`` whenever the installed
    paho-mqtt exposes that enum, matching pub_connect_mqtt().  Libraries
    without the enum get the plain constructor call as before.
    """

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            logger.info("Connected to MQTT Broker!")
        else:
            logger.error("Failed to connect, return code %d", rc)

    def on_message(client, userdata, msg):
        logger.info(msg.topic + " " + str(msg.payload))

    # NOTE(review): protocol=3 differs from the protocol=4 used by the
    # publisher; kept unchanged -- confirm whether that is intentional.
    client_kwargs = {"protocol": 3, "client_id": mqtt_conf.client_id}
    api_versions = getattr(mqtt_client, "CallbackAPIVersion", None)
    if api_versions is not None:
        client_kwargs["callback_api_version"] = api_versions.VERSION1

    client = mqtt_client.Client(**client_kwargs)
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
    client.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
    time.sleep(1)
    return client
|
||
|
||
|
||
def build_publish_msg(fsu_code=None):
    """Build the JSON text of one message to publish for *fsu_code*.

    The message is derived from the module-level ``publish_data.publish_data``
    template: its ``type`` is copied, the current local time is stored as
    ``TimeStamp``, and every template entry in ``IdCodeContent`` contributes
    an ``OID`` / ``Factor`` / ``Value`` triple.  Entries whose ``type`` is
    ``"random_number"`` get a random integer between ``RND[0]`` and ``RND[1]``
    (inclusive) instead of their configured ``Value``.

    The ``fsu_code`` parameter was added on 2025.03.11 so a specific FsuCode
    can be stamped into the message.

    :param fsu_code: value stored under the ``FsuCode`` key.
    :return: the message serialized with ``json.dumps(..., ensure_ascii=False)``.
    """
    template = publish_data.publish_data
    message = {
        "FsuCode": fsu_code,
        "type": template["type"],
        "IdCodeContent": [],
        "TimeStamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    }
    points = message["IdCodeContent"]

    for entry in template["IdCodeContent"]:
        oid = entry["OID"]
        scale = entry["Factor"]
        kind = entry["type"]
        reading = entry["Value"]
        if kind == "random_number":
            # Replace the configured value with a random one in [RND[0], RND[1]].
            low, high = entry["RND"][0], entry["RND"][1]
            reading = rand(low, high)
        points.append({"OID": oid, "Factor": scale, "Value": reading})

    return json.dumps(message, ensure_ascii=False)
|
||
|
||
|
||
def publish(client):
    """Publish simulated messages for every generated FsuCode, round after round.

    FsuCodes are ``<FsuCodePrefix><NNNN>`` for each number from
    ``mqtt_conf.FsuCodeStart`` to ``mqtt_conf.FsuCodeEnd`` inclusive (added
    2025.03.11 to simulate several devices).  For each code a message is
    built with build_publish_msg(), published to ``mqtt_conf.publish_topic``
    and awaited; the outcome is logged, then the loop pauses for
    ``mqtt_conf.publish_interval`` milliseconds.

    The loop ends once ``mqtt_conf.max_publish_count`` messages have been
    sent; a value of 0 means it runs until interrupted.  If the configured
    range yields no codes, an error is logged and the function returns.

    :param client: connected paho MQTT client whose network loop is running.
    """
    # Formatting through an f-string accepts a prefix given as str or as int;
    # "prefix + suffix" raised TypeError for an int prefix.
    fsu_codes = [
        f"{mqtt_conf.FsuCodePrefix}{num:04d}"
        for num in range(mqtt_conf.FsuCodeStart, mqtt_conf.FsuCodeEnd + 1)
    ]
    if not fsu_codes:
        # Without this guard the outer loop would spin forever doing nothing.
        logger.error("No FsuCode to publish: FsuCodeStart=%s, FsuCodeEnd=%s",
                     mqtt_conf.FsuCodeStart, mqtt_conf.FsuCodeEnd)
        return

    msg_count = 0
    while True:
        for fsu_code in fsu_codes:
            # Checked before every message so exactly max_publish_count
            # messages are sent (the limit is a message count, not rounds).
            limit = mqtt_conf.max_publish_count
            if limit != 0 and msg_count >= limit:
                return

            logger.info("Processing fsuCode: " + fsu_code)
            msg = build_publish_msg(fsu_code)
            msg_info = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos)
            status = msg_info.rc
            mid = msg_info.mid

            try:
                # wait_for_publish() raises when the message could not be
                # queued or sent; keep it inside the try so one failed
                # message is logged instead of aborting the whole simulator.
                msg_info.wait_for_publish()
                if msg_info.is_published():
                    logger.info(f">> `#{msg_count}` Message #{mid} is published!")
                else:
                    logger.error(f">> `#{msg_count}` Message failed to be published!")
            except Exception as e:
                dump_exception_msg(e)

            msg_count += 1
            if status == mqtt_client.MQTT_ERR_SUCCESS:
                logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}` successfully")
            else:
                logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}")

            # NOTE(review): the pause is applied after every message; confirm
            # that per-message (rather than per-round) pacing is intended.
            time.sleep(mqtt_conf.publish_interval / 1000)
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: parse -m/--mode and -c/--config, load the
    # configuration, then run as publisher ("pub") or subscriber until the
    # work ends or the user presses Ctrl+C.
    log_setup()

    usage_text = ('Usage: gwsimulator.py -m <pub|sub> -c <config_file.json>\n'
                  ' or: gwsimulator.py --mode=pub|sub -c <config_file.json>')

    argv = sys.argv
    if len(argv) == 1:
        print(usage_text)
        sys.exit(2)

    try:
        opts, args = getopt.getopt(argv[1:], "m:c:", ["mode=", "config="])
    except getopt.GetoptError as err:
        # Unknown option or missing argument: explain instead of a traceback.
        logger.error("Invalid command line: %s", err)
        print(usage_text)
        sys.exit(2)

    running_mode = "pub"
    for opt, arg in opts:
        if opt in ("-m", "--mode"):
            running_mode = arg
        elif opt in ("-c", "--config"):
            # Rebinds the module-level name read by read_configure().
            CFG_FILE_PATH = arg

    logger.info(f"Running in {running_mode} mode")
    if running_mode.lower() not in ("pub", "sub"):
        # Any mode other than "pub" has always run the subscriber; keep that
        # behavior but make the fallback visible.
        logger.warning("Unknown mode %r, running as subscriber", running_mode)

    # Module-level objects read by the connect/publish functions.
    mqtt_conf = MqttConfigure()
    publish_data = PublishData()
    read_configure(mqtt_conf, publish_data)

    # Stays None if we are interrupted before a client has been created.
    client = None
    try:
        if running_mode.lower() == "pub":
            client = pub_connect_mqtt()
            client.loop_start()
            publish(client)
        else:
            client = sub_connect_mqtt()
            client.subscribe(topic=mqtt_conf.subscriber_topic, qos=mqtt_conf.subscribe_qos)
            client.loop_forever()
    except KeyboardInterrupt:
        logger.error("Disconnected from broker and EXIT")

    if client is not None:
        client.disconnect()
    logger.info("===== log end, I will be back ====")
|