#author("2023-01-21T08:25:12+09:00","default:parallel-distributed","parallel-distributed")
#author("2023-01-23T23:07:27+09:00","default:parallel-distributed","parallel-distributed")
[[第6回]]

#code(Python){{
#parallel_pi.py
# -*- coding: utf-8 -*-
#
import socket
import threading
import time
import sys

#通信クラス
class Com:
    HOST = ''
    PORT = 9998
    clxa=[] #複数のclientのsocketとアドレスの組を格納するリスト
    return_from_worker=[]
    server_receiver = None
    client_receiver = None
    def start_server(self):
        handle_thread = threading.Thread(target=self.server_thread, args=())
        #生成した handle_threadの実行開始(start)
        handle_thread.start()

    def server_thread(self):
        print("start server socket")
        self.pid=0
        #socket.socket()...接続先のsocketを作成
        self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        #sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合
        self.soc.bind((self.HOST, self.PORT))
        #sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数
        self.soc.listen(10)
        while True:
            #sock.accept()...client から接続があると、クライアントと接続する
            #socket conと接続元IP address を返す
            con,address = self.soc.accept()
            clx=(con,address,self.pid) #clxに tuple (con, address,pid)を代入
            self.clxa.append(clx) #clxaにclxを追加
            print("[connect]{}".format(address))
            self.pid=self.pid+1
            #clientと送受信するためのthread, handler を生成。
            handle_thread = threading.Thread(target=self.server_handler, args=(clx,))
            #生成した handle_threadの実行開始(start)
            handle_thread.start()

    #client からsocketを通じてstreamの入力を行うthreadのhander
    #client からsocketを通じてstreamの入力を行うthreadのhandler
    def server_handler(self,clx):
        print("at server, connected, start handler");
        con=clx[0] #clx[0] は con(socket),
        address=clx[1] #clx[1] は address
        pid=clx[2] #cls[2] は processor id
        while True:
            try:
                data = con.recv(1024) #socket con から最大1024byte 受信
                recv_line=data.decode("utf-8")
                print("[receive]{} - {}".format(address, recv_line))
                self.server_receiver.server_rx(recv_line,pid)
            except socket.error: #socketにエラーが発生したとき
                con.close() #socketをclose
            except:
                print("server_handler receive error.")
    #
    def send_ith_worker(self,line,i):
        #print("send_ith_worker("+line+","+str(i)+")")
        cx=self.clxa[i]
        try:
            cx[0].send(line.encode('utf-8'))
        except:
            print("send_ith_worker error")

    def start_client(self,host):
        print("start client socket, connect, host="+host)
        try:
            #socket.socket ... socket を作成し、socに代入
            self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            #IPアドレスがhostのserver socketにsocを接続
            self.soc.connect((host,self.PORT))
            #受信担当の関数handlerを物threadでうごかすhandle_threadを生成。
            self.handle_thread=threading.Thread(target=self.client_handler,args=(self.soc,))
            #handle_threadをstart
            self.handle_thread.start()
        except: #接続時にエラーがあった場合の処理
            print("connect error.")

    #受信の処理。送信threadとは別に、並行して処理を行う。
    def client_handler(self,soc):
        print("at client, connected, start handler")
        while True:
            try:
                data = soc.recv(1024)
                line=data.decode("utf-8")
                print("[receive]- {}".format(line))
                (self.client_receiver).worker_rx(line)
            except socket.error:
                soc.close()
                break
            except:
                print("client_handler, receive error.")

    def get_client_number(self):
        return len(self.clxa)
    def send_to_server(self,line):
        self.soc.send(bytes(line,"utf-8"))

    #
    # server, clientにおいて、受け取ったデータを処理するreceiverの設定
    def set_server_receiver(self,rec):
        self.server_receiver=rec
    def set_client_receiver(self,rec):
        self.client_receiver=rec

    #
    def close_all(self):
        for cx in self.clxa:
            cx[0].close()

#Pi 並列計算クラス
class Pi:
    isMaster=False
    results=[]
    def __init__(self,mx,px):
        #SPMD Single Program, Multiple Data で表現
        #px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。
        self.com=Com()
        if mx=="master":
            self.isMaster=True
            self.pn=int(px)
            self.com.set_server_receiver(self)
        else:
            self.isMaster=False
            self.master_ip=px
            self.px=0
            self.pn=0
            self.com.set_client_receiver(self)
        self.n=1000000
        self.d=1.0/self.n
        self.d2=self.d/2.0
        self.s=0.0
        if self.isMaster:
            #master step 1: tcpサーバー起動、worker の接続を待つ
            print("master step1, start TCP server, wait worker connection.")
            host = socket.gethostname()
            # ipアドレスを取得、表示
            ip = socket.gethostbyname(host)
            print(ip) # 192.168.○○○.○○○
            print("workder number="+str(self.pn))
            self.nrec=0
            self.results=[0.0]*self.pn
            #workerからの接続待ちstart
            self.com.start_server()
            #workerの数がpxになるまで待つ。
            i=0
            while i<self.pn:
                i=self.com.get_client_number()
            #master sterp 2: wokerに、worker数と担当部分を送る。
            print("master step2, send the pn(number of workers) and pid(assign) to workers.")
            self.nrec=0
            start_time=time.time()
            for i in range(self.pn):
                sending=str(self.pn)+" "+str(i)
                print("send "+sending+" to the "+str(i)+"th client")
                self.com.send_ith_worker(sending,i) #send n
        else:
            #worker step 1: masterに接続
            print("worker step1, connect to the master.")
            (self.com).start_client(self.master_ip)
            #worker step 2: master からの総数と割り当て通知を待つ
            print("worker step2, wait until receiving pn and pid.")
            while self.pn==0:
                time.sleep(0.001)
            #worker step 3. 担当部分のpiの計算
            print("worker step3, calculate the assinge part of pi")
            s=0.0
            #割り当てられた部分のpiの計算
            for i in range(0,self.n):
                if self.pid==i%self.pn: #割り当て部分は、この式で求める。
                    x=i*self.d+self.d2
                    f=self.d/(x*x+1.0)
                    s=s+f
                    #print("x="+str(x)+",s="+str(s))
            s=s*4.0
            sending=str(s)
            print("end calc. sending="+sending)
            #worker step 4: masterに担当部分の計算結果を送る。
            print("worker step4, send the worker's results to the master.")
            self.com.send_to_server(sending)
        if self.isMaster:
            #master step 3:  worker からの結果を待つ
            print("master step 3. wait for the results of workers.")
            while self.nrec<self.pn:
                time.sleep(0.01)
            #master step 4: 結果の総和と出力
            print("master step 4.sum of all results of workers and output")
            sum=0.0
            for i in range(self.pn):
                sum=sum+self.results[i]
            print(sum)
            end_time=time.time()
            print("lapse="+str(end_time-start_time))
        self.com.close_all()
        exit(1)

    def server_rx(self,x,pid):
        print("server_rx x="+x+" pid="+str(pid)+" nrec="+str(self.nrec))
        self.results[pid]=float(x)
        self.nrec=self.nrec+1
        print("nrec="+str(self.nrec))

    def worker_rx(self,data):
        print("worker_rx data="+data)
        #print("pn="+str(self.pn))
        rx=data.split()
        self.pn=int(rx[0])
        self.pid=int(rx[1])
        print("pn="+str(self.pn))


# 起動
# master: $ python3 parallel_pi.py master <np>
#                  <np>は workerの数。
#                  workerが<np>個起動され、masterに接続されるまで待つ。
# master-ip=172.16.17.x    #masterを起動すると、masterのIPアドレスが表示される。
#
# worker: $ python3 parallel_pi.py client 172.16.17.x
#               masterのIPを指定して clientを起動
#
#

if __name__ == "__main__":
    # master: python parallel_pi.py master <クライアントの数>
    # client: python parallel_pi.py worker <masterのIPアドレス>
    args=sys.argv
    calpi=Pi(args[1],args[2])

}}
----
#counter

トップ   編集 差分 履歴 添付 複製 名前変更 リロード   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS