python如何用pymodbus庫進(jìn)行modbus tcp通信
使用pymodbus庫進(jìn)行modbus tcp通信
使用python解決工業(yè)通信問題是一個(gè)非常好的選擇,python具有豐富的生態(tài),可以輕松解決工業(yè)通信的各種問題。
本篇主要介紹使用pymodbus庫進(jìn)行modbus tcp仿真,實(shí)現(xiàn)pc端讀取plc或工業(yè)設(shè)備modbus變量。
安裝pymodbus:
pip install -U pymodbus
創(chuàng)建modbus tcp server
這里我們先創(chuàng)建一個(gè)虛擬的modbus設(shè)備,如果你手里有一個(gè)plc或者工業(yè)設(shè)備,可以直接跳過本節(jié)。
- modbus_server.py
''' * @Author: liuzhao * @Last Modified time: 2022-10-05 09:56:13 ''' from pymodbus.server.sync import ( StartTcpServer, ) from pymodbus.datastore import ( ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext, ) from pymodbus.version import version datablock = ModbusSequentialDataBlock.create() context = ModbusSlaveContext( di=datablock, co=datablock, hr=datablock, ir=datablock, ) single = True # Build data storage store = ModbusServerContext(slaves=context, single=single) if __name__ == '__main__': address = ("0.0.0.0", 503) StartTcpServer( context=store, # Data storage address=address, # listen address allow_reuse_address=True, # allow the reuse of an address )
直接運(yùn)行該腳本,就可以在本機(jī)的503端口創(chuàng)建一臺modbus設(shè)備了,具體實(shí)現(xiàn)暫不深追,我們學(xué)習(xí)的重點(diǎn)是客戶端對modbus變量的讀寫。
讀寫modbus變量
modbus變量類型以及地址
Object type | Access | Size | Address |
---|---|---|---|
Coil | Read-write | 1 bit | 00001 – 09999 |
Discrete input | Read-only | 1 bit | 10001 – 19999 |
Input register | Read-only | 16 bits | 30001 – 39999 |
Holding register | Read-write | 16 bits | 40001 – 49999 |
coil是線圈,Discrete input是數(shù)字量輸入,Input register是模擬量輸入,Holding register是保持寄存器。一般地址范圍是0-65535
讀取常規(guī)變量
讀寫線圈 | 讀取輸入變量 | 讀寫保持寄存器
from pymodbus.client.sync import ModbusTcpClient from pymodbus.bit_read_message import ReadCoilsResponse from pymodbus.register_read_message import ReadInputRegistersResponse from pymodbus.exceptions import ConnectionException # 連接失敗,用于異常處理 host = '127.0.0.1' port = 503 client = ModbusTcpClient(host,port) # 寫入線圈 client.write_coil(1, True) client.write_coil(2, False) client.write_coil(3, True) # 讀取線圈 注意對于離散量的讀取,第二個(gè)參數(shù)cout是有坑的,必須為8的倍數(shù)個(gè) result:ReadCoilsResponse = client.read_coils(address=1,cout=8) # 從地址1開始讀,讀取8個(gè)線圈,一次讀8的倍數(shù)個(gè)線圈,不設(shè)置為8的倍數(shù)可能會(huì)出現(xiàn)問題 print(result.isError()) # 不建議使用 print(result.getBit(7)) # 這里的參數(shù)address不是plc里的地址,而是python列表的address, print('read_coils ') # 建議使用 print(result.bits) # 打印讀取結(jié)果,一共8位 # 讀取其中的位 print( result.bits[0], result.bits[1], result.bits[2] ) # 相當(dāng)于result.getBit(0) # 讀取數(shù)字輸入 result = client.read_discrete_inputs(address=10001,count=8) # 從10001開始讀,讀取8位 print(result.bits) # 讀取模擬輸入寄存器 input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8) # print(f'is_error:{input_register_result.isError()}') print('read_input_registers ') print(input_register_result.registers) print(input_register_result.getRegister(0)) # 讀寫保持寄存器 client.write_register(address=40001,value=100) result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1) print('read_holding_registers ') print(result.registers) ?!£P(guān)閉連接 client.close()
讀取復(fù)雜變量
字符串、浮點(diǎn)數(shù)、負(fù)數(shù)等
這里需要注意modbus設(shè)備的存儲結(jié)構(gòu)是低位低字節(jié)還是低位高字節(jié),也就是設(shè)備內(nèi)存的字節(jié)、字的排列順序。
根據(jù)不同的設(shè)備,對照下表調(diào)整正確的組合方式。
Word Order | Byte order | Word1 | Word2 |
---|---|---|---|
Big | Big | 0x1234 | 0x5678 |
Big | Little | 0x3412 | 0x7856 |
Little | Big | 0x5678 | 0x1234 |
Little | Little | 0x7856 | 0x3412 |
# 復(fù)雜數(shù)據(jù)類型 from collections import OrderedDict import logging from pymodbus.client.sync import ModbusTcpClient as ModbusClient from pymodbus.constants import Endian from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder ORDER_DICT = {"<": "LITTLE", ">": "BIG"} def run_binary_payload_client(host:str,port:int): for word_endian, byte_endian in ( (Endian.Big, Endian.Big), (Endian.Big, Endian.Little), (Endian.Little, Endian.Big), (Endian.Little, Endian.Little), ): print("-" * 60) print(f"Word Order: {ORDER_DICT[word_endian]}") print(f"Byte Order: {ORDER_DICT[byte_endian]}") print() builder = BinaryPayloadBuilder( wordorder=word_endian, byteorder=byte_endian, ) # 寫入的變量 my_string = "abcd-efgh123345765432" builder.add_string(my_string) builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0]) builder.add_8bit_int(-0x12) builder.add_8bit_uint(0x12) builder.add_16bit_int(-0x5678) builder.add_16bit_uint(0x1234) builder.add_32bit_int(-0x1234) builder.add_32bit_uint(0x12345678) builder.add_16bit_float(12.34) builder.add_16bit_float(-12.34) builder.add_32bit_float(22.34) builder.add_32bit_float(-22.34) builder.add_64bit_int(-0xDEADBEEF) builder.add_64bit_uint(0x12345678DEADBEEF) builder.add_64bit_uint(0x12345678DEADBEEF) builder.add_64bit_float(123.45) builder.add_64bit_float(-123.45) registers = builder.to_registers() print("Writing Registers:") print(registers) print("\n") payload = builder.build() address = 40001 # 從40001開始寫入 # We can write registers client.write_registers(address, registers, unit=1) # 寫入 # 讀取復(fù)雜變量 print("Reading Registers:") address = 40001 count = len(payload) print(f"payload_len {count}") result = client.read_holding_registers(address, count, slave=1) print(result.registers) print("\n") decoder = BinaryPayloadDecoder.fromRegisters( result.registers, byteorder=byte_endian, wordorder=word_endian ) # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder assert ( decoder._byteorder == builder._byteorder # pylint: disable=protected-access ) # nosec assert ( decoder._wordorder == builder._wordorder # pylint: disable=protected-access ) # nosec decoded = OrderedDict( [ ("string", decoder.decode_string(len(my_string))), ("bits", decoder.decode_bits()), ("8int", decoder.decode_8bit_int()), ("8uint", decoder.decode_8bit_uint()), ("16int", decoder.decode_16bit_int()), ("16uint", decoder.decode_16bit_uint()), ("32int", decoder.decode_32bit_int()), ("32uint", decoder.decode_32bit_uint()), ("16float", decoder.decode_16bit_float()), ("16float2", decoder.decode_16bit_float()), ("32float", decoder.decode_32bit_float()), ("32float2", decoder.decode_32bit_float()), ("64int", decoder.decode_64bit_int()), ("64uint", decoder.decode_64bit_uint()), ("ignore", decoder.skip_bytes(8)), ("64float", decoder.decode_64bit_float()), ("64float2", decoder.decode_64bit_float()), ] ) print("Decoded Data") for name, value in iter(decoded.items()): print( "%s\t" % name, # pylint: disable=consider-using-f-string hex(value) if isinstance(value, int) else value, ) print("\n") # 關(guān)閉連接 client.close() if __name__ == "__main__": run_binary_payload_client("127.0.0.1", 503)
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python實(shí)現(xiàn)對數(shù)坐標(biāo)系繪制與自定義映射
這篇文章主要為大家學(xué)習(xí)介紹了如何利用Python實(shí)現(xiàn)對數(shù)坐標(biāo)系繪制與坐標(biāo)自定義映射,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-08-08在VS Code上搭建Python開發(fā)環(huán)境的方法
這篇文章主要介紹了在VS Code上搭建Python開發(fā)環(huán)境的方法,需要的朋友可以參考下2018-04-04Python自動(dòng)檢測SSL證書是否過期的實(shí)現(xiàn)示例
SSL證書是有有效期的,一旦過期就會(huì)失效,從而帶來安全風(fēng)險(xiǎn),本文主要介紹了Python自動(dòng)檢測SSL證書是否過期的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11python庫Tsmoothie模塊數(shù)據(jù)平滑化異常點(diǎn)抓取
這篇文章主要為大家介紹了python庫Tsmoothie模塊數(shù)據(jù)平滑化技術(shù)實(shí)現(xiàn)異常點(diǎn)抓取,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Python學(xué)習(xí)之時(shí)間包使用教程詳解
本文主要介紹了Python中的內(nèi)置時(shí)間包:datetime包?與?time包?,通過學(xué)習(xí)時(shí)間包可以讓我們的開發(fā)過程中對時(shí)間進(jìn)行輕松的處理,快來跟隨小編一起學(xué)習(xí)一下吧2022-03-03python 邊緣擴(kuò)充方式的實(shí)現(xiàn)示例
本文主要介紹了python 邊緣擴(kuò)充方式的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03