python&&多線程多進程及主機管理&&學習筆記
- 多線程知識回顧
- 關于python"假線程"的說法
- python是非線程安全的語言
- 基本多進程示例
- 基本多進程示例2
- 多進程之進程間通訊示例1
- 多進程間內存共享示例1
- 多進程之進程間通訊示例2
- 多進程間內存共享示例2
- 多進程間內存共享示例3
- 進程池Pool
- 利用pool產生多進程示例
- 多進程多線程執(zhí)行示例1
- IT審計
- 堡壘機的開發(fā)示例
- 審計堡壘機的安全控制
- shellinaboxd
- 異步
- Select 、 poll & epoll(異步IO模型)
- select通過單進程同時處理多個非阻塞的socket連接示例
多線程知識回顧
關于python"假線程"的說法
因為存在全局解釋器鎖的原因, 雖然python的程序看上去有多個線程同時工作,但其實同一時刻,僅有一個線程在CPU上工作,所以有python假線程的說法。
在IO密集型的業(yè)務中,使用多線程比單線程速度要快,因為單線程等待時間長;在CPU密集型的業(yè)務中,使用多線程不一定比單線程效率要高,因為線程的切換需要消耗大量資源。
python是非線程安全的語言
python是非線程安全的,需要程序員主動去添加控制。即程序員如果沒有對python程序添加線程安全控制,多個線程同時工作時會同時對一份數(shù)據(jù)進行操作,最終導致數(shù)據(jù)混亂。因此需要添加線程鎖保證數(shù)據(jù)的準確性(同一時刻只存在唯一一個對數(shù)據(jù)進行操作的線程)。
因為GIL(全局解釋器鎖)的存在,python中的多線程是存在弊端的,其不能利用多核的優(yōu)勢?
基本多進程示例
multiprocessingDemo.py
#!/usr/bin/env python
#coding:utf-8
#導入進程池Pool類
from
multiprocessing
import
Pool
import
time
#簡單的多線程多進程示例
def
f
(
x
)
:
time
.
sleep
(
1
)
# print(x)
print
(
x
*
x
)
if
__name__
==
"__main__"
:
#實例化Pool,里面最多放5個進程。
p
=
Pool
(
5
)
#map(函數(shù),列表):以串行的形式(按序單個執(zhí)行),將列表內的每一個值傳入函數(shù),最后返回列表
#p.map(函數(shù),列表):以多進程的形式(并發(fā)執(zhí)行),將列表內的每一個值傳入函數(shù),最后返回列表
print
(
p
.
map
(
f
,
range
(
10
)
)
)
基本多進程示例2
multiprocessingDemo2.py
#!/usr/bin/env python
#coding:utf-8
from
multiprocessing
import
Process
import
os
def
info
(
title
)
:
print
(
title
)
print
(
'module name:'
,
__name__
)
if
hasattr
(
os
,
'getppid'
)
:
#獲取父進程的pid
print
(
'parent process:'
,
os
.
getppid
(
)
)
print
(
'process id :'
,
os
.
getpid
(
)
)
def
f
(
name
)
:
info
(
'function f'
)
print
(
'hello,'
,
name
)
if
__name__
==
'__main__'
:
#在主(父)進程調用函數(shù)info
info
(
'main file'
)
print
(
'----------'
)
#啟動一個子進程p.進程p調用函數(shù)f,函數(shù)f調用函數(shù)info
p
=
Process
(
target
=
f
,
args
=
(
'bob'
,
)
)
p
.
start
(
)
p
.
join
(
)
#如上所示,主進程與子進程同時運行。子進程的ppid為父進程的pid.
#子進程會完全復制父進程的內存空間,多(n)進程占用的內存空間是單進程的n倍。
#每一個進程至少會有一個(主)線程。
多進程之進程間通訊示例1
如下圖代碼段和log所示,啟動10個進程往列表info里面插入數(shù)據(jù),此時列表info的數(shù)據(jù)僅存在一個,即多進程之間內存數(shù)據(jù)相互獨立;啟動10個線程往列表info里面插入數(shù)據(jù),此時列表info的數(shù)據(jù)有十個,即多線程之間內存數(shù)據(jù)共享。
#!/usr/bin/env python
#coding:utf-8
from
multiprocessing
import
Process
import
threading
import
time
def
run
(
info_list
,
n
)
:
info_list
.
append
(
n
)
print
(
info_list
)
info
=
[
]
#啟動十個進程
if
__name__
==
"__main__"
:
for
i
in
range
(
10
)
:
p
=
Process
(
target
=
run
,
args
=
[
info
,
i
]
)
p
.
start
(
)
time
.
sleep
(
3
)
print
(
'------threading--------'
)
for
i
in
range
(
10
)
:
t
=
threading
.
Thread
(
target
=
run
,
args
=
[
info
,
i
]
)
t
.
start
(
)
'''
log:
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
------threading--------
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
'''
多進程間內存共享示例1
如下圖所示,借助queue模塊可以實現(xiàn)多進程間的通訊
queue內的數(shù)據(jù)在進程間是統(tǒng)一的。因為此時的queue是從multiprocessing中引入的(經過multiprocessing封裝過的),
#!/usr/bin/env python
#coding:utf-8
#導入模塊,其中Process,Queue均屬于multiprocessing內部的類
from
multiprocessing
import
Process
,
Queue
def
f
(
q
,
n
)
:
q
.
put
(
[
n
,
'hello'
]
)
#啟動十個進程
if
__name__
==
"__main__"
:
q
=
Queue
(
)
for
i
in
range
(
10
)
:
p
=
Process
(
target
=
f
,
args
=
[
q
,
i
]
)
p
.
start
(
)
for
i
in
range
(
10
)
:
print
(
q
.
get
(
)
)
多進程之進程間通訊示例2
#!/usr/bin/env python
#coding:utf-8
#導入模塊,其中Process,Queue均屬于multiprocessing內部的類
from
multiprocessing
import
Process
,
Lock
def
f
(
l
,
i
)
:
#使用進程鎖可以控制多進程使用同一屏幕的打印輸出
#此時的lock完全克隆自threading里面的lock,/
l
.
acquire
(
)
print
(
'hello,world'
,
i
)
l
.
release
(
)
if
__name__
==
"__main__"
:
lock
=
Lock
(
)
for
num
in
range
(
10
)
:
Process
(
target
=
f
,
args
=
(
lock
,
num
)
)
.
start
(
)
'''
log:
hello,world 1
hello,world 3
hello,world 0
hello,world 2
hello,world 4
hello,world 5
hello,world 6
hello,world 7
hello,world 8
hello,world 9
'''
多進程間內存共享示例2
多進程間可以通過queue/array/value實現(xiàn)進程間的內存共享。
#!/usr/bin/env python
#coding:utf-8
from
multiprocessing
import
Value
,
Array
,
Process
def
f
(
n
,
a
)
:
n
.
value
=
3.1415926
for
i
in
range
(
5
)
:
#對Array列表的按索引排列的前5個元素取反
a
[
i
]
=
-
a
[
i
]
if
__name__
==
"__main__"
:
#生成Value對象,'d'代表有小數(shù)點的數(shù)字
num
=
Value
(
'd'
,
0.0
)
#生成Array對象,'i'代表整數(shù),range(10)代表生成1到10之間的列表
arr
=
Array
(
'i'
,
range
(
10
)
)
#創(chuàng)建執(zhí)行子進程p,執(zhí)行完成后,父進程中的value和array對象的值隨子進程p的操作而改變,即通過value和array可以實現(xiàn)多進程間的內存數(shù)據(jù)交互。
p
=
Process
(
target
=
f
,
args
=
(
num
,
arr
)
)
p
.
start
(
)
p
.
join
(
)
print
(
num
.
value
)
#為什么在方括號內部加冒號?因為如果不加冒號,該行代碼將會報錯(打印一個實例出來?)。
print
(
arr
[
:
]
)
'''
log:
3.1415926
[0, -1, -2, -3, -4, 5, 6, 7, 8, 9]
'''
多進程間內存共享示例3
#!/usr/bin/env python
#coding:utf-8
#使用Manager模塊實現(xiàn)多進程間內存數(shù)據(jù)同步
from
multiprocessing
import
Process
,
Manager
def
f
(
d
,
l
)
:
d
[
1
]
=
'1'
d
[
'2'
]
=
2
d
[
0.25
]
=
None
#對字典列表元素進行反轉
l
.
reverse
(
)
if
__name__
==
"__main__"
:
#實例化
manager
=
Manager
(
)
#父進程聲明manager.字典
d
=
manager
.
dict
(
)
#父進程聲明manager.列表
l
=
manager
.
list
(
range
(
10
)
)
print
(
'before process p :'
,
d
)
print
(
'before process p :'
,
l
)
#創(chuàng)建子進程,子進程調用函數(shù)f對manager.字典填充數(shù)據(jù),對manager.列表進行翻轉。
p
=
Process
(
target
=
f
,
args
=
(
d
,
l
)
)
p
.
start
(
)
p
.
join
(
)
#父進程打印manager.字典和manager.列表,此時數(shù)據(jù)均被子進程執(zhí)行函數(shù)f改變。
print
(
'after process p :'
,
d
)
print
(
'after process p :'
,
l
)
'''
log:
before process p : {}
before process p : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
after process p : {1: '1', '2': 2, 0.25: None}
after process p : [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
'''
進程池Pool
利用pool產生多進程示例
#!/usr/bin/env python
#coding:utf-8
from
multiprocessing
import
Pool
import
time
def
f
(
x
)
:
print
(
x
*
x
)
time
.
sleep
(
1
)
return
x
*
x
if
__name__
==
"__main__"
:
#"processes=4"代表最大能同時運行四個進程
pool
=
Pool
(
processes
=
4
)
resList
=
[
]
for
i
in
range
(
10
)
:
#異步執(zhí)行。一次性將10個進程的實例啟動并放置在pool內。
#"res = pool.apply(f,[i,])"為同步執(zhí)行(串行執(zhí)行)
res
=
pool
.
apply_async
(
f
,
[
i
,
]
)
#將實例化成的對象存儲到列表中
resList
.
append
(
res
)
#從列表中取執(zhí)行的結果.設置timeout超時時間,超過超時時間則報超時錯誤。
for
i
in
resList
:
print
(
i
.
get
(
timeout
=
5
)
)
#查看啟動結果。res.get()方法會等著進程執(zhí)行完畢然后返回結果,會導致阻塞及多進程串行啟動,因此該代碼位置需放置在循環(huán)外部。如上所示。
#res.get()
#使用pool.map可以向結果返回到列表內
print
(
pool
.
map
(
f
,
range
(
10
)
)
)
'''
log;
0
1
4
9
16
0
25
1
36
4
49
9
64
16
81
25
36
49
64
81
0
1
4
9
16
25
36
49
64
81
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
'''
p.start()和p.join()
start()和join()一起使用,可以令并發(fā)執(zhí)行的多進程達到串行執(zhí)行的效果。join()帶有等待的作用。
p.join()與res.get()與pool.apply(f,[i,])均會因為存在等待結果的特性而導致在多進程執(zhí)行時出現(xiàn)串行執(zhí)行的效果。
同時并發(fā)跑多少個進程是合適的?
看cpu的核數(shù),一般并發(fā)進程數(shù)量等于cpu的核數(shù)(或兩倍)即可。過多容易導致出現(xiàn)僵尸進程,即父進程與子進程失聯(lián)。出現(xiàn)僵尸進程后,子進程處于不受管理狀態(tài),最后被操作系統(tǒng)的根進程回收,回收后會處于閑置無法殺死的狀態(tài)。唯一的方法是等待機器重啟或因為特殊原因子進程消失。
ps 命令中的defunct狀態(tài)即代表僵尸進程狀態(tài),僵尸進程過多會對操作系統(tǒng)造成很大的壓力。
以導致僵尸進程出現(xiàn)的用戶身份登錄服務器去殺死僵尸進程會不會成功?
多進程多線程執(zhí)行示例1
#!/usr/bin/env python
#coding:utf-8
from
multiprocessing
import
Pool
import
time
import
threading
import
os
def
threadFun
(
m
,
n
)
:
print
(
'第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s:'
,
(
m
,
n
,
os
.
getpid
(
)
)
)
def
f
(
x
)
:
for
i
in
range
(
3
)
:
t
=
threading
.
Thread
(
target
=
threadFun
,
args
=
(
x
,
i
)
)
t
.
start
(
)
print
(
'第'
,
x
,
'個進程開始創(chuàng)建線程啦'
)
if
__name__
==
"__main__"
:
#"processes=4"代表最大能同時運行四個進程
pool
=
Pool
(
processes
=
4
)
resList
=
[
]
for
i
in
range
(
3
)
:
#異步執(zhí)行。一次性將10個進程的實例啟動并放置在pool內。
#"res = pool.apply(f,[i,])"為同步執(zhí)行(串行執(zhí)行)
res
=
pool
.
apply_async
(
f
,
[
i
,
]
)
#將實例化成的對象存儲到列表中
resList
.
append
(
res
)
#從列表中取執(zhí)行的結果.設置timeout超時時間,超過超時時間則報超時錯誤。
for
i
in
resList
:
print
(
i
.
get
(
timeout
=
5
)
)
#查看啟動結果。res.get()方法會等著進程執(zhí)行完畢然后返回結果,會導致阻塞及多進程串行啟動,因此該代碼位置需放置在循環(huán)外部。如上所示。
#res.get()
#使用pool.map可以向結果返回到列表內
#print(pool.map(f,range(10)))
'''
log:
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (0, 0, 193136)
第 0 個進程開始創(chuàng)建線程啦
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (0, 1, 193136)
第 0 個進程開始創(chuàng)建線程啦
第 0 個進程開始創(chuàng)建線程啦
None
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (0, 2, 193136)
第 1 個進程開始創(chuàng)建線程啦
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (1, 0, 193136)
第 1 個進程開始創(chuàng)建線程啦
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (1, 1, 193136)
第 1 個進程開始創(chuàng)建線程啦
None
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (1, 2, 193136)
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (2, 0, 193136)
第 2 個進程開始創(chuàng)建線程啦
第 2 個進程開始創(chuàng)建線程啦
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (2, 1, 193136)
第 2 個進程開始創(chuàng)建線程啦
None
第 %s個進程創(chuàng)建了第 %s個線程,進程pid為%s: (2, 2, 193136)
'''
IT審計
堡壘機的開發(fā)示例
作用:
為了安全起見,必須在防火墻上做限制,運維區(qū)用戶僅能訪問登錄堡壘機,堡壘機能訪問機房服務器。即堡壘機是運維區(qū)用戶操作機房服務器的唯一接口。
interactive.py
def
posix_shell
(
chan
)
:
import
select
oldtty
=
termios
.
tcgetattr
(
sys
.
stdin
)
#打開文件夾,存儲列表records內的數(shù)據(jù)
f
=
file
(
'records.txt'
,
'ab+'
)
try
:
tty
.
setraw
(
sys
.
stdin
.
fileno
(
)
)
tty
.
setcbreak
(
sys
.
stdin
.
fileno
(
)
)
chan
.
settimeout
(
0.0
)
#定義一個列表,存放用戶在Linux服務器上執(zhí)行的命令
records
=
[
]
while
True
:
r
,
w
,
e
=
select
.
select
(
[
chan
,
sys
.
stdin
]
,
[
]
,
[
]
)
if
chan
in
r
:
try
:
x
=
u
(
chan
.
recv
(
1024
)
)
if
len
(
x
)
==
0
:
sys
.
stdout
.
write
(
"\r\n*** EOF\r\n"
)
break
sys
.
stdout
.
write
(
x
)
sys
.
stdout
.
flush
(
)
except
socket
.
timeout
:
pass
if
sys
.
stdin
in
r
:
x
=
sys
.
stdin
.
read
(
1
)
#在循環(huán)代碼中,使用append將用戶輸入的每一個字符依次存入列表records內
records
.
append
(
x
)
#如果用戶敲擊回車符
if
x
==
'\r'
:
#拼接列表records并顯示
f
.
write
(
''
.
join
(
records
)
.
replace
(
'\r'
,
'\n'
)
)
#置空列表records,重新存儲用戶下一次執(zhí)行的命令
records
=
[
]
if
len
(
x
)
==
0
:
break
chan
.
send
(
x
)
finally
:
termios
.
tcsetattr
(
sys
.
stdin
,
termios
.
TCSADRAIN
,
oldtty
)
f
.
close
(
)
審計堡壘機的安全控制
操作步驟:
#1.在linux服務器上創(chuàng)建新用戶,并修改用戶的環(huán)境變量文件
root@AY1402251551519263aeZ:~
# vim /home/opensips/.bashrc
#2.修改用戶的環(huán)境變量文件,清空其他不需要的部分,添加以下python文件,實現(xiàn)該用戶身份一經登錄服務器便執(zhí)行固定python文件,執(zhí)行結束后立即登出服務器。即用戶無法隨意登錄服務器隨意進行自由操作。
echo
"================================================="
#sudo /usr/bin/python /usr/local/Triaquae2/bin/TriAquae_console
/usr/bin/python /home/audit_agent/menu.py
logout
#3.新用戶遠程登錄堡壘機
#4.選擇機房服務器
#5.登出堡壘機
shellinaboxd
一款基于web的shell軟件
1.編譯安裝
2.無證書狀態(tài)啟動
./shellinaboxd -t
3.web頁面打開http://xxx.xxx.xxx.xxx:4200遠程登錄服務器
異步
Select 、 poll & epoll(異步IO模型)
Select
一個線程實現(xiàn)并發(fā)操作需要使用異步執(zhí)行。
在Linux 服務器上使用"ulimit -n "默認顯示的是終端同時打開的最大文件數(shù),也是select能監(jiān)控的最大文件句柄數(shù)。select不斷的在循環(huán)監(jiān)控文件句柄,不斷的將數(shù)據(jù)從內核態(tài)復制到用戶態(tài)。文件句柄數(shù)越大,真正在工作的文件越少,select的效益就越低。
python的select()方法直接調用操作系統(tǒng)的IO接口,它監(jiān)控sockets,open files,and pipes(所以帶fileno)方法的文件句柄何時變?yōu)閞eadable和writeable,或者通訊錯誤,select使得到同時監(jiān)控多個連接變得簡單。并且這比寫一個長循環(huán)來等待和監(jiān)控多客戶端連接更高效,因為select直接通過操作系統(tǒng)提供的c的網絡接口進行操作,而不是通過python解釋器。
poll
poll沒有最大連接數(shù)限制,缺點:包含大量文件描述符的數(shù)組被整體復制于用戶態(tài)和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數(shù)量的增加而增加。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(fā)。
epoll
屬于由內核直接支持的實現(xiàn)方法。epoll可以同時支持水平觸發(fā)和邊緣觸發(fā)(只告訴進程哪些文件描述符剛剛變?yōu)榫途w狀態(tài),它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發(fā)),理論上邊緣觸發(fā)的性能要高一些,但代碼實現(xiàn)相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符是,返回的不是實際的描述符,而是一個代表就緒描述符數(shù)量的值,你只要去epoll指定的一個數(shù)組中依次取得相應數(shù)量的文件描述符即可,這里也使用了內存映射技術,這樣便徹底省掉了這些文件描述符在系統(tǒng)調用時復制的開銷。
另一個本質的改進在于epoll采用了基于事件的就緒通知方式,在select和poll中,進程只有在調用一定的方法后,內核才會對所有監(jiān)視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基于某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
select通過單進程同時處理多個非阻塞的socket連接示例
sockSelect.py
#!/usr/bin/env python
#coding:utf-8
import
select
import
socket
import
sys
import
queue
#create a TCP/IP socket
server
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
#設置為非阻塞模式
server
.
setblocking
(
0
)
#bind the socket to the port
server_address
=
(
'localhost'
,
10000
)
print
(
sys
.
stderr
,
'starting up on %s port %s'
)
server
.
bind
(
server_address
)
#listen for incoming connections
#maximun number of connections 5
server
.
listen
(
5
)
#select同時監(jiān)控三個 通信列表
#第一個:所有的輸入的data,就是指外部發(fā)過來的數(shù)據(jù)
#sockets from which we except to read
inputs
=
[
server
]
#第二個:監(jiān)控和接收所有要發(fā)出去的data
#sockets to which we expect to write
outputs
=
[
]
#第三個:監(jiān)控錯誤信息
#所有客戶端的進來的連接和數(shù)據(jù)將會被server的主循環(huán)放在上面的list中處理,我們現(xiàn)在的server端需要等待連接可寫之后才能過來,
#然后接收數(shù)據(jù)并返回(因此不是在接收數(shù)據(jù)之后就立刻返回),因為每個連接要把輸入或輸出的數(shù)據(jù)先緩存到queue里,然后再由select取出來再發(fā)出去。
#outgoing message queue (socket:queue)
#定義空字典message_queue
message_queue
=
{
}
while
inputs
:
print
(
'waiting for the next event'
)
readable
,
writable
,
exceptional
=
select
.
select
(
inputs
,
outputs
,
inputs
)
#當你把inputs、outputs、exceptional(這里與inputs共用)傳給select之后,它返回3個新的list,
#我們上面把他們分別賦值為readable、writable、exceptional,所有在readable list 中的socket代表有數(shù)據(jù)可接受,
#所有在writable list 中的存放著你可以對其進行發(fā)送操作的socket連接,當通道出現(xiàn)error時會把error寫到exception列表中
#readable list 中的socket可以有三種可能狀態(tài),第一種是如果這個socket是main "server" socket,它負責監(jiān)聽客戶端的連接,
#如果這個main server socket出現(xiàn)在readable 里面,那代表這是server端已經ready來接收一個新的連接進來了,為了讓這個main server能同時處理多個連接,
#在下面的代碼里,我們把這main server 的 socket設置為非阻塞模式。
#循環(huán)處理readable里面的數(shù)據(jù)
for
s
in
readable
:
#如果s是一個客戶端對象
if
s
is
server
:
#返回客戶端對象connection,和客戶端地址client_address
connection
,
client_address
=
s
.
accept
(
)
print
(
sys
.
stderr
,
'new connection from '
,
client_address
)
#排隊,形成類似非阻塞模式
connection
.
setblocking
(
0
)
#將客戶端對象放入inputs列表內
inputs
.
append
(
connection
)
#為字典message_queu賦值,key為客戶端對象connection,value為隊列對象(存放客戶端發(fā)送過來的數(shù)據(jù))
message_queue
[
connection
]
=
queue
.
Queue
(
)
#如果s不是客戶端對象,而是插入到字典內的客戶端對象,則接受客戶端對象數(shù)據(jù)并存儲到字典內部
else
:
data
=
s
.
recv
(
1024
)
if
data
:
print
(
sys
.
stderr
,
'received "%s" from %s'
%
(
data
,
client_address
)
)
#取客戶端數(shù)據(jù)存放在字典內部
message_queue
[
s
]
.
put
(
data
)
#判斷已經存放了客戶端數(shù)據(jù)的客戶端對象是否在outputs列表中,若沒有,將該客戶端對象插入outputs列表中。
if
s
not
in
outputs
:
outputs
.
append
(
s
)
else
:
print
(
sys
.
stderr
,
'closing'
,
client_address
)
if
s
in
outputs
:
outputs
.
remove
(
s
)
inputs
.
remove
(
s
)
s
.
close
(
)
del
message_queue
[
s
]
for
s
in
writable
:
try
:
#對照writable列表從字典message_queue里面取數(shù)據(jù)(非阻塞模式,字典沒空就拋異常再次重復輪訓)
next_msg
=
message_queue
[
s
]
.
get_nowait
(
)
except
queue
.
Empty
:
print
(
sys
.
stderr
,
'output queue for '
,
s
.
getpeername
(
)
)
outputs
.
remove
(
s
)
else
:
print
(
sys
.
stderr
,
'sending "%s" to %s'
%
(
next_msg
,
client_address
)
)
s
.
send
(
next_msg
.
upper
(
)
)
for
s
in
exceptional
:
print
(
sys
.
stderr
,
'handling exceptional condition for'
)
inputs
.
remove
(
s
)
if
s
in
outputs
:
outputs
.
remove
(
s
)
s
.
close
(
)
del
message_queue
[
s
]
#
sockSelectClient.py
#!/usr/bin/env python
#coding:utf-8
import
socket
import
sys
messages
=
[
'this is a message'
,
'it will be sent'
,
'in parts'
,
]
server_address
=
(
'localhost'
,
10000
)
socks
=
[
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
,
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
,
]
print
(
sys
.
stderr
,
'connecting to %s port %s'
%
server_address
)
for
s
in
socks
:
s
.
connect
(
server_address
)
for
message
in
messages
:
for
s
in
socks
:
print
(
sys
.
stderr
,
'%s:sending "%s"'
%
(
s
.
getsockname
(
)
,
message
)
)
s
.
send
(
message
)
for
s
in
socks
:
data
=
s
.
recv
(
1024
)
print
(
sys
.
stderr
,
'%s:receives "%s"'
%
(
s
.
getsockname
(
)
,
data
)
)
if
not
data
:
print
(
sys
.
stderr
,
'closing socket'
,
s
.
getsockname
(
)
)
更多文章、技術交流、商務合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
