python實現(xiàn)NB-IoT模塊遠程控制
本來想嘗試下如果不使用運營商網(wǎng)絡應用平臺情況下,只是在服務商服務器上是否可以實現(xiàn)對終端完全控制,如果這樣可行,那么物聯(lián)網(wǎng)應用服務端更有靈活性。實際情況下,很難實現(xiàn)和運營商網(wǎng)絡對等的處理,用python代碼原型確實能夠?qū)崿F(xiàn)參數(shù)的變化(如PSM,eDXR等),但是終端分配的IP地址畢竟屬于接入網(wǎng)部分,更近似一個局域網(wǎng),如果采用其他方式訪問(如IMSI、IMEI等),還是需要與運營商核心網(wǎng)進行配合。以下是嘗試遠程控制的實現(xiàn)方法。
主要實現(xiàn)功能
1、使用python pyserial模塊通過串口發(fā)送AT命令給模組進行參數(shù)修改,參考<使用python pyserial模塊串口通信>;
2、通過inter網(wǎng)進行控制命令傳輸,選用UDP進行主機控制,參考<python socket網(wǎng)絡接口編程>;
3、直接通過NB-IoT無線網(wǎng)絡進行控制命令的傳輸;
4、python多窗口處理服務器端程序,實現(xiàn)接收和發(fā)送同時進行;
遠程控制主機腳本
服務器端程序:監(jiān)測UDP對應的端口號,如果接收到register信息則返回allowed,然后進入命令輸入狀態(tài),等待命令輸入完成,發(fā)送給終端,等待終端反饋,并接續(xù)下一個命令傳送。
#!/usr/bin/python3.6 import socket import sys import re BUFFER_SIZE = 1024 TARGET_ADDR = '' TARGET_PORT = 60000 TARGET = (TARGET_ADDR,TARGET_PORT) ss = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) ss.bind(TARGET) print("server online!! wait for register!") data,addrRsv = ss.recvfrom(BUFFER_SIZE) if not data: sys.exit(0) else: print(data) if(re.match(b'register',data)): ss.sendto(b'allowed',addrRsv) else: ss.sendto(b'reject',addrRsv) while True: #等待命令輸入 aa = input('cmd > ') if not aa: break else: cmdV = aa+'\r' ss.sendto(cmdV.encode('utf-8'),addrRsv) #等待結(jié)果返回 data,addrRsv = ss.recvfrom(BUFFER_SIZE) if not data: break else: print(data) ss.close()
客戶主機程序:發(fā)送register并成功接收allowed后,等待控制命令,通過串口轉(zhuǎn)發(fā)給終端模塊,并接收終端模塊的反饋消息,返回給服務器側(cè)。
#!/usr/bin/python3.6 import serial import sys import os import re import socket #初始化UART端口 ser = serial.Serial("COM5",9600,timeout=30) #選擇相應的協(xié)議類型UDP ss = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) BUFFER_SIZE = 1024 TARGET_ADDR = 'IP address' TARGET_PORT = 60000 TARGET = (TARGET_ADDR,TARGET_PORT) aa = '開機命令'.encode('utf-8') #convert to bytes type ser.write(aa) while True: line = ser.readline() if not line: print("can not get cmd result, release!") sys.exit(0) print(line) if ( re.match(b'OK',line) ): break ss.sendto(bytes('register','utf-8'),TARGET) data,addrRsv = ss.recvfrom(BUFFER_SIZE) if re.match(b'allowed',data): print('register successfully!') pass else: print('register failure') sys.exit(0) while True: data,addrRsv = ss.recvfrom(BUFFER_SIZE) if not data: print("time out,release now!!") break elif re.match(b'end',data): print("end of process!!") break; ser.write(data) while True: line = ser.readline() if not line: print("can not get cmd result, release!") break print(line) if ( re.match(b'OK',line) ): ss.sendto(bytes('OK','utf-8'),TARGET) break elif(re.match(b'ERROR',line)): ss.sendto(bytes('ERROR','utf-8'),TARGET) break else: pass ser.close()
多線程窗口
為了使得服務器端能夠?qū)崿F(xiàn)同時實現(xiàn)接收和發(fā)送,可以在服務器端開啟兩個窗口進行監(jiān)聽,示例如下:
啟動代碼
#!/usr/bin/python3.6 import threading import time import subprocess import os import sys def thread_fun1(): #global vlock while(1): print("thread fun1 is running!!!") time.sleep(1) #... ... print(len(sys.argv)) #vlock = threading.Lock() t1 = threading.Thread(target=thread_fun1,args=()) t1.start() addr = 'IP address' port = 60000 cmdStr = "python anotherThread.py %s %d"%(addr,port) #設置creationflags = subprocess.CREATE_NEW_CONSOLE,用來創(chuàng)建新的控制臺窗口 subprocess.Popen(cmdStr,creationflags = subprocess.CREATE_NEW_CONSOLE)
anotherThread.py
#!/usr/bin/python3.6 def thread_fun2(): while(1): aa = input('cmd > ') print("thread fun2 is running!!!") print(aa) if(aa == 'end'): break thread_fun2()
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
pandas 對series和dataframe進行排序的實例
今天小編就為大家分享一篇pandas 對series和dataframe進行排序的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06淺談tf.train.Saver()與tf.train.import_meta_graph的要點
這篇文章主要介紹了淺談tf.train.Saver() 與tf.train.import_meta_graph的要點,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-05-05Python 基于Twisted框架的文件夾網(wǎng)絡傳輸源碼
這篇文章主要介紹了Python 基于Twisted框架的文件夾網(wǎng)絡傳輸源碼,需要的朋友可以參考下2016-08-08python回調(diào)函數(shù)中使用多線程的方法
這篇文章主要介紹了python回調(diào)函數(shù)中使用多線程的方法,需要的朋友可以參考下2017-12-12