欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python如何用pymodbus庫進(jìn)行modbus tcp通信

 更新時(shí)間:2024年06月29日 09:44:13   作者:brandon_l  
這篇文章主要介紹了python如何用pymodbus庫進(jìn)行modbus tcp通信問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

使用pymodbus庫進(jìn)行modbus tcp通信

使用python解決工業(yè)通信問題是一個(gè)非常好的選擇,python具有豐富的生態(tài),可以輕松解決工業(yè)通信的各種問題。

本篇主要介紹使用pymodbus庫進(jìn)行modbus tcp仿真,實(shí)現(xiàn)pc端讀取plc或工業(yè)設(shè)備modbus變量。

安裝pymodbus:

pip install -U pymodbus

創(chuàng)建modbus tcp server

這里我們先創(chuàng)建一個(gè)虛擬的modbus設(shè)備,如果你手里有一個(gè)plc或者工業(yè)設(shè)備,可以直接跳過本節(jié)。

  • modbus_server.py
'''
 * @Author: liuzhao 
 * @Last Modified time: 2022-10-05 09:56:13 
'''

from pymodbus.server.sync import (
    StartTcpServer,
)
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
)
from pymodbus.version import version

datablock = ModbusSequentialDataBlock.create()
context = ModbusSlaveContext(
    di=datablock,
    co=datablock,
    hr=datablock,
    ir=datablock,
    )
single = True

# Build data storage
store = ModbusServerContext(slaves=context, single=single)


if __name__ == '__main__':

	address = ("0.0.0.0", 503)
	StartTcpServer(
	    context=store,  # Data storage
	    address=address,  # listen address
	  	allow_reuse_address=True,  # allow the reuse of an address
	)

直接運(yùn)行該腳本,就可以在本機(jī)的503端口創(chuàng)建一臺modbus設(shè)備了,具體實(shí)現(xiàn)暫不深追,我們學(xué)習(xí)的重點(diǎn)是客戶端對modbus變量的讀寫。

讀寫modbus變量

modbus變量類型以及地址

Object typeAccessSizeAddress
CoilRead-write1 bit00001 – 09999
Discrete inputRead-only1 bit10001 – 19999
Input registerRead-only16 bits30001 – 39999
Holding registerRead-write16 bits40001 – 49999

coil是線圈,Discrete input是數(shù)字量輸入,Input register是模擬量輸入,Holding register是保持寄存器。一般地址范圍是0-65535

讀取常規(guī)變量

讀寫線圈 | 讀取輸入變量 | 讀寫保持寄存器

from pymodbus.client.sync import ModbusTcpClient
from pymodbus.bit_read_message import ReadCoilsResponse
from pymodbus.register_read_message import ReadInputRegistersResponse
from pymodbus.exceptions import ConnectionException      # 連接失敗,用于異常處理

host = '127.0.0.1'
port = 503
client = ModbusTcpClient(host,port)


# 寫入線圈
client.write_coil(1, True)
client.write_coil(2, False)
client.write_coil(3, True)

# 讀取線圈    注意對于離散量的讀取,第二個(gè)參數(shù)cout是有坑的,必須為8的倍數(shù)個(gè)
result:ReadCoilsResponse = client.read_coils(address=1,cout=8)     # 從地址1開始讀,讀取8個(gè)線圈,一次讀8的倍數(shù)個(gè)線圈,不設(shè)置為8的倍數(shù)可能會(huì)出現(xiàn)問題
print(result.isError())

# 不建議使用
print(result.getBit(7))            # 這里的參數(shù)address不是plc里的地址,而是python列表的address,

print('read_coils ')

# 建議使用
print(result.bits)        # 打印讀取結(jié)果,一共8位
# 讀取其中的位
print(                   
    result.bits[0],
    result.bits[1],
    result.bits[2]
    )         # 相當(dāng)于result.getBit(0)


# 讀取數(shù)字輸入
result = client.read_discrete_inputs(address=10001,count=8)    # 從10001開始讀,讀取8位
print(result.bits)


# 讀取模擬輸入寄存器
input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8)
# print(f'is_error:{input_register_result.isError()}')
print('read_input_registers ')
print(input_register_result.registers)   
print(input_register_result.getRegister(0))   


# 讀寫保持寄存器
client.write_register(address=40001,value=100)
result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1)
print('read_holding_registers ')
print(result.registers)

?!£P(guān)閉連接
client.close()

讀取復(fù)雜變量

字符串、浮點(diǎn)數(shù)、負(fù)數(shù)等

這里需要注意modbus設(shè)備的存儲結(jié)構(gòu)是低位低字節(jié)還是低位高字節(jié),也就是設(shè)備內(nèi)存的字節(jié)、字的排列順序。

根據(jù)不同的設(shè)備,對照下表調(diào)整正確的組合方式。

Word OrderByte orderWord1Word2
BigBig0x12340x5678
BigLittle0x34120x7856
LittleBig0x56780x1234
LittleLittle0x78560x3412
# 復(fù)雜數(shù)據(jù)類型

from collections import OrderedDict
import logging

from pymodbus.client.sync import ModbusTcpClient as ModbusClient

from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder



ORDER_DICT = {"<": "LITTLE", ">": "BIG"}


def run_binary_payload_client(host:str,port:int):
  
    for word_endian, byte_endian in (
        (Endian.Big, Endian.Big),
        (Endian.Big, Endian.Little),
        (Endian.Little, Endian.Big),
        (Endian.Little, Endian.Little),
    ):
        print("-" * 60)
        print(f"Word Order: {ORDER_DICT[word_endian]}")
        print(f"Byte Order: {ORDER_DICT[byte_endian]}")
        print()
    
        builder = BinaryPayloadBuilder(
            wordorder=word_endian,
            byteorder=byte_endian,
        )
		
		# 寫入的變量
        my_string = "abcd-efgh123345765432"
        builder.add_string(my_string)
        builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
        builder.add_8bit_int(-0x12)
        builder.add_8bit_uint(0x12)
        builder.add_16bit_int(-0x5678)
        builder.add_16bit_uint(0x1234)
        builder.add_32bit_int(-0x1234)
        builder.add_32bit_uint(0x12345678)
        builder.add_16bit_float(12.34)
        builder.add_16bit_float(-12.34)
        builder.add_32bit_float(22.34)
        builder.add_32bit_float(-22.34)
        builder.add_64bit_int(-0xDEADBEEF)
        builder.add_64bit_uint(0x12345678DEADBEEF)
        builder.add_64bit_uint(0x12345678DEADBEEF)
        builder.add_64bit_float(123.45)
        builder.add_64bit_float(-123.45)
        registers = builder.to_registers()
        print("Writing Registers:")
        print(registers)
        print("\n")
        payload = builder.build()
        address = 40001          # 從40001開始寫入
        # We can write registers
        client.write_registers(address, registers, unit=1)    # 寫入
     	
     	# 讀取復(fù)雜變量
        print("Reading Registers:")
        address = 40001

        count = len(payload)
        print(f"payload_len {count}")
        result = client.read_holding_registers(address, count, slave=1)
        print(result.registers)
        print("\n")
        decoder = BinaryPayloadDecoder.fromRegisters(
            result.registers, byteorder=byte_endian, wordorder=word_endian
        )
        # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder
        assert (
            decoder._byteorder == builder._byteorder  # pylint: disable=protected-access
        )  # nosec
        assert (
            decoder._wordorder == builder._wordorder  # pylint: disable=protected-access
        )  # nosec

        decoded = OrderedDict(
            [
                ("string", decoder.decode_string(len(my_string))),
                ("bits", decoder.decode_bits()),
                ("8int", decoder.decode_8bit_int()),
                ("8uint", decoder.decode_8bit_uint()),
                ("16int", decoder.decode_16bit_int()),
                ("16uint", decoder.decode_16bit_uint()),
                ("32int", decoder.decode_32bit_int()),
                ("32uint", decoder.decode_32bit_uint()),
                ("16float", decoder.decode_16bit_float()),
                ("16float2", decoder.decode_16bit_float()),
                ("32float", decoder.decode_32bit_float()),
                ("32float2", decoder.decode_32bit_float()),
                ("64int", decoder.decode_64bit_int()),
                ("64uint", decoder.decode_64bit_uint()),
                ("ignore", decoder.skip_bytes(8)),
                ("64float", decoder.decode_64bit_float()),
                ("64float2", decoder.decode_64bit_float()),
            ]
        )
        print("Decoded Data")
        for name, value in iter(decoded.items()):
            print(
                "%s\t" % name,  # pylint: disable=consider-using-f-string
                hex(value) if isinstance(value, int) else value,
            )
        print("\n")

	# 關(guān)閉連接
    client.close()


if __name__ == "__main__":
    run_binary_payload_client("127.0.0.1", 503)    

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論