使用python進行websocket的客戶端壓力測試,這個代碼是從github上 找到。然后簡單修改了下。大神運用了進程池,以及線程池的內容。所以保存下來,學習學習
然后需要說明的是:本次用的python2.7,也嘗試用python3.6,但是老實出現websocket-client包和python3不能兼容的情況,提示沒有相關的方法。所以不得已最后又采用了python2
# -*- coding:utf-8 -*- # __author__ == 'chenmingle' import websocket import time import threading import json import multiprocessing import uuid from threadpool import ThreadPool, makeRequests # 修改成自己的websocket地址 WS_URL = "xxxx" # 定義進程數 processes = 4 # 定義線程數(每個文件可能限制1024個,可以修改fs.file等參數) thread_num = 700 index = 1 def on_message(ws, message): # print(message) pass def on_error(ws, error): print(error) pass def on_close(ws): # print("### closed ###") pass def on_open(ws): global index index = index + 1 def send_thread(): # 設置你websocket的內容 # 每隔10秒發送一下數據使鏈接不中斷 while True: ws.send(u'hello服務器') time.sleep(10) t = threading.Thread(target=send_thread) t.start() def on_start(num): time.sleep(5) # websocket.enableTrace(True) ws = websocket.WebSocketApp(WS_URL + str(num), on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever() def thread_web_socket(): # 線程池 pool_list = ThreadPool(thread_num) num = list() # 設置開啟線程的數量 for ir in range(thread_num): num.append(ir) requests = makeRequests(on_start, num) [pool_list.putRequest(req) for req in requests] pool_list.wait() if __name__ == "__main__": # 進程池 pool = multiprocessing.Pool(processes=processes) # 設置開啟進程的數量 for i in xrange(processes): pool.apply_async(thread_web_socket) pool.close() pool.join()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
