#author("2024-11-13T23:48:54+09:00","default:parallel-distributed","parallel-distributed") #author("2024-11-16T21:38:26+09:00","default:parallel-distributed","parallel-distributed") [[第7回]] #code(Python){{ # parallel_mandel.py # -*- coding: utf-8 -*- # import socket import threading import time import sys #import matplotlib.pyplot as plt #import numpy as np import tkinter as tk import re # 通信クラス class Com: HOST = '' PORT = 9998 clxa = [] # 複数のclientのsocketとアドレスの組を格納するリスト return_from_worker = [] server_receiver = None client_receiver = None def start_server(self): handle_thread = threading.Thread(target=self.server_thread, args=()) # 生成した handle_threadの実行開始(start) handle_thread.start() def server_thread(self): print("start server socket") self.pid = 0 # socket.socket()...接続先のsocketを作成 self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合 self.soc.bind((self.HOST, self.PORT)) # sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数 self.soc.listen(10) while True: # sock.accept()...client から接続があると、クライアントと接続する # socket conと接続元IP address を返す con, address = self.soc.accept() clx = (con, address, self.pid) # clxに tuple (con, address,pid)を代入 self.clxa.append(clx) # clxaにclxを追加 print("[connect]{}".format(address)) self.pid = self.pid+1 # clientと送受信するためのthread, handler を生成。 handle_thread = threading.Thread( target=self.server_handler, args=(clx,)) # 生成した handle_threadの実行開始(start) handle_thread.start() # client からsocketを通じてstreamの入力を行うthreadのhander def server_handler(self, clx): print("at server, connected, start handler") con = clx[0] # clx[0] は con(socket), address = clx[1] # clx[1] は address pid = clx[2] # cls[2] は processor id while True: try: data = con.recv(1024) # socket con から最大1024byte 受信 recv_line = data.decode("utf-8") #print("[receive]{} - {}".format(address, recv_line)) self.server_receiver.server_rx(recv_line, pid) except socket.error: # socketにエラーが発生したとき con.close() # socketをclose break except: print("server_handler receive error.") break fx=con.makefile('r') try: while True: l=fx.readline() self.server_receiver.server_rx(l,pid) except socket.error: # socketにエラーが発生したとき print("socket error. exit") con.close() # socketをclose except: import traceback traceback.print_exc() print("server_handler receive error.") #recv_line = "" #receiving="" #while True: # try: # #data = con.recv(1024) # socket con から最大1024byte 受信 一般的には複数の行を含む。 # #recv_line = recv_line+data.decode("utf-8") #前の行が途中で終わっていた場合、その部分を先頭に加える。 # ##print("[receive]{} - {}".format(address, recv_line)) # #if recv_line.endswith('\n'): #行が途中で終わっていない。 # # #print("recv_line is endswith newline") # # receiving="no rest" # #else: #行が途中で終わっている。 # # #print("recv_line is not endswith newline") # # receiving="rest" # #lx=recv_line.splitlines() # ##print("receiving="+receiving) # #for il in range(len(lx)-1): #最後の行の手前まで、一行ずつ処理 # # l=lx[il] # # self.server_receiver.server_rx(l, pid) #受け取った1行を処理 # #last_line=lx[len(lx)-1] #最後の行。 # ##print("last_line="+last_line) # #if receiving=="rest": #行が途中で終わっている場合、その部分を次の行の先頭に加える。 # # #print("rest, recv_line="+last_line) # # recv_line=last_line # #else: # # #print("no rest") # # self.server_receiver.server_rx(last_line,pid) # # recv_line="" # except socket.error: # socketにエラーが発生したとき # print("socket error. exit") # con.close() # socketをclose # break # except: # import traceback # traceback.print_exc() # print("server_handler receive error.") # break # def send_ith_worker(self, line, i): # print("send_ith_worker("+line+","+str(i)+")") cx = self.clxa[i] try: cx[0].send(line.encode('utf-8')) cx[0].send(bytes(line+'\n','utf-8')) except: print("send_ith_worker error") def start_client(self, host): print("start client socket, connect, host="+host) try: # socket.socket ... socket を作成し、socに代入 self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # IPアドレスがhostのserver socketにsocを接続 self.soc.connect((host, self.PORT)) # 受信担当の関数handlerを物threadでうごかすhandle_threadを生成。 self.handle_thread = threading.Thread( target=self.client_handler, args=(self.soc,)) # handle_threadをstart self.handle_thread.start() except: # 接続時にエラーがあった場合の処理 import traceback traceback.print_exc() print("connect error.") # 受信の処理。送信threadとは別に、並行して処理を行う。 def client_handler(self, soc): print("at client, connected, start handler") while True: try: data = soc.recv(1024) line = data.decode("utf-8") print("[receive]- {}".format(line)) (self.client_receiver).worker_rx(line) except socket.error: soc.close() break except: print("client_handler, receive error.") fx=soc.makefile('r') try: while True: l=fx.readline() (self.client_receiver).worker_rx(l) except socket.error: # socketにエラーが発生したとき print("socket error. exit") con.close() # socketをclose except: import traceback traceback.print_exc() print("server_handler receive error.") #recv_line = "" #receiving="" #while True: # try: # data = soc.recv(1024) # socket con から最大1024byte 受信 一般的には複数の行を含む。 # recv_line = recv_line+data.decode("utf-8") #前の行が途中で終わっていた場合、その部分を先頭に加える。 # #print("[receive]{} - {}".format(address, recv_line)) # if recv_line.endswith('\n'): #行が途中で終わっていない。 # #print("recv_line is endswith newline") # receiving="no rest" # else: #行が途中で終わっている。 # #print("recv_line is not endswith newline") # receiving="rest" # lx=recv_line.splitlines() # #print("receiving="+receiving) # for il in range(len(lx)-1): #最後の行の手前まで、一行ずつ処理 # l=lx[il] # (self.client_receiver).worker_rx(l) #受け取った1行を処理 # last_line=lx[len(lx)-1] #最後の行。 # #print("last_line="+last_line) # if receiving=="rest": #行が途中で終わっている場合、その部分を次の行の先頭に加える。 # #print("rest, recv_line="+last_line) # recv_line=last_line # else: # #print("no rest") # (self.client_receiver).worker_rx(last_line) #受け取った1行を処理 # recv_line="" # except socket.error: # socketにエラーが発生したとき # print("socket error. exit") # soc.close() # socketをclose # break # except: # import traceback # traceback.print_exc() # print("server_handler receive error.") # break def get_client_number(self): return len(self.clxa) def send_to_server(self, line): self.soc.send(bytes(line, "utf-8")) self.soc.send(bytes((line+'\n'), "utf-8")) # # server, clientにおいて、受け取ったデータを処理するreceiverの設定 def set_server_receiver(self, rec): self.server_receiver = rec def set_client_receiver(self, rec): self.client_receiver = rec # def close_all(self): for cx in self.clxa: cx[0].close() # Mandel 並列計算クラス class Mandel: isMaster = False results = [] cmap=["#000000","#050000","#000500","#000005", "#100000","#001000","#000010", "#200000","#002000","#000020", "#505000","#500050","#005050", "#7f0000","#007f00","#00007f"] receiving="" M = 100 def mandel(self, c): k = 0 z = 0 while k < self.M and abs(z) < 2: z = z**2 - c k += 1 return k def server_continue_thread(self,dmy): if self.isMaster: # master step 3: worker からの結果を待つ print("master step 3. wait for the results of workers.") while self.nrec < self.pn: time.sleep(0.01) # master step 4: 結果の総和と出力 print("master step 4.sum of all results of workers and output") end_time = time.time() print("lapse="+str(end_time-self.start_time)) dmy=input("OK?") self.com.close_all() self.root.destroy() exit(1) def draw_point(self, x, y, z): #print("draw_point x="+str(x)+" y="+str(y)+" z="+str(z)) color = "#000000" if z >= 100: color = "#ff0000" elif z > 80: color = "#00ff00" elif z > 60: color = "#0000ff" elif z > 40: color = "#ffff00" elif z > 30: color = "#0fff0f" elif z > 20: color = "#fff0f0" elif z > 10: color = "#ff00ff" elif z > 8: color = "#00ffff" elif z>0: ix=int(2*z) if ix <16: color=self.cmap[ix] #print("x="+str(x)+" y="+str(y)) ix = (x+1.0)*((self.xmax)/(3.2)) iy = (y+1.2)*((self.ymax)/(2.4)) #print("x="+str(x)+" y="+str(y)+" z="+str(z)+" ix="+str(ix)+" iy="+str(iy)+" color="+color) if self.canvas != None: self.canvas.create_rectangle(float(ix), float(iy), float(ix), float(iy), fill=color, width=0) #print("end draw_point") def __init__(self, mx, px): # SPMD Single Program, Multiple Data で表現 # px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。 self.com = Com() if mx == "master": self.isMaster = True self.pn = int(px) self.com.set_server_receiver(self) else: self.isMaster = False self.master_ip = px self.px = 0 self.pn = 0 self.com.set_client_receiver(self) self.xmax = 640 self.ymax = 480 if self.isMaster: # for display self.root = tk.Tk() self.root.geometry("700x500") # Canvasの作成 self.canvas = tk.Canvas( self.root, bg="white", width=self.xmax, height=self.ymax) # Canvasを配置 self.canvas.pack(fill=tk.BOTH, expand=True) # master step 1: tcpサーバー起動、worker の接続を待つ print("master step1, start TCP server, wait worker connection.") host = socket.gethostname() # ipアドレスを取得、表示 ip = socket.gethostbyname(host) print(ip) # 192.168.○○○.○○○ print("workder number="+str(self.pn)) self.nrec = 0 self.results = [0.0]*self.pn # workerからの接続待ちstart self.com.start_server() # workerの数がpxになるまで待つ。 i = 0 while i < self.pn: i = self.com.get_client_number() # master sterp 2: wokerに、worker数と担当部分を送る。 print( "master step2, send the pn(number of workers) and pid(assign) to workers.") self.nrec = 0 self.start_time = time.time() for i in range(self.pn): sending = str(self.pn)+" "+str(i) print("send "+sending+" to the "+str(i)+"th client") self.com.send_ith_worker(sending, i) # send n continue_thread = threading.Thread( target=self.server_continue_thread, args=(None,), daemon=True) continue_thread.start() self.root.mainloop() else: # worker step 1: masterに接続 print("worker step1, connect to the master.") (self.com).start_client(self.master_ip) # worker step 2: master からの総数(pn)と割り当て通知を待つ print("worker step2, wait until receiving pn and pid.") while self.pn == 0: time.sleep(0.001) # worker step 3. 担当部分のpiの計算 print("worker step3, calculate the assinge part of pi") s = 0.0 # 割り当てられた部分のmandelの計算 for i in range(0, self.ymax): if self.pid == i % self.pn: # 割り当て部分は、この式で求める。 for xx in range(0, self.xmax): #yy = (self.ymax/self.pn)*i+self.pid y = -1.2+float(i)*2.4/float(self.ymax) x = -1.0+xx*3.2/float(self.xmax) z = self.mandel(x+y*1j) self.send_result(x, y, z) self.send_end_makar() self.com.close_all() exit(1) def server_rx(self, line, pid): #print("line="+line) rx=line.split(',') kx = rx[0] if kx == "p": #print("len_rx="+str(len_rx)) try: self.draw_point(float(rx[1]), float(rx[2]), float(rx[3])) except: import traceback traceback.print_exc() print("except receiving="+self.receiving+" line="+line) return elif kx == "e": self.nrec = self.nrec+1 print("nrec="+str(self.nrec)) def worker_rx(self, data): print("worker_rx data="+data) # print("pn="+str(self.pn)) rx = data.split() self.pn = int(rx[0]) self.pid = int(rx[1]) print("pn="+str(self.pn)) def send_result(self, x, y, z): xline = "p,"+str(x)+","+str(y)+","+str(z) #print("c->s "+xline) self.com.send_to_server(xline) def send_end_makar(self): xline = "e,0.0,0.0,0.0" self.com.send_to_server(xline) # 起動 # master: $ python3 parallel_mandel.py master <np> # <np>は workerの数。 # workerが<np>個起動され、masterに接続されるまで待つ。 # master-ip=172.16.17.x #masterを起動すると、masterのIPアドレスが表示される。 # # worker: $ python3 parallel_mandel.py client 172.16.17.x # masterのIPを指定して clientを起動 # if __name__ == "__main__": # master: python3 parallel_mandel.py master <no.of clients> # client: python3 parallel_mandel.py worker <serverのIPアドレス> args = sys.argv calpi = Mandel(args[1], args[2]) }} ---- #counter