欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python基于WebSocket實(shí)現(xiàn)簡易屏幕共享工具

 更新時間:2025年01月03日 09:08:49   作者:CrazyDragon_King  
這篇文章主要為大家詳細(xì)介紹了Python如何基于WebSocket實(shí)現(xiàn)簡易屏幕共享工具,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

前面寫了兩個簡單的屏幕共享工具,不過那只是為了驗(yàn)證通過截屏的方式是否可行,因?yàn)橥ǔJ謩咏仄恋念l率很低,而對于視頻來說它的幀率要求就很高了,至少要一秒30幀率左右。所以,經(jīng)過實(shí)際的截屏工具驗(yàn)證,我了解了幾個Python截屏庫的特點(diǎn)和限制。例如,多數(shù)截屏庫都不支持很高的截屏速度,并且截屏是典型的 CPU 密集任務(wù)(我嘗試使用多線程截屏,發(fā)現(xiàn)速度更慢了,之后有時間我也會把這一點(diǎn)整理成文章發(fā)出來)。
所以,我的初始的想法其實(shí)是基于 WebSocket 來實(shí)現(xiàn)的?,F(xiàn)在,就讓我們對先前的代碼進(jìn)行重構(gòu),采用 WebSocket 來傳輸圖片數(shù)據(jù)。不過我這里沒有使用到它的雙向傳輸?shù)奶匦?,只是將原?HTTP 傳輸?shù)膱D片換成通過 WebSocket 來傳輸了。不過這里后續(xù)還有很多東西可以開發(fā),如果有時間的話,也可以基于這個做一些有趣的東西。

演示

我這個筆記本的性能可能不太行,我只要打開視頻幀率就降低了很多,哈哈。

截取播放B站視頻

截取攝像頭畫面

這樣甚至可以遠(yuǎn)程共享畫面了,如果兩個人都布置一個,就可以各自看到對話了(不過沒有聲音,且效率低下,可能也就只能在局域網(wǎng)使用),不過這樣它是相當(dāng)于從服務(wù)器的地方獲取的數(shù)據(jù),而我們平時使用的視頻通話工具都是從客戶端獲取的數(shù)據(jù)。

而且頁面越多,幀率越低,這里可能要優(yōu)化一下或者它就是這么累贅,只能個人使用。=

說明

注意,這里的測試環(huán)境是 Windows,因?yàn)橛行煲蕾囉?Windows 提供的特性,所以需要在 Windows 上運(yùn)行它。在程序啟動時,它會開啟一個截屏的線程,不斷去獲取屏幕的截屏,如果有用戶訪問,它就會通過 WebSocket 連接,把獲取的屏幕截圖數(shù)據(jù)傳送給前端,前端的邏輯就是將它繪制在 canvas 上,并添加幀率顯示。

注意:這部分前端的代碼是 AI 生成,對于驗(yàn)證小功能來說,AI 真是太完美了。而且,其實(shí)這整個部分都可以讓 AI 來做,但是具體是哪些的組合還是自己選擇的,畢竟每個人的偏好和技術(shù)棧不同。

代碼

所有的代碼都在這里了,大概60行代碼,我把前端壓縮成一行代碼了。如果要運(yùn)行代碼先要安裝 flask, flask_sock, pillow, dxcam 庫。

import time
from flask import Flask
from flask_sock import Sock
from io import BytesIO
from PIL import Image
import dxcam

# 創(chuàng)建應(yīng)用
app = Flask(__name__)
sockets = Sock(app)
# 整個應(yīng)用只創(chuàng)建一個即可
camera = dxcam.create(device_idx=0, output_idx=1)  # output_idx 0 是第一塊屏幕,1 是第二塊屏幕
camera.start(target_fps=60, video_mode=True)
JPEG_QUALITY = 80 # 默認(rèn)的jpeg圖片質(zhì)量,越高需要的計(jì)算量越大,同時越清晰


# 直接前后端寫一起了,這個只是一個演示的demo
INDEX_HTML = """
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Screen Sharing</title> <style> body { display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f0f0f0; } canvas { border: 1px solid black; } </style> </head> <body> <canvas id="screenCanvas" width="960" height="540"></canvas> <script> const canvas = document.getElementById('screenCanvas'); const ctx = canvas.getContext('2d'); const ws = new WebSocket('ws://'+window.location.host+'/remote_desktop'); ws.onopen = () => { console.log("WebSocket connected"); }; ws.onerror = (event) => { console.error("WebSocket error:", event); }; let lastFrameTime = performance.now(); let frameCount = 0; let fps = 0; ws.onmessage = (event) => { const image = new Image(); image.src = URL.createObjectURL(event.data); image.onload = () => { ctx.drawImage(image, 0, 0, canvas.width, canvas.height); frameCount++; const now = performance.now(); const elapsed = now - lastFrameTime; if (elapsed >= 1000) { fps = frameCount / (elapsed / 1000); frameCount = 0; lastFrameTime = now; } displayFPS(); }; }; function displayFPS() { ctx.font = '30px Arial'; ctx.fillStyle = 'red'; ctx.fillText(`FPS: ${Math.round(fps)}`, 50, 50); } </script> </body> </html>
"""

@app.route('/', methods=['GET'])
def index():
    """簡單的前端"""
    return INDEX_HTML


@sockets.route('/remote_desktop')
def get_desktop(ws):
    """
    獲取一幀圖片,并發(fā)送給前端
    """
    buffer = BytesIO()
    fps = 0    # 計(jì)算后端生成的幀率
    frame_count = 0
    last_frame_time = time.perf_counter()
    while True:
        reset_buffer(buffer)  # 每次重置buffer,方便復(fù)用
        img_data = get_frame(buffer)
        frame_count += 1
        elapsed = time.perf_counter() - last_frame_time
        if elapsed >= 1:
            fps = frame_count // elapsed
            last_frame_time = time.perf_counter()
            frame_count = 0
            print("backend real frame fps: ", fps)
        ws.send(img_data)


def get_frame(buffer):
    """
    獲取一幀,然后處理成jpeg并返回二進(jìn)制數(shù)據(jù)
    """
    image_array = camera.get_latest_frame()
    Image.fromarray(image_array).save(buffer, format="JPEG", quality=JPEG_QUALITY)
    return buffer.getvalue()


def reset_buffer(buffer):
    # 重置buffer,方便復(fù)用
    buffer.truncate(0)
    buffer.seek(0)


if __name__ == '__main__':    
    print("remote desktop server starting...")
    app.run("0.0.0.0", 9000)

方法補(bǔ)充

除了上文的方法,小編為大家整理了其他兩個實(shí)現(xiàn)屏幕共享的方法,希望對大家有所幫助

方法一:50 行代碼實(shí)現(xiàn)簡易屏幕共享工具

思路

它的思路很簡單,程序不斷的截屏然后發(fā)送給瀏覽器去顯示,效果也還湊合吧。這里最重要的就是 multipart/x-mixed-replace; boundary=frame 這個首部的使用了,好幾年前還在大學(xué)時,我買了一個樹莓派+攝像頭,使用一個開源的軟件叫 motion,搭建了一個局域網(wǎng)的視頻 監(jiān)控。這個 motion 的網(wǎng)頁端就是使用該首部實(shí)現(xiàn)的,因?yàn)槲耶?dāng)時很好奇就去了解了它。

代碼

如果要運(yùn)行代碼需要安裝對應(yīng)的依賴:

pip install flask, mss, PIL
import time
from io import BytesIO
from flask import Flask, Response
import mss
from PIL import Image, ImageDraw, ImageFont

app = Flask(__name__)

@app.route('/monitor', methods=['GET'])
def monitor():
    """獲取屏幕數(shù)據(jù)并返回"""
    def gen_img():
        with mss.mss() as sct:
            # 獲取屏幕的全部區(qū)域
            screen  = sct.monitors[1]
            buffer = BytesIO()
            fps = 0  # 幀率
            frame_count = 0 # 計(jì)算幀率,這是服務(wù)端生成的速率,到了客戶端還會更低
            start = time.time()
            while True:
                # 抓取屏幕并返回
                screenshot = sct.grab(screen)                                                                           
                # 將截圖數(shù)據(jù)轉(zhuǎn)換為 JPEG 格式的二進(jìn)制數(shù)據(jù)
                img = Image.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX")
                if fps > 0:
                    img = add_fps(img, fps)
                img.save(buffer, format="JPEG")
                jpeg_data = buffer.getvalue()
                yield (b'--frame\r\n' +
                    b'Content-Type: image/jpeg\r\n\r\n' + jpeg_data + b'\r\n')
                # 重置buffer,方便復(fù)用
                buffer.seek(0)
                buffer.truncate(0)
                frame_count += 1
                elapsed = time.time() - start
                if elapsed >= 1:
                    fps = frame_count // elapsed
                    frame_count = 0
                    start = time.time()
    return Response(gen_img(), mimetype="multipart/x-mixed-replace; boundary=frame")

def add_fps(img, fps):
    """在圖片上添加fps"""
    draw = ImageDraw.Draw(img)
    draw.text((100, 100), f"fps: {fps}", (255, 0, 0), ImageFont.truetype("arial.ttf", 100))
    return img

if __name__ == '__main__':
    print("monitoring...")
    app.run("127.0.0.1", 9000)

方法二:簡易共享屏幕工具改進(jìn)版

改進(jìn)

1.降低分辨率

方法一那個測試的幀率低,最大的原因是電腦的分辨率有點(diǎn)高了,它是2.5K屏幕,所以這次換成了 1920*1080 的屏幕來測試,效果就好很多了,基本都在 50 幀左右。

2.更換截屏庫

更換了一個速度更快的截屏庫,這個截屏庫的速度很快,比方法一那個快多了,但是反應(yīng)到實(shí)際的幀率上就低了,所以還是后續(xù)其他的操作太過于耗時,但是暫時也沒有想到啥優(yōu)化的好方法。

3.降低圖片的質(zhì)量

JPEG 格式的圖片是可以選擇壓縮質(zhì)量的,適當(dāng)?shù)恼{(diào)低質(zhì)量可以提高處理的速度。

代碼

from flask import Flask, Response
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
import time
import dxcam

app = Flask(__name__)
# 整個應(yīng)用只創(chuàng)建一個即可
camera = dxcam.create(device_idx=0, output_idx=1)  # output_idx 0 是第一塊屏幕,1 是第二塊屏幕
camera.start(target_fps=60, video_mode=True)

@app.route('/monitor', methods=['GET'])
def monitor():
    """獲取屏幕數(shù)據(jù)并返回"""
    def gen_img():
        # 獲取屏幕的全部區(qū)域
        fps = 0  # 幀率
        frame_count = 0 # 計(jì)算幀率,這是服務(wù)端生成的速率,到了客戶端還會更低
        start_time = time.perf_counter()
        buffer = BytesIO()
        while True:
            reset_buffer(buffer)  # 每次重置buffer,方便復(fù)用
            t1 = time.perf_counter()
            yield get_frame(buffer, fps)
            print("get frame fps: ", 1.0 / (time.perf_counter() - t1))
            frame_count += 1
            elapsed_time = time.perf_counter() - start_time
            if elapsed_time >= 1:
                fps = frame_count // elapsed_time
                frame_count = 0
                start_time = time.perf_counter()
                print("real frame fps: ", fps)
    return Response(gen_img(), mimetype="multipart/x-mixed-replace; boundary=frame")

def get_frame(buffer, fps):
    """
    獲取一幀,然后處理成jpeg并返回二進(jìn)制數(shù)據(jù)
    """
    image_array = camera.get_latest_frame()
    img = Image.fromarray(image_array)
    if fps > 0:
        img = add_fps(img, fps)
    img.save(buffer, format="JPEG", quality=20)
    return (b'--frame\r\n' +
            b'Content-Type: image/jpeg\r\n\r\n' + buffer.getvalue() + b'\r\n')


def reset_buffer(buffer):
    # 重置buffer,方便復(fù)用
    buffer.truncate(0)
    buffer.seek(0)


def add_fps(img, fps):
    """在圖片上添加fps"""
    draw = ImageDraw.Draw(img)
    draw.text((100, 100), f"fps: {fps}", (255, 0, 0), ImageFont.truetype("arial.ttf", 100))
    return img


if __name__ == '__main__':    
    print("monitoring...")
    app.run("127.0.0.1", 9000)

到此這篇關(guān)于Python基于WebSocket實(shí)現(xiàn)簡易屏幕共享工具的文章就介紹到這了,更多相關(guān)Python WebSocket屏幕共享內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論