232 lines
7.6 KiB
Python
232 lines
7.6 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: UTF-8 -*-
|
|
# __author__ = 'HKC'
|
|
import io
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
import json
|
|
import datetime
|
|
import time
|
|
import sys
|
|
import os
|
|
from paho.mqtt import client as mqtt_client
|
|
import random
|
|
import getopt
|
|
|
|
|
|
# Path of the log file written by log_setup().
LOG_FILE_PATH = "./gwsimulator.log"
# Default configuration file; the -c/--config command-line option rebinds it.
CFG_FILE_PATH = "./config.json"

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)

# When 0, log_setup() also attaches a console handler.
DAEMON_MODE = 0
|
|
|
|
|
|
def log_setup():
    """Configure the module-level ``logger`` the first time it is called.

    Does nothing when the logger already has handlers. Otherwise it:

    * renames an existing log file larger than 15,000,000 bytes to
      ``<LOG_FILE_PATH>.<YYYYmmdd_HHMM>`` and then sleeps for one second;
    * creates a ``RotatingFileHandler`` (30 MiB per file, 10 backups);
    * when ``DAEMON_MODE`` is 0, attaches a console ``StreamHandler``;
    * attaches the file handler, sets the level to ``DEBUG`` and writes a
      start-up banner.
    """
    if logger.handlers:
        return

    # Move an oversized log left by a previous run out of the way.
    if os.path.exists(LOG_FILE_PATH) and os.path.getsize(LOG_FILE_PATH) > 15000000:
        now_ts = time.mktime(datetime.datetime.now().timetuple())
        suffix = datetime.datetime.fromtimestamp(now_ts).strftime("%Y%m%d_%H%M")
        os.rename(LOG_FILE_PATH, LOG_FILE_PATH + "." + suffix)
        time.sleep(1)

    file_handler = RotatingFileHandler(
        LOG_FILE_PATH,
        mode='a',
        maxBytes=30 * 1024 * 1024,
        backupCount=10,
        encoding=None,
        delay=False,
    )
    file_handler.setFormatter(logging.Formatter(
        "[%(asctime)s] [%(module)s.%(funcName)s][%(lineno)s] [%(levelname)s] : %(message)s"))

    if DAEMON_MODE == 0:
        # Interactive run: mirror the log on the console in a shorter format.
        console = logging.StreamHandler()
        console.setFormatter(
            logging.Formatter("[%(asctime)s] [%(levelname)s] : %(message)s", "%Y-%m-%d %H:%M:%S"))
        logger.addHandler(console)

    logger.addHandler(file_handler)
    logger.setLevel(logging.DEBUG)

    logger.info("===== log start, Welcome to the Earth ====")
|
|
|
|
|
|
# Log details of the exception that is currently being handled
|
|
def dump_exception_msg(e, ip="127.0.0.1"):
    """Log the exception currently being handled.

    Writes one ERROR line containing *ip*, the name of the calling function
    and the active exception's type and value, then logs *e* through
    ``logger.exception`` so the traceback is recorded as well.

    :param e: the exception object to pass to ``logger.exception``.
    :param ip: address tag placed at the start of the summary line.
    """
    # Frame 1 is the caller of this function; keep this lookup inline,
    # because moving it into a helper would change the frame depth.
    caller_name = sys._getframe(1).f_code.co_name
    err_type, err_value = sys.exc_info()[:2]
    summary = "[%s] %s %s %s" % (ip, caller_name, err_type, err_value)
    logger.error(summary)
    logger.exception(e)
|
|
|
|
|
|
def rand(lower_limit=None, upper_limit=None):
    """Return ``random.randint(lower_limit, upper_limit)``.

    Both bounds are passed straight through to ``random.randint``; the
    ``None`` defaults are not usable values, so callers must supply both.
    """
    picked = random.randint(lower_limit, upper_limit)
    return picked
|
|
|
|
|
|
class MqttConfigure:
    """Holder for the MQTT settings used by the simulator.

    Every attribute starts with the placeholder default assigned below;
    read_configure() overwrites them from the JSON configuration file.
    """

    def __init__(self):
        # --- broker connection ---
        self.broker_ip = '127.0.0.1'
        self.port = 8764
        self.username = 'username'
        self.passwd = 'passwd'

        # --- publishing ---
        self.publish_topic = 'public'
        # Delay between messages; publish() divides it by 1000 before sleeping.
        self.publish_interval = 5000
        self.publish_qos = 0

        # --- subscribing ---
        self.subscribe_qos = 0
        self.subscriber_topic = ''

        # Default client id carries a random numeric suffix (0..1000).
        suffix = random.randint(0, 1000)
        self.client_id = f'python-mqtt-{suffix}'

        # publish() treats 0 as "no limit".
        self.max_publish_count = 0
|
|
|
|
|
|
class PublishData:
    """Container for the ``publish_data`` section of the configuration file."""

    def __init__(self):
        # Empty until read_configure() replaces it with the configured value.
        self.publish_data = dict()
|
|
|
|
|
|
def read_configure(mqtt_conf=None, publish_data=None):
    """Load the JSON configuration file into *mqtt_conf* and *publish_data*.

    The file named by the module-level ``CFG_FILE_PATH`` is resolved against
    the directory that contains this script. An absolute ``CFG_FILE_PATH``
    is used unchanged.

    Any failure (missing file, invalid JSON, missing key, ...) is logged via
    ``dump_exception_msg`` and swallowed. Fields assigned before the failure
    keep their new values; the rest keep their previous values.

    :param mqtt_conf: ``MqttConfigure`` instance to fill in.
    :param publish_data: ``PublishData`` instance to fill in.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # os.path.join drops script_dir when CFG_FILE_PATH is absolute, so
    # "-c /etc/gw/config.json" no longer becomes "<script_dir>//etc/gw/config.json".
    json_path = os.path.join(script_dir, CFG_FILE_PATH)
    try:
        # NOTE(review): the file is read with the platform default encoding,
        # as before; consider pinning encoding="utf-8" once existing configs
        # are confirmed to be UTF-8.
        with io.open(json_path, 'r') as f:
            json_data = json.load(f)

        mqtt_conf.broker_ip = json_data['broker_ip']
        mqtt_conf.port = json_data['port']
        mqtt_conf.username = json_data['username']
        mqtt_conf.passwd = json_data['passwd']
        mqtt_conf.publish_topic = json_data['publish_topic']
        mqtt_conf.publish_interval = json_data['publish_interval']
        mqtt_conf.publish_qos = json_data['publish_qos']
        mqtt_conf.subscribe_qos = json_data['subscribe_qos']
        # Accept the correctly spelled key, but keep honouring the legacy
        # misspelling "subcriber_topic" so existing config files still load.
        if 'subscriber_topic' in json_data:
            topic_key = 'subscriber_topic'
        else:
            topic_key = 'subcriber_topic'
        mqtt_conf.subscriber_topic = json_data[topic_key]
        mqtt_conf.client_id = json_data['client_id']
        publish_data.publish_data = json_data['publish_data']
        # Optional key: without it the current value (0 = unlimited) is kept.
        mqtt_conf.max_publish_count = json_data.get(
            'max_publish_count', mqtt_conf.max_publish_count)
    except Exception as e:
        # Best effort by design: report the problem and let the caller go on.
        dump_exception_msg(e)
|
|
|
|
|
|
def pub_connect_mqtt():
    """Create an MQTT client for publishing and connect it to the broker.

    Connection parameters are read from the module-level ``mqtt_conf``.
    The client is created with ``protocol=3``, gets a connect callback that
    logs the outcome, and ``connect()`` is called with a 60 second keepalive.
    The function then sleeps for one second and returns the client; it does
    not start the network loop.

    :return: the connected ``mqtt_client.Client`` instance.
    """

    def on_connect(client, userdata, flags, rc):
        # A return code of 0 means the broker accepted the connection.
        if rc != 0:
            logger.error("Failed to connect, return code %d", rc)
            return
        logger.info("Connected to MQTT Broker!")

    publisher = mqtt_client.Client(protocol=3, client_id=mqtt_conf.client_id)
    publisher.on_connect = on_connect
    publisher.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
    publisher.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
    # Presumably gives the connection a moment to settle before use.
    time.sleep(1)
    return publisher
|
|
|
|
|
|
def sub_connect_mqtt():
    """Create an MQTT client for subscribing and connect it to the broker.

    Connection parameters are read from the module-level ``mqtt_conf``.
    Besides the connect callback, a message callback is installed that logs
    the topic and payload of every received message. The function sleeps
    for one second after ``connect()`` and returns the client; subscribing
    and running the network loop are left to the caller.

    :return: the connected ``mqtt_client.Client`` instance.
    """

    def on_connect(client, userdata, flags, rc):
        # A return code of 0 means the broker accepted the connection.
        if rc != 0:
            logger.error("Failed to connect, return code %d", rc)
            return
        logger.info("Connected to MQTT Broker!")

    def on_message(client, userdata, msg):
        received = msg.topic + " " + str(msg.payload)
        logger.info(received)

    subscriber = mqtt_client.Client(protocol=3, client_id=mqtt_conf.client_id)
    subscriber.on_connect = on_connect
    subscriber.on_message = on_message
    subscriber.username_pw_set(username=mqtt_conf.username, password=mqtt_conf.passwd)
    subscriber.connect(host=mqtt_conf.broker_ip, port=mqtt_conf.port, keepalive=60)
    # Presumably gives the connection a moment to settle before use.
    time.sleep(1)
    return subscriber
|
|
|
|
|
|
# Build the message to publish from the loaded configuration
|
|
def build_publish_msg():
    """Build one JSON message from the module-level ``publish_data``.

    The message copies ``FsuCode`` and ``type`` from the configured data,
    stamps the current local time as ``TimeStamp`` and emits one
    ``{"OID", "Factor", "Value"}`` record per configured ``IdCodeContent``
    entry. Entries whose ``type`` is ``"random_number"`` get a fresh random
    value between ``RND[0]`` and ``RND[1]``; all others keep their
    configured ``Value``.

    :return: the message serialised with ``json.dumps(..., ensure_ascii=False)``.
    """
    template = publish_data.publish_data
    message = {
        "FsuCode": template["FsuCode"],
        "type": template["type"],
        "IdCodeContent": [],
        "TimeStamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    }
    records = message['IdCodeContent']

    for item in template["IdCodeContent"]:
        oid = item["OID"]
        factor = item["Factor"]
        kind = item["type"]
        value = item["Value"]
        if kind == "random_number":
            # Replace the configured value with one drawn from the RND bounds.
            bounds = item["RND"]
            value = rand(bounds[0], bounds[1])

        records.append({"OID": oid, "Factor": factor, "Value": value})

    return json.dumps(message, ensure_ascii=False)
|
|
|
|
|
|
def publish(client):
    """Publish generated messages until the configured limit is reached.

    Each iteration builds a message with ``build_publish_msg()``, publishes
    it to ``mqtt_conf.publish_topic`` with ``mqtt_conf.publish_qos``, logs
    the outcome and sleeps ``mqtt_conf.publish_interval`` milliseconds.

    The loop stops once ``mqtt_conf.max_publish_count`` messages have been
    attempted; a limit of 0 means "publish forever".

    :param client: connected MQTT client whose ``publish`` method is used.
    """
    msg_count = 0
    while True:
        # ">=" rather than ">": msg_count is the number of messages already
        # sent, so ">" let one message too many through.
        if mqtt_conf.max_publish_count != 0 and msg_count >= mqtt_conf.max_publish_count:
            break
        msg = build_publish_msg()
        result = client.publish(topic=mqtt_conf.publish_topic, payload=msg, qos=mqtt_conf.publish_qos)
        # result: [0, 1] -- the first element is the status code, 0 on success
        status = result[0]
        msg_count += 1
        if status == 0:
            logger.info(f"Send `#{msg_count}` message to topic `{mqtt_conf.publish_topic}`: `{msg}` ")
        else:
            logger.error(f"Failed to send `#{msg_count}` message to topic {mqtt_conf.publish_topic}")
        # publish_interval is in milliseconds; time.sleep() takes seconds.
        time.sleep(mqtt_conf.publish_interval/1000)
|
|
|
|
|
|
def _print_usage():
    """Print the command-line synopsis to stdout."""
    print('Usage: gwsimulator.py -m <pub|sub> -c <config_file.json>')
    print(' or: gwsimulator.py --mode=pub|sub -c <config_file.json>')


# Script entry point: parse the command line, load the configuration and run
# either the publisher loop ("pub") or the subscriber loop (any other mode).
if __name__ == '__main__':
    log_setup()

    # Bound before the try so the shutdown code below is safe even when
    # Ctrl-C arrives before a client was created.
    client = None
    try:
        running_mode = "pub"

        argv = sys.argv
        if len(argv) == 1:
            _print_usage()
            sys.exit(2)

        try:
            opts, args = getopt.getopt(argv[1:], "m:c:", ["mode=", "config="])
        except getopt.GetoptError as err:
            # Unknown option or missing argument: report it and show the
            # usage text instead of exiting with a traceback.
            logger.error("Invalid command line: %s", err)
            _print_usage()
            sys.exit(2)

        for opt, arg in opts:
            if opt in ("-m", "--mode"):
                running_mode = arg
            elif opt in ("-c", "--config"):
                # Rebinds the module-level name that read_configure() reads.
                CFG_FILE_PATH = arg

        logger.info(f"Running in {running_mode} mode")

        # Module-level objects: the connect/publish helpers read them as globals.
        mqtt_conf = MqttConfigure()
        publish_data = PublishData()

        if sys.version_info >= (3, 0):
            read_configure(mqtt_conf, publish_data)
        else:
            logger.error("I need python version > 3.0.0")
            sys.exit(2)

        if running_mode.lower() == "pub":
            client = pub_connect_mqtt()
            # Background network thread; publish() blocks in the foreground.
            client.loop_start()
            publish(client)
        else:
            client = sub_connect_mqtt()
            client.subscribe(topic=mqtt_conf.subscriber_topic, qos=mqtt_conf.subscribe_qos)
            client.loop_forever()

    except KeyboardInterrupt:
        logger.error("Disconnected from broker and EXIT")

    if client is not None:
        client.disconnect()
        # Stops the thread started by loop_start(); when no such thread
        # exists (subscriber mode) paho just returns an error code.
        client.loop_stop()
    logger.info("===== log end, I will be back ====")
|