#author("2023-01-21T08:25:12+09:00","default:parallel-distributed","parallel-distributed") #author("2023-01-23T23:07:27+09:00","default:parallel-distributed","parallel-distributed") [[第6回]] #code(Python){{ #parallel_pi.py # -*- coding: utf-8 -*- # import socket import threading import time import sys #通信クラス class Com: HOST = '' PORT = 9998 clxa=[] #複数のclientのsocketとアドレスの組を格納するリスト return_from_worker=[] server_receiver = None client_receiver = None def start_server(self): handle_thread = threading.Thread(target=self.server_thread, args=()) #生成した handle_threadの実行開始(start) handle_thread.start() def server_thread(self): print("start server socket") self.pid=0 #socket.socket()...接続先のsocketを作成 self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合 self.soc.bind((self.HOST, self.PORT)) #sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数 self.soc.listen(10) while True: #sock.accept()...client から接続があると、クライアントと接続する #socket conと接続元IP address を返す con,address = self.soc.accept() clx=(con,address,self.pid) #clxに tuple (con, address,pid)を代入 self.clxa.append(clx) #clxaにclxを追加 print("[connect]{}".format(address)) self.pid=self.pid+1 #clientと送受信するためのthread, handler を生成。 handle_thread = threading.Thread(target=self.server_handler, args=(clx,)) #生成した handle_threadの実行開始(start) handle_thread.start() #client からsocketを通じてstreamの入力を行うthreadのhander #client からsocketを通じてstreamの入力を行うthreadのhandler def server_handler(self,clx): print("at server, connected, start handler"); con=clx[0] #clx[0] は con(socket), address=clx[1] #clx[1] は address pid=clx[2] #cls[2] は processor id while True: try: data = con.recv(1024) #socket con から最大1024byte 受信 recv_line=data.decode("utf-8") print("[receive]{} - {}".format(address, recv_line)) self.server_receiver.server_rx(recv_line,pid) except socket.error: #socketにエラーが発生したとき con.close() #socketをclose except: print("server_handler receive error.") # def send_ith_worker(self,line,i): #print("send_ith_worker("+line+","+str(i)+")") cx=self.clxa[i] try: cx[0].send(line.encode('utf-8')) except: print("send_ith_worker error") def start_client(self,host): print("start client socket, connect, host="+host) try: #socket.socket ... socket を作成し、socに代入 self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #IPアドレスがhostのserver socketにsocを接続 self.soc.connect((host,self.PORT)) #受信担当の関数handlerを物threadでうごかすhandle_threadを生成。 self.handle_thread=threading.Thread(target=self.client_handler,args=(self.soc,)) #handle_threadをstart self.handle_thread.start() except: #接続時にエラーがあった場合の処理 print("connect error.") #受信の処理。送信threadとは別に、並行して処理を行う。 def client_handler(self,soc): print("at client, connected, start handler") while True: try: data = soc.recv(1024) line=data.decode("utf-8") print("[receive]- {}".format(line)) (self.client_receiver).worker_rx(line) except socket.error: soc.close() break except: print("client_handler, receive error.") def get_client_number(self): return len(self.clxa) def send_to_server(self,line): self.soc.send(bytes(line,"utf-8")) # # server, clientにおいて、受け取ったデータを処理するreceiverの設定 def set_server_receiver(self,rec): self.server_receiver=rec def set_client_receiver(self,rec): self.client_receiver=rec # def close_all(self): for cx in self.clxa: cx[0].close() #Pi 並列計算クラス class Pi: isMaster=False results=[] def __init__(self,mx,px): #SPMD Single Program, Multiple Data で表現 #px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。 self.com=Com() if mx=="master": self.isMaster=True self.pn=int(px) self.com.set_server_receiver(self) else: self.isMaster=False self.master_ip=px self.px=0 self.pn=0 self.com.set_client_receiver(self) self.n=1000000 self.d=1.0/self.n self.d2=self.d/2.0 self.s=0.0 if self.isMaster: #master step 1: tcpサーバー起動、worker の接続を待つ print("master step1, start TCP server, wait worker connection.") host = socket.gethostname() # ipアドレスを取得、表示 ip = socket.gethostbyname(host) print(ip) # 192.168.○○○.○○○ print("workder number="+str(self.pn)) self.nrec=0 self.results=[0.0]*self.pn #workerからの接続待ちstart self.com.start_server() #workerの数がpxになるまで待つ。 i=0 while i<self.pn: i=self.com.get_client_number() #master sterp 2: wokerに、worker数と担当部分を送る。 print("master step2, send the pn(number of workers) and pid(assign) to workers.") self.nrec=0 start_time=time.time() for i in range(self.pn): sending=str(self.pn)+" "+str(i) print("send "+sending+" to the "+str(i)+"th client") self.com.send_ith_worker(sending,i) #send n else: #worker step 1: masterに接続 print("worker step1, connect to the master.") (self.com).start_client(self.master_ip) #worker step 2: master からの総数と割り当て通知を待つ print("worker step2, wait until receiving pn and pid.") while self.pn==0: time.sleep(0.001) #worker step 3. 担当部分のpiの計算 print("worker step3, calculate the assinge part of pi") s=0.0 #割り当てられた部分のpiの計算 for i in range(0,self.n): if self.pid==i%self.pn: #割り当て部分は、この式で求める。 x=i*self.d+self.d2 f=self.d/(x*x+1.0) s=s+f #print("x="+str(x)+",s="+str(s)) s=s*4.0 sending=str(s) print("end calc. sending="+sending) #worker step 4: masterに担当部分の計算結果を送る。 print("worker step4, send the worker's results to the master.") self.com.send_to_server(sending) if self.isMaster: #master step 3: worker からの結果を待つ print("master step 3. wait for the results of workers.") while self.nrec<self.pn: time.sleep(0.01) #master step 4: 結果の総和と出力 print("master step 4.sum of all results of workers and output") sum=0.0 for i in range(self.pn): sum=sum+self.results[i] print(sum) end_time=time.time() print("lapse="+str(end_time-start_time)) self.com.close_all() exit(1) def server_rx(self,x,pid): print("server_rx x="+x+" pid="+str(pid)+" nrec="+str(self.nrec)) self.results[pid]=float(x) self.nrec=self.nrec+1 print("nrec="+str(self.nrec)) def worker_rx(self,data): print("worker_rx data="+data) #print("pn="+str(self.pn)) rx=data.split() self.pn=int(rx[0]) self.pid=int(rx[1]) print("pn="+str(self.pn)) # 起動 # master: $ python3 parallel_pi.py master <np> # <np>は workerの数。 # workerが<np>個起動され、masterに接続されるまで待つ。 # master-ip=172.16.17.x #masterを起動すると、masterのIPアドレスが表示される。 # # worker: $ python3 parallel_pi.py client 172.16.17.x # masterのIPを指定して clientを起動 # # if __name__ == "__main__": # master: python parallel_pi.py master <クライアントの数> # client: python parallel_pi.py worker <masterのIPアドレス> args=sys.argv calpi=Pi(args[1],args[2]) }} ---- #counter