parallel_pi.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[第6回]]
#code(Python){{
#parallel_pi.py
# -*- coding: utf-8 -*-
#
import socket
import threading
import time
import sys
#通信クラス
class Com:
HOST = ''
PORT = 9998
clxa=[] #複数のclientのsocketとアドレスの組を格納するリスト
return_from_worker=[]
server_receiver = None
client_receiver = None
def start_server(self):
handle_thread = threading.Thread(target=self.server_thread, args=())
#生成した handle_threadの実行開始(start)
handle_thread.start()
def server_thread(self):
print("start server socket")
self.pid=0
#socket.socket()...接続先のsocketを作成
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合
self.soc.bind((self.HOST, self.PORT))
#sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数
self.soc.listen(10)
while True:
#sock.accept()...client から接続があると、クライアントと接続する
#socket conと接続元IP address を返す
con,address = self.soc.accept()
clx=(con,address,self.pid) #clxに tuple (con, address,pid)を代入
self.clxa.append(clx) #clxaにclxを追加
print("[connect]{}".format(address))
self.pid=self.pid+1
#clientと送受信するためのthread, handler を生成。
handle_thread = threading.Thread(target=self.server_handler, args=(clx,))
#生成した handle_threadの実行開始(start)
handle_thread.start()
#client からsocketを通じてstreamの入力を行うthreadのhandler
def server_handler(self,clx):
print("at server, connected, start handler");
con=clx[0] #clx[0] は con(socket),
address=clx[1] #clx[1] は address
pid=clx[2] #cls[2] は processor id
while True:
try:
data = con.recv(1024) #socket con から最大1024byte 受信
recv_line=data.decode("utf-8")
print("[receive]{} - {}".format(address, recv_line))
self.server_receiver.server_rx(recv_line,pid)
except socket.error: #socketにエラーが発生したとき
con.close() #socketをclose
except:
print("server_handler receive error.")
#
def send_ith_worker(self,line,i):
#print("send_ith_worker("+line+","+str(i)+")")
cx=self.clxa[i]
try:
cx[0].send(line.encode('utf-8'))
except:
print("send_ith_worker error")
def start_client(self,host):
print("start client socket, connect, host="+host)
try:
#socket.socket ... socket を作成し、socに代入
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#IPアドレスがhostのserver socketにsocを接続
self.soc.connect((host,self.PORT))
#受信担当の関数handlerを物threadでうごかすhandle_threadを生成。
self.handle_thread=threading.Thread(target=self.client_handler,args=(self.soc,))
#handle_threadをstart
self.handle_thread.start()
except: #接続時にエラーがあった場合の処理
print("connect error.")
#受信の処理。送信threadとは別に、並行して処理を行う。
def client_handler(self,soc):
print("at client, connected, start handler")
while True:
try:
data = soc.recv(1024)
line=data.decode("utf-8")
print("[receive]- {}".format(line))
(self.client_receiver).worker_rx(line)
except socket.error:
soc.close()
break
except:
print("client_handler, receive error.")
def get_client_number(self):
return len(self.clxa)
def send_to_server(self,line):
self.soc.send(bytes(line,"utf-8"))
#
# server, clientにおいて、受け取ったデータを処理するreceiverの設定
def set_server_receiver(self,rec):
self.server_receiver=rec
def set_client_receiver(self,rec):
self.client_receiver=rec
#
def close_all(self):
for cx in self.clxa:
cx[0].close()
#Pi 並列計算クラス
class Pi:
isMaster=False
results=[]
def __init__(self,mx,px):
#SPMD Single Program, Multiple Data で表現
#px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。
self.com=Com()
if mx=="master":
self.isMaster=True
self.pn=int(px)
self.com.set_server_receiver(self)
else:
self.isMaster=False
self.master_ip=px
self.px=0
self.pn=0
self.com.set_client_receiver(self)
self.n=1000000
self.d=1.0/self.n
self.d2=self.d/2.0
self.s=0.0
if self.isMaster:
#master step 1: tcpサーバー起動、worker の接続を待つ
print("master step1, start TCP server, wait worker connection.")
host = socket.gethostname()
# ipアドレスを取得、表示
ip = socket.gethostbyname(host)
print(ip) # 192.168.○○○.○○○
print("workder number="+str(self.pn))
self.nrec=0
self.results=[0.0]*self.pn
#workerからの接続待ちstart
self.com.start_server()
#workerの数がpxになるまで待つ。
i=0
while i<self.pn:
i=self.com.get_client_number()
#master sterp 2: wokerに、worker数と担当部分を送る。
print("master step2, send the pn(number of workers) and pid(assign) to workers.")
self.nrec=0
start_time=time.time()
for i in range(self.pn):
sending=str(self.pn)+" "+str(i)
print("send "+sending+" to the "+str(i)+"th client")
self.com.send_ith_worker(sending,i) #send n
else:
#worker step 1: masterに接続
print("worker step1, connect to the master.")
(self.com).start_client(self.master_ip)
#worker step 2: master からの総数と割り当て通知を待つ
print("worker step2, wait until receiving pn and pid.")
while self.pn==0:
time.sleep(0.001)
#worker step 3. 担当部分のpiの計算
print("worker step3, calculate the assinge part of pi")
s=0.0
#割り当てられた部分のpiの計算
for i in range(0,self.n):
if self.pid==i%self.pn: #割り当て部分は、この式で求める。
x=i*self.d+self.d2
f=self.d/(x*x+1.0)
s=s+f
#print("x="+str(x)+",s="+str(s))
s=s*4.0
sending=str(s)
print("end calc. sending="+sending)
#worker step 4: masterに担当部分の計算結果を送る。
print("worker step4, send the worker's results to the master.")
self.com.send_to_server(sending)
if self.isMaster:
#master step 3: worker からの結果を待つ
print("master step 3. wait for the results of workers.")
while self.nrec<self.pn:
time.sleep(0.01)
#master step 4: 結果の総和と出力
print("master step 4.sum of all results of workers and output")
sum=0.0
for i in range(self.pn):
sum=sum+self.results[i]
print(sum)
end_time=time.time()
print("lapse="+str(end_time-start_time))
self.com.close_all()
exit(1)
def server_rx(self,x,pid):
print("server_rx x="+x+" pid="+str(pid)+" nrec="+str(self.nrec))
self.results[pid]=float(x)
self.nrec=self.nrec+1
print("nrec="+str(self.nrec))
def worker_rx(self,data):
print("worker_rx data="+data)
#print("pn="+str(self.pn))
rx=data.split()
self.pn=int(rx[0])
self.pid=int(rx[1])
print("pn="+str(self.pn))
# 起動
# master: $ python3 parallel_pi.py master <np>
# <np>は workerの数。
# workerが<np>個起動され、masterに接続されるまで待つ。
# master-ip=172.16.17.x #masterを起動すると、masterのIPアドレスが表示される。
#
# worker: $ python3 parallel_pi.py client 172.16.17.x
# masterのIPを指定して clientを起動
#
#
if __name__ == "__main__":
# master: python parallel_pi.py master <クライアントの数>
# client: python parallel_pi.py worker <masterのIPアドレス>
args=sys.argv
calpi=Pi(args[1],args[2])
}}
----
#counter
終了行:
[[第6回]]
#code(Python){{
#parallel_pi.py
# -*- coding: utf-8 -*-
#
import socket
import threading
import time
import sys
#通信クラス
class Com:
HOST = ''
PORT = 9998
clxa=[] #複数のclientのsocketとアドレスの組を格納するリスト
return_from_worker=[]
server_receiver = None
client_receiver = None
def start_server(self):
handle_thread = threading.Thread(target=self.server_thread, args=())
#生成した handle_threadの実行開始(start)
handle_thread.start()
def server_thread(self):
print("start server socket")
self.pid=0
#socket.socket()...接続先のsocketを作成
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合
self.soc.bind((self.HOST, self.PORT))
#sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数
self.soc.listen(10)
while True:
#sock.accept()...client から接続があると、クライアントと接続する
#socket conと接続元IP address を返す
con,address = self.soc.accept()
clx=(con,address,self.pid) #clxに tuple (con, address,pid)を代入
self.clxa.append(clx) #clxaにclxを追加
print("[connect]{}".format(address))
self.pid=self.pid+1
#clientと送受信するためのthread, handler を生成。
handle_thread = threading.Thread(target=self.server_handler, args=(clx,))
#生成した handle_threadの実行開始(start)
handle_thread.start()
#client からsocketを通じてstreamの入力を行うthreadのhandler
def server_handler(self,clx):
print("at server, connected, start handler");
con=clx[0] #clx[0] は con(socket),
address=clx[1] #clx[1] は address
pid=clx[2] #cls[2] は processor id
while True:
try:
data = con.recv(1024) #socket con から最大1024byte 受信
recv_line=data.decode("utf-8")
print("[receive]{} - {}".format(address, recv_line))
self.server_receiver.server_rx(recv_line,pid)
except socket.error: #socketにエラーが発生したとき
con.close() #socketをclose
except:
print("server_handler receive error.")
#
def send_ith_worker(self,line,i):
#print("send_ith_worker("+line+","+str(i)+")")
cx=self.clxa[i]
try:
cx[0].send(line.encode('utf-8'))
except:
print("send_ith_worker error")
def start_client(self,host):
print("start client socket, connect, host="+host)
try:
#socket.socket ... socket を作成し、socに代入
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#IPアドレスがhostのserver socketにsocを接続
self.soc.connect((host,self.PORT))
#受信担当の関数handlerを物threadでうごかすhandle_threadを生成。
self.handle_thread=threading.Thread(target=self.client_handler,args=(self.soc,))
#handle_threadをstart
self.handle_thread.start()
except: #接続時にエラーがあった場合の処理
print("connect error.")
#受信の処理。送信threadとは別に、並行して処理を行う。
def client_handler(self,soc):
print("at client, connected, start handler")
while True:
try:
data = soc.recv(1024)
line=data.decode("utf-8")
print("[receive]- {}".format(line))
(self.client_receiver).worker_rx(line)
except socket.error:
soc.close()
break
except:
print("client_handler, receive error.")
def get_client_number(self):
return len(self.clxa)
def send_to_server(self,line):
self.soc.send(bytes(line,"utf-8"))
#
# server, clientにおいて、受け取ったデータを処理するreceiverの設定
def set_server_receiver(self,rec):
self.server_receiver=rec
def set_client_receiver(self,rec):
self.client_receiver=rec
#
def close_all(self):
for cx in self.clxa:
cx[0].close()
#Pi 並列計算クラス
class Pi:
isMaster=False
results=[]
def __init__(self,mx,px):
#SPMD Single Program, Multiple Data で表現
#px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。
self.com=Com()
if mx=="master":
self.isMaster=True
self.pn=int(px)
self.com.set_server_receiver(self)
else:
self.isMaster=False
self.master_ip=px
self.px=0
self.pn=0
self.com.set_client_receiver(self)
self.n=1000000
self.d=1.0/self.n
self.d2=self.d/2.0
self.s=0.0
if self.isMaster:
#master step 1: tcpサーバー起動、worker の接続を待つ
print("master step1, start TCP server, wait worker connection.")
host = socket.gethostname()
# ipアドレスを取得、表示
ip = socket.gethostbyname(host)
print(ip) # 192.168.○○○.○○○
print("workder number="+str(self.pn))
self.nrec=0
self.results=[0.0]*self.pn
#workerからの接続待ちstart
self.com.start_server()
#workerの数がpxになるまで待つ。
i=0
while i<self.pn:
i=self.com.get_client_number()
#master sterp 2: wokerに、worker数と担当部分を送る。
print("master step2, send the pn(number of workers) and pid(assign) to workers.")
self.nrec=0
start_time=time.time()
for i in range(self.pn):
sending=str(self.pn)+" "+str(i)
print("send "+sending+" to the "+str(i)+"th client")
self.com.send_ith_worker(sending,i) #send n
else:
#worker step 1: masterに接続
print("worker step1, connect to the master.")
(self.com).start_client(self.master_ip)
#worker step 2: master からの総数と割り当て通知を待つ
print("worker step2, wait until receiving pn and pid.")
while self.pn==0:
time.sleep(0.001)
#worker step 3. 担当部分のpiの計算
print("worker step3, calculate the assinge part of pi")
s=0.0
#割り当てられた部分のpiの計算
for i in range(0,self.n):
if self.pid==i%self.pn: #割り当て部分は、この式で求める。
x=i*self.d+self.d2
f=self.d/(x*x+1.0)
s=s+f
#print("x="+str(x)+",s="+str(s))
s=s*4.0
sending=str(s)
print("end calc. sending="+sending)
#worker step 4: masterに担当部分の計算結果を送る。
print("worker step4, send the worker's results to the master.")
self.com.send_to_server(sending)
if self.isMaster:
#master step 3: worker からの結果を待つ
print("master step 3. wait for the results of workers.")
while self.nrec<self.pn:
time.sleep(0.01)
#master step 4: 結果の総和と出力
print("master step 4.sum of all results of workers and output")
sum=0.0
for i in range(self.pn):
sum=sum+self.results[i]
print(sum)
end_time=time.time()
print("lapse="+str(end_time-start_time))
self.com.close_all()
exit(1)
def server_rx(self,x,pid):
print("server_rx x="+x+" pid="+str(pid)+" nrec="+str(self.nrec))
self.results[pid]=float(x)
self.nrec=self.nrec+1
print("nrec="+str(self.nrec))
def worker_rx(self,data):
print("worker_rx data="+data)
#print("pn="+str(self.pn))
rx=data.split()
self.pn=int(rx[0])
self.pid=int(rx[1])
print("pn="+str(self.pn))
# 起動
# master: $ python3 parallel_pi.py master <np>
# <np>は workerの数。
# workerが<np>個起動され、masterに接続されるまで待つ。
# master-ip=172.16.17.x #masterを起動すると、masterのIPアドレスが表示される。
#
# worker: $ python3 parallel_pi.py client 172.16.17.x
# masterのIPを指定して clientを起動
#
#
if __name__ == "__main__":
# master: python parallel_pi.py master <クライアントの数>
# client: python parallel_pi.py worker <masterのIPアドレス>
args=sys.argv
calpi=Pi(args[1],args[2])
}}
----
#counter
ページ名: