python中websockets與主線程傳遞參數(shù)的實現(xiàn)
一、子線程創(chuàng)建websockets服務(wù)端接收客戶端數(shù)據(jù)并存入隊列
發(fā)送的消息客戶端與服務(wù)端統(tǒng)一,多種消息加入判斷的標(biāo)簽
服務(wù)端:web_server.py
import asyncio import json import base64 import queue import threading import time import cv2 import moment import numpy as np import requests import websockets class WebServer: def __init__(self, host, port): self.host = host self.port = port self.msg_queue = queue.Queue() self.clients = [] self.flag = True async def echo(self, websocket, path): client_ip, client_port = websocket.remote_address self.clients.append(websocket) while True: try: # 在這里處理收到的消息 # async for recv_text in websocket: recv_text = await websocket.recv() with open("aa.txt","w") as f: f.write(recv_text) data = json.loads(recv_text) #if type(data) is not dict: # 判斷數(shù)據(jù) # continue self.msg_queue.put(res) except websockets.ConnectionClosed: print("ConnectionClosed...", websocket.remote_address) # 鏈接斷開 self.clients.remove(websocket) break except websockets.InvalidState: print("InvalidState...", websocket.remote_address) # 無效狀態(tài) self.clients.remove(websocket) break except Exception as err: print("ws:", err) pass def connect(self): asyncio.set_event_loop(asyncio.new_event_loop()) start_server = websockets.serve(self.echo, self.host, self.port) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() print("連接成功!") def run(self): t = threading.Thread(target=self.connect) t.start() print("已啟動!")
二、主線程內(nèi)啟動子線程接收并處理數(shù)據(jù)
收到消息后根據(jù)情況處理消息
主線程調(diào)用服務(wù)端:main.py
from web_server import WebServer class MainThread: def __init__(self): self.ws = WebServer("192.168.6.28", 8000) self.ws.run() def run(self): while True: try: data = self.ws.msg_queue.get() # flag = data.get("flag") # 內(nèi)容標(biāo)簽 判斷是否是自己想要的內(nèi)容 # if not flag: # continue try: # 處理數(shù)據(jù) print(data) pass except Exception as e: print("報錯:", e) except Exception as err: print("報錯:", err) pass if __name__ == '__main__': M = MainThread() M.run()
客戶端:web_client.py
客戶端連接服務(wù)端,并發(fā)送消息
import json import websocket class WebClient: def __init__(self, host, port): self.host = host self.port = port self.conn = None self.flag = False def connect(self): try: url = f"ws://{self.host}:{self.port}" self.conn = websocket.create_connection(url) self.flag = True except Exception as err: self.flag = False def close(self): self.conn.close() def recv(self): data = self.conn.recv(1024) print(data) def send(self, data): self.conn.send(data) if __name__ == '__main__': host = "192.168.6.28" # host = "127.0.0.1" port = 8000 ws = WebClient(host, port) if not ws.flag: ws.connect() with open("bb.txt") as f: data = f.read() ws.send(data)
到此這篇關(guān)于python中websockets與主線程傳遞參數(shù)的實現(xiàn)的文章就介紹到這了,更多相關(guān)python websockets與主線程傳遞參數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python?操作pdf?pdfplumber讀取PDF寫入Excel
這篇文章主要介紹了Python?操作pdf?pdfplumber讀取PDF寫入Excel,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以考察一下2022-08-08Python根據(jù)詞頻信息(xlsx、csv文件)繪制詞云圖全過程(wordcloud)
這篇文章主要給大家介紹了關(guān)于Python根據(jù)詞頻信息(xlsx、csv文件)繪制詞云圖的相關(guān)資料,wordcloud是基于Python開發(fā)的詞云生成庫,功能強大使用簡單,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06pytorch查看網(wǎng)絡(luò)參數(shù)顯存占用量等操作
這篇文章主要介紹了pytorch查看網(wǎng)絡(luò)參數(shù)顯存占用量等操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05如何利用python將Xmind用例轉(zhuǎn)為Excel用例
這篇文章主要介紹了如何利用python將Xmind用例轉(zhuǎn)為Excel用例,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-06-06python 實現(xiàn)將list轉(zhuǎn)成字符串,中間用空格隔開
今天小編就為大家分享一篇python 實現(xiàn)將list轉(zhuǎn)成字符串,中間用空格隔開,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-12-12Python實現(xiàn)json對值進(jìn)行模糊搜索的示例詳解
我經(jīng)常使用json進(jìn)行存儲配置,于是常常遇到這樣的問題:如果想要對某個數(shù)組里的值進(jìn)行模糊搜索,同時輸出相關(guān)的其他數(shù)組相同位置的的值該如何實現(xiàn)呢?本文就來和大家詳細(xì)聊聊2023-01-01Python如何用str.format()批量生成網(wǎng)址(豆瓣讀書為例)
這篇文章主要介紹了Python如何用str.format()批量生成網(wǎng)址(豆瓣讀書為例),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09django認(rèn)證系統(tǒng) Authentication使用詳解
這篇文章主要介紹了django認(rèn)證系統(tǒng) Authentication使用詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-07-07在Keras中利用np.random.shuffle()打亂數(shù)據(jù)集實例
這篇文章主要介紹了在Keras中利用np.random.shuffle()打亂數(shù)據(jù)集實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06