Python訪問OPCUA服務(wù)器,訂閱一個(gè)變量標(biāo)簽方式
Python訪問OPCUA服務(wù)器,訂閱一個(gè)變量標(biāo)簽
from opcua import Client from opcua import ua import time #By Weloveut:Python訪問OPCUA服務(wù)器,展示get變量的方法和注冊逢變觸發(fā)的回調(diào)函數(shù) #自定義回調(diào)函數(shù) class SubHandler(object): def data_change(self, handle, node, val, attr): print("Python: New data change event", handle, node, val, attr) #創(chuàng)建UA的客戶端,包含IP地址和端口號 client = Client("opc.tcp://172.32.1.82:4840/") #建立到服務(wù)器連接 client.connect() #獲取object對象 objects = client.get_objects_node() #獲取根對象 root = client.get_root_node() #依據(jù)變量地址創(chuàng)建變量對象 myvar = client.get_node('ns=3;s="clocl0.5hz"."tags"[0]') #獲取變量當(dāng)前值 valuetmp=myvar.get_value() print(valuetmp) #注冊到服務(wù)器的變量訂閱,變量逢變觸發(fā) handler = SubHandler() sub = client.create_subscription(500, handler) sub.subscribe_data_change(myvar) time.sleep(100000) client.disconnect()
opcua-python學(xué)習(xí)踩坑記錄
OPCUA通信協(xié)議介紹
OPC UA(OPC 統(tǒng)一架構(gòu))是一種用于工業(yè)控制系統(tǒng)的通信協(xié)議。它是一種開放的、平臺無關(guān)的協(xié)議,可用于連接各種工業(yè)設(shè)備和軟件應(yīng)用程序,使它們能夠互相通信并共享數(shù)據(jù)。
OPC UA 的主要優(yōu)點(diǎn)在于,它能夠提供一種統(tǒng)一的方式來連接不同廠商的設(shè)備和應(yīng)用程序。這意味著,使用 OPC UA 的軟件應(yīng)用程序可以與使用其他協(xié)議的設(shè)備和應(yīng)用程序進(jìn)行通信,而無需考慮底層的通信協(xié)議。
其實(shí)這類似于物聯(lián)網(wǎng)中的mqtt協(xié)議,只不過mqtt較為輕量級,opcua則是用于工業(yè)相關(guān)的控制系統(tǒng)
OPCUA的兩種工作方式
- 客戶端/服務(wù)器模式:在這種模式下,OPC UA 客戶端向 OPC UA 服務(wù)器請求數(shù)據(jù),服務(wù)器接收請求并向客戶端發(fā)送數(shù)據(jù)。
- 發(fā)布/訂閱模式:在這種模式下,OPC UA 服務(wù)器將數(shù)據(jù)“發(fā)布”到網(wǎng)絡(luò)上,而客戶端可以“訂閱”特定的數(shù)據(jù)。當(dāng)服務(wù)器更新數(shù)據(jù)時(shí),客戶端會收到更新的數(shù)據(jù)。
這兩種模式可以根據(jù)應(yīng)用場景的需要進(jìn)行選擇。例如,客戶端/服務(wù)器模式適用于客戶端需要向服務(wù)器請求單個(gè)數(shù)據(jù)項(xiàng)的情況,而發(fā)布/訂閱模式則適用于客戶端需要定期接收大量數(shù)據(jù)的情況。
個(gè)人覺得客戶端/服務(wù)器模式類似于TCP通訊協(xié)議,是點(diǎn)對點(diǎn)的,對實(shí)時(shí)性要求比較高,而發(fā)布訂閱模式類似于UDP通訊協(xié)議,對實(shí)時(shí)性要求并沒有那么高,但接收數(shù)據(jù)較多
其中發(fā)布訂閱模式對小的嵌入式設(shè)備(stm32系列)比較友好,可以融合物聯(lián)網(wǎng)的mqtt協(xié)議,真正將opcua納入網(wǎng)絡(luò),而不只是自己的工業(yè)網(wǎng)絡(luò)
OPCUA-python是什么?
顧名思義,那就是利用python實(shí)現(xiàn)這種通信協(xié)議的庫唄,這個(gè)庫目前已經(jīng)停止更新,開發(fā)者們開發(fā)了一個(gè)新的庫,名為opcua-asyncio,可通過如下鏈接訪問:
https://github.com/FreeOpcUa/opcua-asyncio
不過做一些比較基本的功能,只用OPCUA-python這個(gè)庫還是夠用的
服務(wù)器常用函數(shù)
endpoint = "opc.tcp://{}:{}".format(url, port) myserver.set_endpoint(endpoint)
設(shè)置endpoint(其實(shí)endpoint就像一個(gè)服務(wù)器鏈接)
myserver = opcua.Server()
開啟opcua服務(wù)器
objects = myserver.get_objects_node()
設(shè)置服務(wù)器節(jié)點(diǎn)(使客戶端得以訪問)
addspace = myserver.register_namespace(name)
設(shè)置服務(wù)器命名空間
root = myserver.get_root_node()
獲取服務(wù)器根節(jié)點(diǎn)
objects.add_method(1, "Conveyor", Start_Conveyor_prog)
為節(jié)點(diǎn)(objects) 添加方法Start_Conveyor_prog
客戶端常用函數(shù)
myclient = Client(endpoint)
在客戶端引用endpoint
myclient.get_node("ns=2;i=3")
在客戶端里使用ns=2;i=3這個(gè)節(jié)點(diǎn)
Start_Lathe_Prog1 = method[4]
調(diào)用服務(wù)器里設(shè)置的方法
Workpiece = objects_node.call_method(Start_Conveyor_prog,[WorkpieceID.get_value()])
call_method函數(shù),第一個(gè)參數(shù)是節(jié)點(diǎn)名稱,第二個(gè)參數(shù)是值,是這個(gè)節(jié)點(diǎn)里的值,屬于數(shù)組,或者傳遞字典也是可以的
return_value_kuka_prog1 = objects_node.call_method(Start_Kuka_Prog1)
返回某個(gè)方法
示例程序:溫度實(shí)時(shí)檢測,每隔五秒傳到客戶端
服務(wù)器程序:
import opcua from time import sleep from random import randint from datetime import datetime # Assign endpoint URL url = "localhost" port = 5001 endpoint = "opc.tcp://{}:{}".format(url, port) # create instance/object of type opcua.Server myserver = opcua.Server() # Assign endpoint url on the OPC UA server address space myserver.set_endpoint(endpoint) # Create a name for OPC UA namespace name = "Temperature Sensor" # Register the name on OPC UA namespace addspace = myserver.register_namespace(name) # Get reference to the Objects node of the OPC UA server objects = myserver.get_objects_node() # Create objects on the object node param = objects.add_object(addspace, "parameters") # Create variables Sensor_name = param.add_variable(addspace, "Sensor Name", "Temperature_Sensor_SF12") Temperature = param.add_variable(addspace, "Temperature Value", 'NA') # Set variable as writable Sensor_name.set_writable() Temperature.set_writable() ######################################################################################### # Present the data structure of the OPC UA server root = myserver.get_root_node() print("Root Node Id: %s" % root) print("Children of root are: %s" % root.get_children()) print() my_objects = myserver.get_objects_node() print("Defined object Node Id: %s" % my_objects) print("Children of the defined object are: %s" % my_objects.get_children()) print() sensor_name_node = my_objects.get_children()[1].get_children()[0] print("Sensor name node Id: %s" % sensor_name_node) print("Sensor name node browse name: %s" % sensor_name_node.get_browse_name()) print("Sensor name default value: %s" % sensor_name_node.get_value()) print() temperature_node = my_objects.get_children()[1].get_children()[1] print("Temperature node Id: %s" % temperature_node) print("Temperature node browse name: %s" % temperature_node.get_browse_name()) print("Temperature default value: %s" % temperature_node.get_value()) print() ######################################################################################## # starting! The server will continue running myserver.start() current_time = str(datetime.now().time())[:-7] print("{} - Server is initialising...".format(current_time)) while True: sleep(5) current_time = str(datetime.now().time())[:-7] current_temp = randint(10, 25) Temperature.set_value(current_temp) # publish temperature value print("{} - Current temperature: {} Celsius".format(current_time, current_temp))
客戶端程序:
from opcua import Client from datetime import datetime # Assign endpoint URL url = "localhost" port = 5001 endpoint = "opc.tcp://{}:{}".format(url, port) # Assign endpoint url on the OPC UA client address space myclient = Client(endpoint) # Connect to server myclient.connect() # Assign nodes Temperature_node = myclient.get_node("ns=2;i=3") # Subhandler Class from OPC UA class SubHandler(object): """ Subscription Handler. To receive events from server for a subscription """ def datachange_notification(self, node, val, data): """ called for every datachange notification from server """ global DataChange # Global variable current_time = str(datetime.now().time())[:-7] DataChange = val # Assigning value to global variable print("{} - Received temperature: {} Celsius".format(current_time, val)) # Initailise variable nodes = myclient.get_objects_node() DataChange = "null" # Create Sub handler handler = SubHandler() # create subscription by passing period in milliseconds and handler subscribe = myclient.create_subscription(0, handler) # pass variable node to subscribe data change method handler = subscribe.subscribe_data_change(Temperature_node)
程序運(yùn)行效果:服務(wù)器
程序運(yùn)行效果:客戶端
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解python ThreadPoolExecutor異常捕獲
本文主要介紹了詳解python ThreadPoolExecutor異常捕獲,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01

詳解Python中__str__和__repr__方法的區(qū)別

python實(shí)現(xiàn)簡易動態(tài)時(shí)鐘

Django實(shí)現(xiàn)從數(shù)據(jù)庫中獲取到的數(shù)據(jù)轉(zhuǎn)換為dict

python添加列表元素append(),extend()及?insert()

Python實(shí)現(xiàn)微信公眾平臺自定義菜單實(shí)例