#author("2022-05-31T21:39:11+09:00","default:TeleportDresser","TeleportDresser") #author("2022-12-05T23:27:32+09:00","default:TeleportDresser","TeleportDresser") #code(Python){{ # coding: utf-8 # # # +---------+ +--------------+ # Wiki <->| fwb4pi |<---->|tcp_server_ex1| # +---------+ +--------------+ # ^ # | # v # +------------------+ # |RemoteCommandReder| # | | | # | v | # | handler | # | | | # | v | # | parser | # | | | # +----|-------------+ # | # | put # v # +------------------+ # |TeleportDresser | # | | | # | v | # | handler | # +------------------+ # import cv2 import numpy as np from moviepy.editor import ImageSequenceClip # coding: utf-8 from PIL import Image, ImageFont, ImageDraw import smbus import time import sys import requests import os import socket import threading from collections import deque import subprocess import copy from subprocess import check_output #from PIL import Image #import smbus #import time #import sys #import requests class RemoteCommandReader: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) HOST = 'localhost' PORT = 9998 def __init__(self): print("start RemoteCommandReader.__init__") self.client_start() self.teleport_dress=Teleport_Dress() print(self.teleport_dress) self.teleport_dress.setRemoteCommandReader(self) while True: try: time.sleep(0.01) except: print("error") def python2fwbln(self,py2x_message): msg=py2x_message+'\n' self.sock.sendall(msg.encode('utf-8')) def python2fwb(self,py2x_message): msg=py2x_message self.sock.sendall(msg.encode('utf-8')) def parse(self,line): self.python2fwb(">"+line) words=line.split() print(words) if words[0]=='show': fx=words[1] print(fx) print(self.teleport_dress) self.teleport_dress.putImage(fx) elif words[0]=='text': if len(words)>=2: words.pop(0) fx = ' '.join(words) else: fx="" print("text "+fx) self.teleport_dress.putText(fx) elif words[0]=='txtColor': fx=words[1] print("txtColor "+fx) self.teleport_dress.setTextColor(fx) elif words[0]=='clear': self.teleport_dress.clearImage() elif words[0]=='clearAll': self.teleport_dress.putText('') self.teleport_dress.clearImage() elif words[0]=='ls': self.returnFileList() #elif words[0]=='ip_addr': #self.returnIpAddr() elif words[0]=='shutdown': self.shutdown() def returnIpAddr(self): out = check_output(["hostname","-I"]) self.python2fwbln(out) def returnFileList(self): cpath=os.getcwd() os.chdir("/home/pi/Pictures") filenames = [f.name for f in os.scandir()] filenames.sort() for fn in filenames: self.python2fwbln(fn) os.chdir(cpath) def shutdown(self): os.system("sudo shutdown -h now") def client_start(self): """クライアントのスタート""" self.sock.connect((self.HOST, self.PORT)) handle_thread = threading.Thread(target=self.handler, args=(self.sock,), daemon=True) handle_thread.start() def handler(self,sock): """サーバからメッセージを受信し、表示する""" while True: data = sock.recv(1024) print("[受信]{}".format(data.decode("utf-8"))) line=data.decode("utf-8") self.parse(line) received=data.decode("utf-8") lines=received.splitlines() for line in lines: self.parse(line) class Teleport_Dress: off_x=0 off_y=0 imax=75 #display height pixel size jmax=16 #display width pixel size # dx=img_width/jmax # dy=img_height/imax impose_text="" text_color=(0,0,0) def __init__(self): print("start Teleport_Dress.__init__") #self.font_path = './font/font_jb004_running_brush_wi.ttf' self.font_path = '/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc' self.pixels=[[(0,0,0) for j in range(self.jmax)] for i in range(self.imax)] self.addrs = [0x30,0x31,0x32,0x33] self.fourpix=[0x00, 0x00, 0x04, 0,0,0, 0,0,0, 0,0,0, 0,0,0] self.i2c = smbus.SMBus(1) # 注:ラズパイのI2Cポート self.queue=deque([]) self.show_loop_start() self.text_x=0 def setTextColor(self,x): if x=="white": self.text_color=(255,255,255) elif x=="black": self.text_color=(0,0,0) elif x=="red": self.text_color=(255,0,0) elif x=="green": self.text_color=(0,255,0) elif x=="blue": self.text_color=(0,0,255) elif x=="yellow": self.text_color=(255,255,0) elif x=="cyan": self.text_color=(0,255,255) elif x=="magenta": self.text_color=(255,0,255) elif x=="orange": self.text_color=(255,128,0) elif x=="pink": self.text_color=(200,80,80) def setRemoteCommandReader(self, x): self.rcReader=x def cv2pil(self,image): ''' OpenCV型 -> PIL型 ''' new_image = image.copy() if new_image.ndim == 2: # モノクロ pass elif new_image.shape[2] == 3: # カラー new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB) elif new_image.shape[2] == 4: # 透過 new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA) new_image = Image.fromarray(new_image) return new_image def gif2img(self,file_name): #print("gif2img.."+file_name) try: path="/home/pi/Pictures/"+file_name gif = cv2.VideoCapture(path) print("gif=",gif) except: import traceback traceback.print_exc() rtn="error: show ...no "+file_name self.rcReader.pythohn2fwb(rtn) gif = cv2.VideoCapture('/home/pi/Pictures/giphy-girl-ex1.gif') fps = gif.get(cv2.CAP_PROP_FPS) # fpsは1秒あたりのコマ数 images = [] i = 0 while True: is_success, img = gif.read() if not is_success: #print("gif.read() is not success") break #print("gif read i=",i) images.append(img) i+=1 ex=('anime',images) return ex def pic2imgs(self,file_name): print("pic2imgs("+file_name+")") img=[] #subprocess.call(["cd","~/Pictures"]) try: img = Image.open("/home/pi/Pictures/"+file_name) print("img open success."); #self.show_one_pic(img) except: import traceback traceback.print_exc() rtn="error: show "+file_name self.rcReader.python2fwbln(rtn) #fps = gif.get(cv2.CAP_PROP_FPS) # fpsは1秒あたりのコマ数 # images = [] # # when the width of the img was # larger than the width of the teleport dresser, # make an animation # imgw,imgh=img.size print("imgw="+str(imgw)+",imgh="+str(imgh)) if imgw >int(imgh/1.2) : print("imgw>imgh/1.2") wd=int(imgh/2) dx=int(wd/self.jmax) nx=int(imgw/dx-wd/dx-1) #print("imgw="+str(imgw)+",wd="+str(wd)+",dx="+str(dx)+",nx="+str(nx)) #transform right try: np_img=np.array(img) #imgc=cv2.hconcat([np_img,np_img]) #print("cv2.hconcat") #print(type(imgc)) except: import traceback traceback.print_exc() rtn="error: np.array(img)" self.rcReader.python2fwbln(rtn) off_x=0 off_y=0 for i in range(nx): #print("i="+str(i)) #print("off_x+i*dx="+str(off_x+i*dx)) try: #subimg=imgc[off_y:imgh,off_x+i*dx:off_x+i*dx+wd] subimg=np_img[off_y:imgh,off_x+i*dx:off_x+i*dx+wd] img2=Image.fromarray(subimg) images.append(copy.copy(img2)) except: import traceback traceback.print_exc() rtn="error: subimg" self.rcReader.python2fwbln(rtn) else: print("imgw<=imgh/1.2") # # # i = 0 for i in range(5): img2=copy.copy(img) #self.show_one_pic(img2) images.append(img2) print(i) print("images-len="+str(len(images))) rtn=('img',images) return rtn def clearImage(self): ex=('clear','x') print("clear") self.queue.append(ex) def putImage(self,file_name): print("putImage..."+file_name) if file_name.startswith('http://') or file_name.startswith('https://'): #subprocess.call(["cd ~/Pictures"],shell=True) cpath=os.getcwd() os.chdir("/home/pi/Pictures") wgetx = "wget -N "+file_name subprocess.call([wgetx],shell=True) #subprocess.call(["cd ~/python"],shell=True) os.chdir(cpath) path=file_name.split("/") plen=len(path) xname=path[plen-1] print("xname="+xname) if xname.endswith('.gif') or xname.endswith('.GIF'): ex=self.gif2img(xname) self.queue.append(ex) elif xname.endswith('.jpg') or xname.endswith('.JPG') or \ xname.endswith('.JPEG') or xname.endswith('.jpeg'): ex=('img',xname) #ex=self.pic2imgs(xname) self.queue.append(ex) elif xname.endswith('.png') or xname.endswith('.PNG'): ex=('img',xname) self.queue.append(ex) else: path=file_name.split("/") plen=len(path) xname=path[plen-1] print("xname="+xname); if xname.endswith('.gif') or xname.endswith('.GIF'): ex=self.gif2img(xname) self.queue.append(ex) elif xname.endswith('.jpg') or xname.endswith('.JPG') or \ xname.endswith('.JPEG') or xname.endswith('.jpeg'): #ex=self.pic2imgs(xname) ex=('img',xname) self.queue.append(ex) elif xname.endswith('.png') or xname.endswith('.PNG'): ex=('img',xname) #ex=self.pic2imgs(xname) self.queue.append(ex) print("queue="+str(len(self.queue))) def putText(self,txt): self.impose_text=txt def show_loop_start(self): """show_loop_start""" self.handle_thread_01 = threading.Thread(target=self.handler, daemon=True) self.handle_thread_01.start() def handler(self): """queueからメッセージを受信し、表示する""" last_anime=[] while True: try: qlen = len(self.queue) if qlen>0: print("handler, qlen="+str(qlen)) kimg=self.queue.popleft() print("handler, kimg[0]="+kimg[0]) if kimg[0]=='anime': last_anime=kimg self.show_one_anime_x(kimg) elif kimg[0]=='img': print("handler, kimg[0]="+kimg[0]+" kimg[1]="+kimg[1]) x=self.pic2imgs(kimg[1]) last_anime=x self.show_one_anime_x(x) elif kimg[0]=='clear': last_anime=[] self.clear_pic() else: if last_anime!=[]: self.show_one_anime_x(last_anime) else: time.sleep(0.1) except: if last_anime!=[]: self.show_one_anime_x(last_anime) def show_jpg_png(self,file_name): print("show_jpg_png("+file_name+")") #subprocess.call(["cd","~/Pictures"]) try: img = Image.open("/home/pi/Pictures/"+file_name) self.show_one_pic(img) except: import traceback traceback.print_exc() rtn="error: show "+file_name self.rcReader.python2fwbln(rtn) def show_one_anime_x(self,images): if images[0]=='anime': self.show_one_anime(images[1]) elif images[0]=='img': self.show_one_anime2(images[1]) def show_one_anime(self,images): #print("show_one_anime() images-len="+str(len(images))) try: for t in range(len(images)): #cv2.imshow('test', images[t]) #cv2.waitKey(int(1000/fps)) self.show_one_pic(self.cv2pil(images[t])) except KeyboardInterrupt: print('stop by ctrl-c') except: import traceback traceback.print_exc() rtn="error: show gif" self.rcReader.python2fwbln(rtn) self.handle_thread_01.stop() def show_one_anime2(self,images): #print("show_one_anime2() images-len="+str(len(images))) try: for t in range(len(images)): #cv2.imshow('test', images[t]) #cv2.waitKey(int(1000/fps)) self.show_one_pic(images[t]) except KeyboardInterrupt: print('stop by ctrl-c') except: import traceback traceback.print_exc() rtn="error: show anime2(jpg/png)" self.rcReader.python2fwbln(rtn) self.handle_thread_01.stop() # # command: # clear... 0x00 # show ... 0x01 # set1 ix,iy,r,g,b # ... 0x02 *,*, *,*,* # setn ix,iy,n, r0,g0,b0, ..r(n-1),g(n-1),b(n-1) n<=8 # ... 0x03 *,*, *, *,*,*, ..., *,*,* # def show_one_pic(self,img_in): #print('show_one_pic width:',img_width) #print('show_one_pic height:',img_height) #print("show_one_pic") wwx,wwy=img_in.size fs=int(wwy/2) px=self.text_x py=int(wwx/4) if self.impose_text!="": img = img_in.copy() font = ImageFont.truetype(self.font_path, fs) draw = ImageDraw.Draw(img) draw.text((px,py), self.impose_text, fill=self.text_color, font=font) #img=self.puttext(img_in, self.impose_text, (px,py), self.font_path, fs, (0,0,0)) size = draw.textsize(self.impose_text, font=font) if self.text_x+size[0]<(wwx-wwx/3): self.text_x=wwx-wwx/3 else: #self.text_x=self.text_x-int(wwx/27) self.text_x=self.text_x-int(wwx/13) #print("text_size=("+str(size[0])+",",str(size[1])+")") else: img=img_in img_width, img_height = img.size dx=img_width/self.jmax dy=img_height/self.imax #print("dx=",dx) #print("dy=",dy) for i in range(self.imax): for j in range(self.jmax): x=self.off_x+j*dx y=self.off_y+i*dy rgb=img.getpixel((x,y)) #print("rgb[",j,",",i,"]=",rgb) self.pixels[i][j]=rgb self.i2c.write_byte(self.addrs[0],0x00) self.i2c.write_byte(self.addrs[1],0x00) self.i2c.write_byte(self.addrs[2],0x00) self.i2c.write_byte(self.addrs[3],0x00) for i in range(self.imax): for j in range(4): for k in range(4): p=self.pixels[i][4*j+k] self.fourpix[3+k*3+0]=p[0] self.fourpix[3+k*3+1]=p[1] self.fourpix[3+k*3+2]=p[2] self.fourpix[0]=i; self.fourpix[1]=0; i2c_addr=self.addrs[j] try: self.i2c.write_i2c_block_data(i2c_addr,0x03,self.fourpix) except: print('i2c write error at:(',i,',',j,')') #time.sleep(0.00001) self.i2c.write_byte(self.addrs[0],0x01) self.i2c.write_byte(self.addrs[1],0x01) self.i2c.write_byte(self.addrs[2],0x01) self.i2c.write_byte(self.addrs[3],0x01) def clear_pic(self): print("clear_pic") self.i2c.write_byte(self.addrs[0],0x00) self.i2c.write_byte(self.addrs[1],0x00) self.i2c.write_byte(self.addrs[2],0x00) self.i2c.write_byte(self.addrs[3],0x00) time.sleep(0.001) self.i2c.write_byte(self.addrs[0],0x01) self.i2c.write_byte(self.addrs[1],0x01) self.i2c.write_byte(self.addrs[2],0x01) self.i2c.write_byte(self.addrs[3],0x01) def main(): print("start teleport_dress_ex1.py") RemoteCommandReader() if __name__ == "__main__": main() }}