基于阿里云python sdk实现阿里云物联网MQTT的客户端订阅及发布

阅读本文前需已确保了解如下内容:

1、有开通阿里云物联网相关产品,或已有阿里物联网产品下的设备要素信息product_key、device_name、device_secret

2、有使用过MQTT.fx、MQTTX等mqtt客户端软件,了解如何使用这些软件实现客户端连接及订阅发布,这会有助于理解本文sdk的排错

3、实验环境:win10、python3.7

4、阿里云物联网官方文档地址:https://help.aliyun.com/document_detail/98292.html?spm=a2c4g.98293.0.0.348036e8JBlcf4

1、确保已安装python3.7,理论上3.x的应该都行,我没有用虚拟环境
2、pip install paho-mqtt==1.4.0
3、pip install aliyun-iot-linkkit

5、官方文档的python版本demo下载地址:https://linkkit-export.oss-cn-shanghai.aliyuncs.com/python/python-linkkit-examples.zip?spm=a2c4g.98292.0.0.4fa8e356Khyqzh&file=python-linkkit-examples.zip

完整示例源码:

下面直接上根据上面准备内容修改后的python源码(已做了相关脱敏及改动,如遇到程序错误请自行修复,正常只要阿里云公共实例下只要设备要素信息正确,以及将代码里的topic改为你真正的就不会有大问题)

import sys
from linkkit import linkkit
import threading
import traceback
import inspect
import time
import logging
import json
import random
import datetime 

#功能打开设备二维码需要用到python下的窗口界面控件
#import pyqrcode,tkinter
#from tkinter.messagebox import *


# config log
__log_format = '%(asctime)s-%(process)d-%(thread)d - %(name)s:%(module)s:%(funcName)s - %(levelname)s - %(message)s'
logging.basicConfig(format=__log_format)

#这里需要填写你的相关参数
host_name="cn-shanghai"
product_key="hml6xxxx"
device_name="ROhLixxxxxxxxxxxxxxxxxx"
device_secret="98ca62xxxxxxxxxxxxxxxxxxxxxxxxxxxx"

#默认的payload,便于后续处理发布上报payload使用
data_pubCommandResult_ori = {
"pubType":2,
"configInfo":"",
"status":0,
"user":"nSC1XiB",
"sTimes":1,
"serialNumber":round(time.time()*1000)
}
data_start = {
"clickNumbers":2,
"cardNum":"nSC1XiB",
"serialNumber":20230627110463
}

def equ_setting(write=''):#write=空表示读取内容
    if write=='':
        try:
            f=open('ali_mqtt.ini','r+')
            equ_setting=json.loads(f.read())
            f.close
        except Exception :
            print('配置文件错误,重新初始化')  
            f=open('ali_mqtt.ini','w')
            f.write('{"a":1}')
            f.close  
            equ_setting={"a":1}
    else:
        try:
            f=open('ali_mqtt.ini','w')
            f.write(write)
            f.close  
            equ_setting=json.loads(write)
        except Exception :
            print('配置文件错误,更新失败')
            #print(write)
            f=open('ali_mqtt.ini','r+')
            equ_setting=json.loads(f.read())
            f.close
    return equ_setting



lk = linkkit.LinkKit(
    host_name=host_name,
    product_key=product_key,
    device_name=device_name,
    device_secret=device_secret)
#lk.config_mqtt(endpoint=product_key+".iot-as-mqtt.cn-shanghai.aliyuncs.com")#公共阿里云物联网实例一般可以不要这一行
lk.config_mqtt(port=1883, protocol="MQTTv311", transport="TCP",
            secure="TLS", keep_alive=60, clean_session=True,
            max_inflight_message=20, max_queued_message=0,
            auto_reconnect_min_sec=1,
            auto_reconnect_max_sec=60,
            cadata=None)
#开启/关闭日志
lk.enable_logger(logging.DEBUG)


def on_device_dynamic_register(rc, value, userdata):
    if rc == 0:
        print("dynamic register device success, value:" + value)
    else:
        print("dynamic register device fail, message:" + value)


def on_connect(session_flag, rc, userdata):
    print("on_connect:%d,rc:%d" % (session_flag, rc))
    #阿里云官方aliyun-iot-linkkit提供的sdk在lk.to_full_topic()里面不需要填写topic的/productkey/devicename,我这里的topic不是/user/test 是user/pubStart
    #官方解释:lk.to_full_topic是一个Topic自动生成接口,您在前面已经填入了ProductKey、DeviceName等参数,该接口调用后将返回一个字符串,其值等同于"/YourProductKey/YourDeviceName/user/test"
    rc, mid = lk.publish_topic(lk.to_full_topic("user/pubStart"), '这里是需要上报的paylod,一般约定是json格式,也可以是这种字符串')
    if rc == 0:
         print("===========================设备启动,发布获取配置topic成功:%r, mid:%r" % (rc, mid))
    else:
        print("===========================设备启动,发布获取配置topic失败:%d" % rc)
    print("="*45)
    print("提示,请输入以下清单对应序号!")
    tips()
    pass


def on_disconnect(rc, userdata):
    print("on_disconnect:rc:%d,userdata:" % rc)


def on_topic_message(topic, payload, qos, userdata):
    #必须先确保已经订阅了相关topic,否则这里收不到
    print("===================开始接受订阅消息===========================")
    #
    print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
    print("=================订阅消息已接收完毕===========================")

    #根据接收到的订阅消息的topic进行判断,并处理相关逻辑;;;;
    payload=json.loads(payload)#payload是消息内容,需要json序列化一下方便使用
    if 'user/subConfig' in topic:#订阅到设备参数的topic,需要把订阅消息的内容存入本地文件(约定的都是json格式的)
        equ_setting(json.dumps(payload))
        #print("已订阅到服务器下发的卡头参数"+equ_setting(''))

    if 'user/subCommand' in topic:#订阅到的topic是 服务器下发的指令需求,根据订阅消息的operateType参数决定客户端进行下一步操作
        if payload['operateType']==3:#3订阅到的消息是要上报设备状态
            data_pubCommandResult=data_pubCommandResult_ori# 为了防止变量被其他topic更新,每次上报的内容都需要先用默认的变量进行复制而不是直接使用默认变量
            if payload['sTimes']==4:#如果sTimes是4,客户端随机返回 上报status=0或1
                print("===========================随机上报设备状态")
                data_pubCommandResult['serialNumber']=payload['serialNumber']#将要上报的serialNumber取自 订阅到的消息里的serialNumber
                data_pubCommandResult['user']=payload['user']
                data_pubCommandResult['sTimes']=payload['sTimes']
                data_pubCommandResult['status']=random.randint(0,1)# 0正常 1异常
                rc, mid = lk.publish_topic(lk.to_full_topic("user/pubCommandResult"), json.dumps(data_pubCommandResult))
                if rc == 0:
                    print("===========================发布设备状态成功【status="+str(data_pubCommandResult['status'])+"】" )
                
                else:
                    print("===========================发布设备状态失败:%d" % rc)
            else:
                #其他处理逻辑代码类似,不再赘述
                pass
        else:
            #其他处理逻辑代码类似,不再赘述
            pass
    pass


def on_subscribe_topic(mid, granted_qos, userdata):
    #granted_qos 为订阅topic列表对应的QoS返回结果,正常值为0或1,128表示订阅失败
    print("函数内>>>>on_subscribe_topic mid:%d, granted_qos:%s" %
          (mid, str(','.join('%s' % it for it in granted_qos))))
    pass


def on_unsubscribe_topic(mid, userdata):
    print("on_unsubscribe_topic mid:%d" % mid)
    pass


def on_publish_topic(mid, userdata):
    print("on_publish_topic mid:%d" % mid)


def tips():

    print("1、模拟启动2次")
    print("2、查看设备配置参数")
    print("3、打开设备二维码")
    print("")
    print("="*45)
def clicked():
    num=txt.get()  
    #直接上报一条消息 启动
    rc, mid = lk.publish_topic(lk.to_full_topic("user/pubStart"), '这里是需要上报的paylod,一般约定是json格式,也可以是这种字符串')
    if rc == 0:
        showinfo("提示", "启动"+num+"次成功")
        print("===========================按键启动topic发送成功")
    else:
        showerror("提示", "启动"+num+"次失败")
        print("===========================按键启动topic发送失败:%d" % rc)
    
lk.on_device_dynamic_register = on_device_dynamic_register
lk.on_connect = on_connect
lk.on_disconnect = on_disconnect
lk.on_topic_message = on_topic_message
lk.on_subscribe_topic = on_subscribe_topic
lk.on_unsubscribe_topic = on_unsubscribe_topic
lk.on_publish_topic = on_publish_topic


lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")#这里是阿里云sdk默认的,没做修改
lk.connect_async()
lk.start_worker_loop()

while True:
    
    try:
        msg = input()
    except KeyboardInterrupt:
        sys.exit()
    else:
        if msg == "close":
            lk.disconnect()
        elif msg == "start":
            lk.connect_async()

        elif msg == "1":
            #直接上报一条消息 启动
            data_start['serialNumber']=round(time.time()*1000)+999


            data_start['clickNumbers']=2
            rc, mid = lk.publish_topic(lk.to_full_topic("user/pubStart"), json.dumps(data_start))
            if rc == 0:
                print("===========================模拟启动topic发送成功【次数="+str(data_start['clickNumbers'])+"】")
            else:
                print("===========================模拟启动topic发送失败:%d" % rc)

        elif msg=="2":
                print(json.dumps(equ_setting('')))

        elif msg=="3":
                print("示例已关闭")
                '''
                code=pyqrcode.create("hell 123456-asdfghj")#如需要显示中文encoding='UTF-8'即可        
                cXbm=code.xbm(scale=5)#scale生成的二维码图片比例大小

                win=tkinter.Tk()
                win.title("二维码")
                tkBap=tkinter.BitmapImage(data=cXbm)
                tkBap.config(foreground="black")
                tkBap.config(background="white")
                tLable=tkinter.Label(image=tkBap)
                tLable.grid(column=0, row=0)               
                tLable2=tkinter.Label(text="==================")
                txt = tkinter.Entry( width=4)
                txt.insert(0,"2")
                btn = tkinter.Button(text=" 启动 ",command=clicked)
                tLable3=tkinter.Label(text="启动次数->")
                tLable.pack()
                tLable2.pack(fill=tkinter.X)
                
                btn.pack(fill=tkinter.Y,side=tkinter.RIGHT,pady='10px',padx='5px')
                txt.pack(fill=tkinter.Y,side=tkinter.RIGHT,pady='10px',padx='5px')
                tLable3.pack(fill=tkinter.Y,side=tkinter.RIGHT,pady='10px')
                '''
                pass


        elif msg == "A": # 订阅消息,一般只需要订阅一次,后面再启动客户端不需要再订阅,除非操作了取消请阅
            rc, mid = lk.subscribe_topic(lk.to_full_topic("user/subComman"))
            if rc == 0:
                print("===========================subscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("===========================subscribe topic fail:%d" % rc)
        elif msg == "B":# 订阅消息,一般只需要订阅一次,后面再启动客户端不需要再订阅,除非操作了取消请阅
            rc, mid = lk.subscribe_topic(lk.to_full_topic("user/subConfig"))
            if rc == 0:
                print("===========================subscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("===========================subscribe topic fail:%d" % rc)



        elif msg == "dump":
            ret = lk.dump_user_topics()
            print("user topics:%s", str(ret))
        elif msg == "dest":
            lk.destruct()
            print("destructed")
        else:
            print("="*45)
            print("指令错误,请输入以下清单对应序号!")
            tips()
            #sys.exit()

附录,我的其他几篇关于MQTT的文章:

阿里云物联网MQTT用户名密码生成链接参数(MQTT客户端发布订阅)

python实现mqtt客户端进行topic订阅及发布

基于互联网精神,在注明出处的前提下本站文章可自由转载!

本文链接:https://ranjuan.cn/aliyun-python-sdk-mqtt-client/

赞赏

微信赞赏支付宝赞赏

发表评论