#author("2024-11-13T23:48:54+09:00","default:parallel-distributed","parallel-distributed")
#author("2024-11-16T21:38:26+09:00","default:parallel-distributed","parallel-distributed")
[[第7回]]

#code(Python){{
# parallel_mandel.py
# -*- coding: utf-8 -*-
#
import socket
import threading
import time
import sys
#import matplotlib.pyplot as plt
#import numpy as np
import tkinter as tk
import re

# 通信クラス


class Com:
    HOST = ''
    PORT = 9998
    clxa = []  # 複数のclientのsocketとアドレスの組を格納するリスト
    return_from_worker = []
    server_receiver = None
    client_receiver = None

    def start_server(self):
        handle_thread = threading.Thread(target=self.server_thread, args=())
        # 生成した handle_threadの実行開始(start)
        handle_thread.start()

    def server_thread(self):
        print("start server socket")
        self.pid = 0
        # socket.socket()...接続先のsocketを作成
        self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合
        self.soc.bind((self.HOST, self.PORT))
        # sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数
        self.soc.listen(10)
        while True:
            # sock.accept()...client から接続があると、クライアントと接続する
            # socket conと接続元IP address を返す
            con, address = self.soc.accept()
            clx = (con, address, self.pid)  # clxに tuple (con, address,pid)を代入
            self.clxa.append(clx)  # clxaにclxを追加
            print("[connect]{}".format(address))
            self.pid = self.pid+1
            # clientと送受信するためのthread, handler を生成。
            handle_thread = threading.Thread(
                target=self.server_handler, args=(clx,))
            # 生成した handle_threadの実行開始(start)
            handle_thread.start()

    # client からsocketを通じてstreamの入力を行うthreadのhander
    def server_handler(self, clx):
        print("at server, connected, start handler")
        con = clx[0]  # clx[0] は con(socket),
        address = clx[1]  # clx[1] は address
        pid = clx[2]  # cls[2] は processor id
        while True:
            try:
                data = con.recv(1024)  # socket con から最大1024byte 受信
                recv_line = data.decode("utf-8")
                #print("[receive]{} - {}".format(address, recv_line))
                self.server_receiver.server_rx(recv_line, pid)
            except socket.error:  # socketにエラーが発生したとき
                con.close()  # socketをclose
                break
            except:
                print("server_handler receive error.")
                break
        fx=con.makefile('r')
        try:
            while True:
                l=fx.readline()
                self.server_receiver.server_rx(l,pid)
        except socket.error:  # socketにエラーが発生したとき
            print("socket error. exit")
            con.close()  # socketをclose
        except:
            import traceback
            traceback.print_exc()
            print("server_handler receive error.")

        #recv_line = ""
        #receiving=""
        #while True:
        #    try:
        #        #data = con.recv(1024)  # socket con から最大1024byte 受信 一般的には複数の行を含む。
        #        #recv_line = recv_line+data.decode("utf-8") #前の行が途中で終わっていた場合、その部分を先頭に加える。
        #        ##print("[receive]{} - {}".format(address, recv_line))
        #        #if recv_line.endswith('\n'):  #行が途中で終わっていない。
        #        #    #print("recv_line is endswith newline")
        #        #    receiving="no rest"
        #        #else:   #行が途中で終わっている。
        #        #    #print("recv_line is not endswith newline")
        #        #    receiving="rest"
        #        #lx=recv_line.splitlines()
        #        ##print("receiving="+receiving)
        #        #for il in range(len(lx)-1): #最後の行の手前まで、一行ずつ処理
        #        #    l=lx[il]
        #        #    self.server_receiver.server_rx(l, pid)  #受け取った1行を処理
        #        #last_line=lx[len(lx)-1]   #最後の行。
        #        ##print("last_line="+last_line)
        #        #if receiving=="rest":   #行が途中で終わっている場合、その部分を次の行の先頭に加える。
        #        #    #print("rest, recv_line="+last_line)
        #        #    recv_line=last_line
        #        #else:
        #        #    #print("no rest")
        #        #    self.server_receiver.server_rx(last_line,pid)
        #        #    recv_line=""
        #    except socket.error:  # socketにエラーが発生したとき
        #        print("socket error. exit")
        #        con.close()  # socketをclose
        #        break
        #    except:
        #        import traceback
        #        traceback.print_exc()
        #        print("server_handler receive error.")
        #        break
    #

    def send_ith_worker(self, line, i):
        # print("send_ith_worker("+line+","+str(i)+")")
        cx = self.clxa[i]
        try:
            cx[0].send(line.encode('utf-8'))
            cx[0].send(bytes(line+'\n','utf-8'))
        except:
            print("send_ith_worker error")

    def start_client(self, host):
        print("start client socket, connect, host="+host)
        try:
            # socket.socket ... socket を作成し、socに代入
            self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # IPアドレスがhostのserver socketにsocを接続
            self.soc.connect((host, self.PORT))
            # 受信担当の関数handlerを物threadでうごかすhandle_threadを生成。
            self.handle_thread = threading.Thread(
                target=self.client_handler, args=(self.soc,))
            # handle_threadをstart
            self.handle_thread.start()
        except:  # 接続時にエラーがあった場合の処理
            import traceback
            traceback.print_exc()
            print("connect error.")

    # 受信の処理。送信threadとは別に、並行して処理を行う。
    def client_handler(self, soc):
        print("at client, connected, start handler")
        while True:
            try:
                data = soc.recv(1024)
                line = data.decode("utf-8")
                print("[receive]- {}".format(line))
                (self.client_receiver).worker_rx(line)
            except socket.error:
                soc.close()
                break
            except:
                print("client_handler, receive error.")
        fx=soc.makefile('r')
        try:
            while True:
                l=fx.readline()
                (self.client_receiver).worker_rx(l)
        except socket.error:  # socketにエラーが発生したとき
            print("socket error. exit")
            con.close()  # socketをclose
        except:
            import traceback
            traceback.print_exc()
            print("server_handler receive error.")

        #recv_line = ""
        #receiving=""
        #while True:
        #    try:
        #        data = soc.recv(1024)  # socket con から最大1024byte 受信 一般的には複数の行を含む。
        #        recv_line = recv_line+data.decode("utf-8") #前の行が途中で終わっていた場合、その部分を先頭に加える。
        #        #print("[receive]{} - {}".format(address, recv_line))
        #        if recv_line.endswith('\n'):  #行が途中で終わっていない。
        #            #print("recv_line is endswith newline")
        #            receiving="no rest"
        #        else:   #行が途中で終わっている。
        #            #print("recv_line is not endswith newline")
        #            receiving="rest"
        #        lx=recv_line.splitlines()
        #        #print("receiving="+receiving)
        #        for il in range(len(lx)-1): #最後の行の手前まで、一行ずつ処理
        #            l=lx[il]
        #            (self.client_receiver).worker_rx(l)   #受け取った1行を処理
        #        last_line=lx[len(lx)-1]   #最後の行。
        #        #print("last_line="+last_line)
        #        if receiving=="rest":   #行が途中で終わっている場合、その部分を次の行の先頭に加える。
        #            #print("rest, recv_line="+last_line)
        #            recv_line=last_line
        #        else:
        #            #print("no rest")
        #            (self.client_receiver).worker_rx(last_line)   #受け取った1行を処理
        #            recv_line=""
        #    except socket.error:  # socketにエラーが発生したとき
        #        print("socket error. exit")
        #        soc.close()  # socketをclose
        #       break
        #    except:
        #        import traceback
        #        traceback.print_exc()
        #        print("server_handler receive error.")
        #        break

    def get_client_number(self):
        return len(self.clxa)

    def send_to_server(self, line):
        self.soc.send(bytes(line, "utf-8"))
        self.soc.send(bytes((line+'\n'), "utf-8"))

    #
    # server, clientにおいて、受け取ったデータを処理するreceiverの設定
    def set_server_receiver(self, rec):
        self.server_receiver = rec

    def set_client_receiver(self, rec):
        self.client_receiver = rec

    #
    def close_all(self):
        for cx in self.clxa:
            cx[0].close()

# Mandel 並列計算クラス


class Mandel:
    isMaster = False
    results = []
    cmap=["#000000","#050000","#000500","#000005",
          "#100000","#001000","#000010",
          "#200000","#002000","#000020",
          "#505000","#500050","#005050",
          "#7f0000","#007f00","#00007f"]
    receiving=""

    M = 100

    def mandel(self, c):
        k = 0
        z = 0
        while k < self.M and abs(z) < 2:
            z = z**2 - c
            k += 1
        return k

    def server_continue_thread(self,dmy):
        if self.isMaster:
            # master step 3:  worker からの結果を待つ
            print("master step 3. wait for the results of workers.")
            while self.nrec < self.pn:
                time.sleep(0.01)
            # master step 4: 結果の総和と出力
            print("master step 4.sum of all results of workers and output")
            end_time = time.time()
            print("lapse="+str(end_time-self.start_time))
            dmy=input("OK?")
        self.com.close_all()
        self.root.destroy()
        exit(1)

    def draw_point(self, x, y, z):
        #print("draw_point x="+str(x)+" y="+str(y)+" z="+str(z))
        color = "#000000"
        if z >= 100:
            color = "#ff0000"
        elif z > 80:
            color = "#00ff00"
        elif z > 60:
            color = "#0000ff"
        elif z > 40:
            color = "#ffff00"
        elif z > 30:
            color = "#0fff0f"
        elif z > 20:
            color = "#fff0f0"
        elif z > 10:
            color = "#ff00ff"
        elif z > 8:
            color = "#00ffff"
        elif z>0:
            ix=int(2*z)
            if ix <16:
                color=self.cmap[ix]
        #print("x="+str(x)+" y="+str(y))
        ix = (x+1.0)*((self.xmax)/(3.2))
        iy = (y+1.2)*((self.ymax)/(2.4))
        #print("x="+str(x)+" y="+str(y)+" z="+str(z)+" ix="+str(ix)+" iy="+str(iy)+" color="+color)
        if self.canvas != None:
            self.canvas.create_rectangle(float(ix), float(iy), float(ix), float(iy), fill=color, width=0)
        #print("end draw_point")

    def __init__(self, mx, px):
        # SPMD Single Program, Multiple Data で表現
        # px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。
        self.com = Com()
        if mx == "master":
            self.isMaster = True
            self.pn = int(px)
            self.com.set_server_receiver(self)
        else:
            self.isMaster = False
            self.master_ip = px
            self.px = 0
            self.pn = 0
            self.com.set_client_receiver(self)
        self.xmax = 640
        self.ymax = 480

        if self.isMaster:
            #  for display
            self.root = tk.Tk()
            self.root.geometry("700x500")
            # Canvasの作成
            self.canvas = tk.Canvas(
                self.root, bg="white", width=self.xmax, height=self.ymax)
            # Canvasを配置
            self.canvas.pack(fill=tk.BOTH, expand=True)

            # master step 1: tcpサーバー起動、worker の接続を待つ
            print("master step1, start TCP server, wait worker connection.")
            host = socket.gethostname()
            # ipアドレスを取得、表示
            ip = socket.gethostbyname(host)
            print(ip)  # 192.168.○○○.○○○
            print("workder number="+str(self.pn))
            self.nrec = 0
            self.results = [0.0]*self.pn
            # workerからの接続待ちstart
            self.com.start_server()
            # workerの数がpxになるまで待つ。
            i = 0
            while i < self.pn:
                i = self.com.get_client_number()
            # master sterp 2: wokerに、worker数と担当部分を送る。
            print(
                "master step2, send the pn(number of workers) and pid(assign) to workers.")
            self.nrec = 0
            self.start_time = time.time()
            for i in range(self.pn):
                sending = str(self.pn)+" "+str(i)
                print("send "+sending+" to the "+str(i)+"th client")
                self.com.send_ith_worker(sending, i)  # send n
            continue_thread = threading.Thread(
                target=self.server_continue_thread, args=(None,), daemon=True)
            continue_thread.start()
            self.root.mainloop()

        else:
            # worker step 1: masterに接続
            print("worker step1, connect to the master.")
            (self.com).start_client(self.master_ip)
            # worker step 2: master からの総数(pn)と割り当て通知を待つ
            print("worker step2, wait until receiving pn and pid.")
            while self.pn == 0:
                time.sleep(0.001)
            # worker step 3. 担当部分のpiの計算
            print("worker step3, calculate the assinge part of pi")
            s = 0.0
            # 割り当てられた部分のmandelの計算
            for i in range(0, self.ymax):
                if self.pid == i % self.pn:  # 割り当て部分は、この式で求める。
                    for xx in range(0, self.xmax):
                        #yy = (self.ymax/self.pn)*i+self.pid
                        y = -1.2+float(i)*2.4/float(self.ymax)
                        x = -1.0+xx*3.2/float(self.xmax)
                        z = self.mandel(x+y*1j)
                        self.send_result(x, y, z)
            self.send_end_makar()
            self.com.close_all()
            exit(1)

    def server_rx(self, line, pid):
        #print("line="+line)
        rx=line.split(',')
        kx = rx[0]
        if kx == "p":
            #print("len_rx="+str(len_rx))
            try:
                self.draw_point(float(rx[1]), float(rx[2]), float(rx[3]))
            except:
                import traceback
                traceback.print_exc()
                print("except receiving="+self.receiving+" line="+line)
                return
        elif kx == "e":
            self.nrec = self.nrec+1
            print("nrec="+str(self.nrec))

    def worker_rx(self, data):
        print("worker_rx data="+data)
        # print("pn="+str(self.pn))
        rx = data.split()
        self.pn = int(rx[0])
        self.pid = int(rx[1])
        print("pn="+str(self.pn))

    def send_result(self, x, y, z):
        xline = "p,"+str(x)+","+str(y)+","+str(z)
        #print("c->s "+xline)
        self.com.send_to_server(xline)

    def send_end_makar(self):
        xline = "e,0.0,0.0,0.0"
        self.com.send_to_server(xline)


# 起動
# master: $ python3 parallel_mandel.py master <np>
#                  <np>は workerの数。
#                  workerが<np>個起動され、masterに接続されるまで待つ。
# master-ip=172.16.17.x    #masterを起動すると、masterのIPアドレスが表示される。
#
# worker: $ python3 parallel_mandel.py client 172.16.17.x
#               masterのIPを指定して clientを起動
#

if __name__ == "__main__":
    # master: python3 parallel_mandel.py master <no.of clients>
    # client: python3 parallel_mandel.py worker <serverのIPアドレス>
    args = sys.argv
    calpi = Mandel(args[1], args[2])


}}
----
#counter

トップ   編集 差分 履歴 添付 複製 名前変更 リロード   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS