3D_source_ex02_part_1.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[3D_source_ex02.py]]
#code(python){{
## License: Apache 2.0. See LICENSE file in root directory.
## Copyright(c) 2017 Intel Corporation. All Rights Reserved.
#20230506
#####################################################
## Align Depth to Color ##
#####################################################
# First import the library
import pyrealsense2 as rs
# Import Numpy for easy array manipulation
import numpy as np
# Import OpenCV for easy image rendering
import cv2
import copy
import socket
import threading
import sys
from tkinter import *
import tkinter as tk
from tkinter import scrolledtext
from PIL import Image, ImageTk, ImageOps
PORT = 9998
class Com:
def __init__(self,host):
self.host=host
self.connected=False
self.soc=None
self.handle_thread=None
self.message_window=None
def set_message_window(self,window):
self.message_window=window
def is_connected(sekf):
return self.connected
def connect(self):
print("re connect to host "+self.host)
try:
if self.soc !=None:
self.soc.connect((self.host,PORT))
else:
#socket.socket ... socket を作成し、socに代入
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#IPアドレスがhostのserver socketにsocを接続
self.soc.connect((self.host,PORT))
if self.handle_thread==None:
#受信担当の関数handlerを物threadでうごかすhandle_threadを生成。
self.handle_thread=threading.Thread(target=self.handler)
#handle_threadをstart
self.handle_thread.start()
self.connected=True
if self.message_window:
self.message_window.write_message("connected to "+self.host+":"+str(PORT))
except:
print("connect error.")
if self.message_window:
self.message_window.write_message("error when connect to "+self.host+":"+str(PORT))
self.connected=False
def send_command(self,command):
if self.soc!=None:
try:
self.soc.send(bytes(command,"utf-8"))
except KeyboardInterrupt:
if self.message_window:
self.message_window.write_message("error when sends "+command + " to "+self.host+":"+str(PORT))
self.soc.close()
exit()
else:
self.message_window.write_message("no socket when sends "+command + " to "+self.host+":"+str(PORT))
#受信の処理。送信threadとは別に、並行して処理を行う。
def handler(self):
print("at client, connected, start handler")
while True:
try:
data = self.soc.recv(1024)
line="[receive]- {}".format(data.decode("utf-8"))
print(line)
if self.message_window:
self.message_window.write_message("from "+self.host+":"+str(PORT)+", "+line)
except socket.error:
if self.message_window:
self.message_window.write_message(self.host+":"+str(PORT)+" closed")
self.soc.close()
self.soc=None
break
exit()
def close(self):
self.soc.close()
self.soc=None
class Coms:
def __init__(self):
self.coms=[None,None,None,None,None,None,None,None]
self.coms[0]=Com("192.168.13.21")
self.coms[1]=Com("192.168.13.19")
self.coms[2]=Com("192.168.13.20")
self.coms[3]=Com("192.168.13.22")
self.coms[4]=Com("192.168.13.12")
self.coms[5]=Com("192.168.13.14")
self.coms[6]=Com("192.168.13.15")
#self.coms[7]=Com("192.168.13.16")
self.coms[7]=Com("192.168.13.7")
#self.coms[7]=Com("192.168.1.22")
def connect_all(self):
for i in range(8):
try:
if self.coms[i]!=None:
print("connect to coms["+str(i)+"].. ")
self.coms[i].connect()
else:
print("coms["+str(i)+"] is None, connect, ")
except:
print("coms["+str(i)+"] connect error")
def set_message_window(self,window):
for i in range(8):
if self.coms[i]!=None:
self.coms[i].set_message_window(window)
else:
print("coms["+str(i)+"] is None, set_message_window, ")
def send_command_to_i(self,i,command):
if self.coms[i]!=None:
self.coms[i].send_command(command)
def send_command_to_all(self,command):
for i in range(8):
if self.coms[i]!=None:
self.coms[i].send_command(command)
def close_all(self):
for i in range(8):
if self.coms[i]!=None:
self.coms[i].close()
class Command_Panel:
def __init__(self,root,coms):
self.coms=coms
self.root=root
self.root.title("command panel")
self.root.configure(background="white")
self.root.geometry("810x1300")
self.command_field_for_ith_server=Entry(self.root,width=40)
self.command_field_for_ith_server.place(x=80,y=50)
self.ith_server_field=Entry(self.root,width=5)
self.ith_server_field.place(x=400,y=50)
self.ith_enter_button=Button(self.root,text="enter",bd=4,bg='white',command=self.enter_ith_command,width=8,relief=RIDGE)
self.ith_enter_button.place(x=500,y=50)
self.command_field_for_all_server=Entry(self.root,width=50)
self.command_field_for_all_server.place(x=80,y=90)
self.all_enter_button=Button(self.root,text="enter",bd=4,bg='white',command=self.enter_all_command,width=8,relief=RIDGE)
self.all_enter_button.place(x=500,y=90)
self.message_frame=tk.Frame(self.root,width=50,height=30)
self.message_frame.place(x=80,y=120)
self.message_area=scrolledtext.ScrolledText(self.message_frame)
self.message_area.pack()
self.connect_button=Button(self.root, text="connect",bd=4,bg='white',command=self.click_connect_button, width=8, relief=RIDGE)
self.connect_button.place(x=80,y=10)
self.near_depth_field=Entry(self.root,width=5)
self.near_depth_field.place(x=200,y=10)
self.near_depth_field.insert(tk.END,"0.1")
self.far_depth_field=Entry(self.root,width=5)
self.far_depth_field.place(x=300,y=10)
self.far_depth_field.insert(tk.END,"1.0")
self.sending=BooleanVar()
self.real_sense_sending_check=Checkbutton(self.root,variable=self.sending,text="real sense sending")
self.real_sense_sending_check.place(x=400,y=10)
self.sending.set(False)
self.bg_removed_canvas=tk.Canvas(self.root,width=320,height=240)
#self.bg_removed_canvas.bind('<Button-1>',self.bg_removed_canvas_click)
self.bg_removed_canvas.place(x=80,y=600)
self.depth_colormap_canvas=tk.Canvas(self.root,width=320,height=240)
#self.depth_colormap_canvas.bind('<Button-1>',self.bg_removed_canvas_click)
self.depth_colormap_canvas.place(x=400,y=600)
self.pic_panel_0_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_0_canvas.place(x=80,y=850)
self.pic_panel_1_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_1_canvas.place(x=240,y=850)
self.pic_panel_2_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_2_canvas.place(x=400,y=850)
self.pic_panel_3_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_3_canvas.place(x=560,y=850)
self.pic_panel_4_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_4_canvas.place(x=80,y=970)
self.pic_panel_5_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_5_canvas.place(x=240,y=970)
self.pic_panel_6_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_6_canvas.place(x=400,y=970)
self.pic_panel_7_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_7_canvas.place(x=560,y=970)
def enter_ith_command(self):
command=self.command_field_for_ith_server.get()
ith_s=self.ith_server_field.get()
ith=int(ith_s)
print("send "+command+" to "+str(ith))
self.write_message("send "+command+" to "+str(ith))
self.coms.send_command_to_i(ith,command)
self.command_field_for_ith_server.delete(0,END)
def enter_all_command(self):
command=self.command_field_for_all_server.get()
print("send "+command+" to all")
self.write_message("send "+command+" to all")
self.coms.send_command_to_all(command)
self.command_field_for_all_server.delete(0,END)
def write_message(self, message):
self.message_area.insert(tk.END,message)
self.message_area.insert(tk.END,'\n')
def click_connect_button(self):
self.coms.connect_all()
class Get_Cross_Sections:
def __init__(self,coms,panel):
# Create a pipeline
self.pipeline = rs.pipeline()
self.coms=coms
self.panel=panel
# Create a config and configure the pipeline to stream
# different resolutions of color and depth streams
self.config = rs.config()
self.is_quitting=False
# Get device product line for setting a supporting resolution
self.pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)
self.pipeline_profile = self.config.resolve(self.pipeline_wrapper)
self.device = self.pipeline_profile.get_device()
self.device_product_line = str(self.device.get_info(rs.camera_info.product_line))
self.found_rgb = False
for s in self.device.sensors:
if s.get_info(rs.camera_info.name) == 'RGB Camera':
self.found_rgb = True
break
self.oh=480
self.ow=640
if not self.found_rgb:
print("The demo requires Depth camera with Color sensor")
exit(0)
self.config.enable_stream(rs.stream.depth, self.ow, self.oh, rs.format.z16, 30)
if self.device_product_line == 'L500':
self.config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 30)
else:
self.config.enable_stream(rs.stream.color, self.ow, self.oh, rs.format.bgr8, 30)
# Start streaming
self.profile = self.pipeline.start(self.config)
# Getting the depth sensor's depth scale (see rs-align example for explanation)
self.depth_sensor = self.profile.get_device().first_depth_sensor()
self.depth_scale = self.depth_sensor.get_depth_scale()
print("Depth Scale is: " , self.depth_scale)
# We will be removing the background of objects more than
# clipping_distance_in_meters meters away
self.clipping_distance_in_meters = 1 #1 meter
self.streaming_thread=threading.Thread(target=self.streaming_loop)
self.streaming_thread.start()
def streaming(self):
# Create an align object
# rs.align allows us to perform alignment of depth frames to others frames
# The "align_to" is the stream type to which we plan to align depth frames.
self.align_to = rs.stream.color
self.align = rs.align(self.align_to)
#print("depth range near(m):")
#self.d_near=input()
self.d_near=float(self.panel.near_depth_field.get())
print("depth range far(m):")
#self.d_far=input()
self.d_far=float(self.panel.far_depth_field.get())
self.clipping_near = float(self.d_near) / self.depth_scale
print("depth range: ("+str(self.d_near)+" - "+str(self.d_far)+")")
self.ping_distance_in_meters = float(self.d_far) #1 meter
self.ping_distance = self.clipping_distance_in_meters / self.depth_scale
self.clipping_distance = self.clipping_distance_in_meters / self.depth_scale
# Get frameset of color and depth
frames = self.pipeline.wait_for_frames()
# frames.get_depth_frame() is a 640(ow)x360(oh) depth image
# Align the depth frame to color frame
aligned_frames = self.align.process(frames)
# Get aligned frames
aligned_depth_frame = aligned_frames.get_depth_frame() # aligned_depth_frame is a 640x480 depth image
color_frame = aligned_frames.get_color_frame()
# Validate that both frames are valid
if not aligned_depth_frame or not color_frame:
return #continue
depth_image = np.asanyarray(aligned_depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())
# Remove background - Set pixels further than clipping_distance to grey
grey_color = 153
black_color = 0
depth_image_3d = np.dstack((depth_image,depth_image,depth_image)) #depth image is 1 channel, color is 3 channels
print("depth_image_3d[100,100]=",end="")
print(depth_image_3d[100,100])
self.bg_removed = np.where((depth_image_3d > self.clipping_distance) | (depth_image_3d <= self.clipping_near), black_color, color_image)
print("bg_removed[100,100]=",end="")
print(self.bg_removed[100,100])
# Render images:
# depth align to color on left
# depth on right
self.depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
print("bg_removed[0,0]=",end="")
print(self.bg_removed[0,0])
print("bg_removed.shape=",end="")
print(self.bg_removed.shape)
d_min=0
d_max=0
self.d_interval=(self.clipping_distance - self.clipping_near)/8
self.pic_plane=[[],[],[],[],[],[],[],[]]
first=0
xii=-1
for ii in range(0,8):
wa=copy.deepcopy(self.bg_removed)
self.pic_plane[ii]=np.zeros_like(wa)
#print("pic_plane["+str(ii)+"]=",end="")
#print(pic_plane[ii].shape)
for iw in range(0,self.ow):
for ih in range(0,self.oh):
dx=depth_image[ih,iw]
if d_min>dx:
d_min=dx
if d_max<dx:
d_max=dx
dxx=dx-self.clipping_near
pxx=dxx/self.d_interval
ipxx=int(pxx)
if 0<=ipxx and ipxx<=7:
self.pic_plane[ipxx][ih,iw]=self.bg_removed[ih,iw]
if self.panel.sending.get():
#
# sin 3D display
# w: 32.5cm, 16
# h: 50.0cm, 75
# z: 16.0cm, 8
#
# for i in each ii, for each ih,
# send "set4 ix(in ih) iy(in iw) r0,g0,b0 ... r3,g3,b3"
# to i's display's p
#
self.coms.send_command_to_all("clear plane")
for ii in range(8):
for ih in range(75):
#
# pic_plane's h:480(oh), 3d-display's h':75
# h=(480/75)*h'
#
h=(480/75)*(74-ih)
for iy in range(4):
#
# pic_plane's w:640->320, 3d-display's w w':16
# offset 160
# w=160+(320/16)*w'
#
#
# send "setpic4 ix iy r0 g0 b0 ... r3 g3 b3"
#
sending="setpic4 "+str(ih)+" "+str(iy*4)+" "
send_flag=False
for ipxx in range(4):
w=160+(320/16)*(iy*4+ipxx)
dx=depth_image[int(h),int(w)]
dxx=dx-self.clipping_near
pxx=dxx/self.d_interval
if int(pxx)==ii:
send_flag=True
print("h="+str(h)+",w="+str(w)+",pxx="+str(pxx))
print("color=",end="")
print(self.pic_plane[int(pxx)][int(h),int(w)])
print("color2=",end="")
print(self.bg_removed[int(h),int(w)])
ccxx=self.bg_removed[int(h),int(w)]
ccx=[0,0,0]
cw=ccxx[0]
ccx[0]=ccxx[2]
ccx[1]=ccxx[1]
ccx[2]=cw
else:
ccx=[0,0,0]
for cx in range(3):
sending=sending+str(ccx[cx])+" "
if send_flag:
self.panel.write_message("send "+sending+" to "+str(ii))
self.coms.send_command_to_i(ii,sending+'\n')
#else:
#self.panel.write_message("not send "+sending+" to "+str(ii))
self.coms.send_command_to_all("show plane")
print("d_min=",end="")
print(d_min)
print("d_max=",end="")
print(d_max)
if xii!=-1:
print("pic_plane["+str(xii)+"]["+str(xih)+","+str(xiw)+"]=",end="")
print(self.pic_plane[xii][xih,xiw])
print("bg_removed["+str(xih)+","+str(xiw)+"]=",end="")
print(self.bg_removed[xih,xiw])
else:
print("xii==-1")
self.display()
key = cv2.waitKey(1)
# Press esc or 'q' to close the image window
if key & 0xFF == ord('q') or key == 27:
cv2.destroyAllWindows()
self.is_quitting=True
def streaming_loop(self):
# Streaming loop
try:
while True:
print("start streaming.")
self.streaming()
if self.is_quitting:
return
finally:
print("streaming_thread,finally")
self.pipeline.stop()
def display_panel_to_canvas(self,panel,canvas):
'''画像をCanvasに表示する'''
# フレーム画像の取得
#ret, frame = self.capture.read()
# BGR→RGB変換
px = cv2.cvtColor(panel, cv2.COLOR_BGR2RGB)
# NumPyのndarrayからPillowのImageへ変換
pil_image = Image.fromarray(px)
# キャンバスのサイズを取得
canvas0_width = canvas.winfo_width()
canvas0_height = canvas.winfo_height()
# 画像のアスペクト比(縦横比)を崩さずに指定したサイズ(キャンバスのサイズ)全体に画像をリサイズする
pil_image = ImageOps.pad(pil_image, (canvas0_width, canvas0_height))
# PIL.ImageからPhotoImageへ変換する
photo_image = ImageTk.PhotoImage(image=pil_image)
self.imgs.append(photo_image)
# 画像の描画
canvas.create_image(
canvas0_width / 2, # 画像表示位置(Canvasの中心)
canvas0_height / 2,
image=photo_image # 表示画像データ
)
def display(self):
self.imgs=[]
self.display_panel_to_canvas(self.bg_removed,self.panel.bg_removed_canvas)
self.display_panel_to_canvas(self.depth_colormap,self.panel.depth_colormap_canvas)
self.display_panel_to_canvas(self.pic_plane[0],self.panel.pic_panel_0_canvas)
self.display_panel_to_canvas(self.pic_plane[1],self.panel.pic_panel_1_canvas)
self.display_panel_to_canvas(self.pic_plane[2],self.panel.pic_panel_2_canvas)
self.display_panel_to_canvas(self.pic_plane[3],self.panel.pic_panel_3_canvas)
self.display_panel_to_canvas(self.pic_plane[4],self.panel.pic_panel_4_canvas)
self.display_panel_to_canvas(self.pic_plane[5],self.panel.pic_panel_5_canvas)
self.display_panel_to_canvas(self.pic_plane[6],self.panel.pic_panel_6_canvas)
self.display_panel_to_canvas(self.pic_plane[7],self.panel.pic_panel_7_canvas)
# disp_image()を10msec後に実行する
#self.disp_id = self.panel.root.after(10, self.display)
if __name__=="__main__":
root=Tk()
coms=Coms()
p=Command_Panel(root,coms)
coms.set_message_window(p)
cs=Get_Cross_Sections(coms,p)
root.mainloop()
}}
終了行:
[[3D_source_ex02.py]]
#code(python){{
## License: Apache 2.0. See LICENSE file in root directory.
## Copyright(c) 2017 Intel Corporation. All Rights Reserved.
#20230506
#####################################################
## Align Depth to Color ##
#####################################################
# First import the library
import pyrealsense2 as rs
# Import Numpy for easy array manipulation
import numpy as np
# Import OpenCV for easy image rendering
import cv2
import copy
import socket
import threading
import sys
from tkinter import *
import tkinter as tk
from tkinter import scrolledtext
from PIL import Image, ImageTk, ImageOps
PORT = 9998
class Com:
def __init__(self,host):
self.host=host
self.connected=False
self.soc=None
self.handle_thread=None
self.message_window=None
def set_message_window(self,window):
self.message_window=window
def is_connected(sekf):
return self.connected
def connect(self):
print("re connect to host "+self.host)
try:
if self.soc !=None:
self.soc.connect((self.host,PORT))
else:
#socket.socket ... socket を作成し、socに代入
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#IPアドレスがhostのserver socketにsocを接続
self.soc.connect((self.host,PORT))
if self.handle_thread==None:
#受信担当の関数handlerを物threadでうごかすhandle_threadを生成。
self.handle_thread=threading.Thread(target=self.handler)
#handle_threadをstart
self.handle_thread.start()
self.connected=True
if self.message_window:
self.message_window.write_message("connected to "+self.host+":"+str(PORT))
except:
print("connect error.")
if self.message_window:
self.message_window.write_message("error when connect to "+self.host+":"+str(PORT))
self.connected=False
def send_command(self,command):
if self.soc!=None:
try:
self.soc.send(bytes(command,"utf-8"))
except KeyboardInterrupt:
if self.message_window:
self.message_window.write_message("error when sends "+command + " to "+self.host+":"+str(PORT))
self.soc.close()
exit()
else:
self.message_window.write_message("no socket when sends "+command + " to "+self.host+":"+str(PORT))
#受信の処理。送信threadとは別に、並行して処理を行う。
def handler(self):
print("at client, connected, start handler")
while True:
try:
data = self.soc.recv(1024)
line="[receive]- {}".format(data.decode("utf-8"))
print(line)
if self.message_window:
self.message_window.write_message("from "+self.host+":"+str(PORT)+", "+line)
except socket.error:
if self.message_window:
self.message_window.write_message(self.host+":"+str(PORT)+" closed")
self.soc.close()
self.soc=None
break
exit()
def close(self):
self.soc.close()
self.soc=None
class Coms:
def __init__(self):
self.coms=[None,None,None,None,None,None,None,None]
self.coms[0]=Com("192.168.13.21")
self.coms[1]=Com("192.168.13.19")
self.coms[2]=Com("192.168.13.20")
self.coms[3]=Com("192.168.13.22")
self.coms[4]=Com("192.168.13.12")
self.coms[5]=Com("192.168.13.14")
self.coms[6]=Com("192.168.13.15")
#self.coms[7]=Com("192.168.13.16")
self.coms[7]=Com("192.168.13.7")
#self.coms[7]=Com("192.168.1.22")
def connect_all(self):
for i in range(8):
try:
if self.coms[i]!=None:
print("connect to coms["+str(i)+"].. ")
self.coms[i].connect()
else:
print("coms["+str(i)+"] is None, connect, ")
except:
print("coms["+str(i)+"] connect error")
def set_message_window(self,window):
for i in range(8):
if self.coms[i]!=None:
self.coms[i].set_message_window(window)
else:
print("coms["+str(i)+"] is None, set_message_window, ")
def send_command_to_i(self,i,command):
if self.coms[i]!=None:
self.coms[i].send_command(command)
def send_command_to_all(self,command):
for i in range(8):
if self.coms[i]!=None:
self.coms[i].send_command(command)
def close_all(self):
for i in range(8):
if self.coms[i]!=None:
self.coms[i].close()
class Command_Panel:
def __init__(self,root,coms):
self.coms=coms
self.root=root
self.root.title("command panel")
self.root.configure(background="white")
self.root.geometry("810x1300")
self.command_field_for_ith_server=Entry(self.root,width=40)
self.command_field_for_ith_server.place(x=80,y=50)
self.ith_server_field=Entry(self.root,width=5)
self.ith_server_field.place(x=400,y=50)
self.ith_enter_button=Button(self.root,text="enter",bd=4,bg='white',command=self.enter_ith_command,width=8,relief=RIDGE)
self.ith_enter_button.place(x=500,y=50)
self.command_field_for_all_server=Entry(self.root,width=50)
self.command_field_for_all_server.place(x=80,y=90)
self.all_enter_button=Button(self.root,text="enter",bd=4,bg='white',command=self.enter_all_command,width=8,relief=RIDGE)
self.all_enter_button.place(x=500,y=90)
self.message_frame=tk.Frame(self.root,width=50,height=30)
self.message_frame.place(x=80,y=120)
self.message_area=scrolledtext.ScrolledText(self.message_frame)
self.message_area.pack()
self.connect_button=Button(self.root, text="connect",bd=4,bg='white',command=self.click_connect_button, width=8, relief=RIDGE)
self.connect_button.place(x=80,y=10)
self.near_depth_field=Entry(self.root,width=5)
self.near_depth_field.place(x=200,y=10)
self.near_depth_field.insert(tk.END,"0.1")
self.far_depth_field=Entry(self.root,width=5)
self.far_depth_field.place(x=300,y=10)
self.far_depth_field.insert(tk.END,"1.0")
self.sending=BooleanVar()
self.real_sense_sending_check=Checkbutton(self.root,variable=self.sending,text="real sense sending")
self.real_sense_sending_check.place(x=400,y=10)
self.sending.set(False)
self.bg_removed_canvas=tk.Canvas(self.root,width=320,height=240)
#self.bg_removed_canvas.bind('<Button-1>',self.bg_removed_canvas_click)
self.bg_removed_canvas.place(x=80,y=600)
self.depth_colormap_canvas=tk.Canvas(self.root,width=320,height=240)
#self.depth_colormap_canvas.bind('<Button-1>',self.bg_removed_canvas_click)
self.depth_colormap_canvas.place(x=400,y=600)
self.pic_panel_0_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_0_canvas.place(x=80,y=850)
self.pic_panel_1_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_1_canvas.place(x=240,y=850)
self.pic_panel_2_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_2_canvas.place(x=400,y=850)
self.pic_panel_3_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_3_canvas.place(x=560,y=850)
self.pic_panel_4_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_4_canvas.place(x=80,y=970)
self.pic_panel_5_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_5_canvas.place(x=240,y=970)
self.pic_panel_6_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_6_canvas.place(x=400,y=970)
self.pic_panel_7_canvas=tk.Canvas(self.root,width=160,height=120)
self.pic_panel_7_canvas.place(x=560,y=970)
def enter_ith_command(self):
command=self.command_field_for_ith_server.get()
ith_s=self.ith_server_field.get()
ith=int(ith_s)
print("send "+command+" to "+str(ith))
self.write_message("send "+command+" to "+str(ith))
self.coms.send_command_to_i(ith,command)
self.command_field_for_ith_server.delete(0,END)
def enter_all_command(self):
command=self.command_field_for_all_server.get()
print("send "+command+" to all")
self.write_message("send "+command+" to all")
self.coms.send_command_to_all(command)
self.command_field_for_all_server.delete(0,END)
def write_message(self, message):
self.message_area.insert(tk.END,message)
self.message_area.insert(tk.END,'\n')
def click_connect_button(self):
self.coms.connect_all()
class Get_Cross_Sections:
def __init__(self,coms,panel):
# Create a pipeline
self.pipeline = rs.pipeline()
self.coms=coms
self.panel=panel
# Create a config and configure the pipeline to stream
# different resolutions of color and depth streams
self.config = rs.config()
self.is_quitting=False
# Get device product line for setting a supporting resolution
self.pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)
self.pipeline_profile = self.config.resolve(self.pipeline_wrapper)
self.device = self.pipeline_profile.get_device()
self.device_product_line = str(self.device.get_info(rs.camera_info.product_line))
self.found_rgb = False
for s in self.device.sensors:
if s.get_info(rs.camera_info.name) == 'RGB Camera':
self.found_rgb = True
break
self.oh=480
self.ow=640
if not self.found_rgb:
print("The demo requires Depth camera with Color sensor")
exit(0)
self.config.enable_stream(rs.stream.depth, self.ow, self.oh, rs.format.z16, 30)
if self.device_product_line == 'L500':
self.config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 30)
else:
self.config.enable_stream(rs.stream.color, self.ow, self.oh, rs.format.bgr8, 30)
# Start streaming
self.profile = self.pipeline.start(self.config)
# Getting the depth sensor's depth scale (see rs-align example for explanation)
self.depth_sensor = self.profile.get_device().first_depth_sensor()
self.depth_scale = self.depth_sensor.get_depth_scale()
print("Depth Scale is: " , self.depth_scale)
# We will be removing the background of objects more than
# clipping_distance_in_meters meters away
self.clipping_distance_in_meters = 1 #1 meter
self.streaming_thread=threading.Thread(target=self.streaming_loop)
self.streaming_thread.start()
def streaming(self):
# Create an align object
# rs.align allows us to perform alignment of depth frames to others frames
# The "align_to" is the stream type to which we plan to align depth frames.
self.align_to = rs.stream.color
self.align = rs.align(self.align_to)
#print("depth range near(m):")
#self.d_near=input()
self.d_near=float(self.panel.near_depth_field.get())
print("depth range far(m):")
#self.d_far=input()
self.d_far=float(self.panel.far_depth_field.get())
self.clipping_near = float(self.d_near) / self.depth_scale
print("depth range: ("+str(self.d_near)+" - "+str(self.d_far)+")")
self.ping_distance_in_meters = float(self.d_far) #1 meter
self.ping_distance = self.clipping_distance_in_meters / self.depth_scale
self.clipping_distance = self.clipping_distance_in_meters / self.depth_scale
# Get frameset of color and depth
frames = self.pipeline.wait_for_frames()
# frames.get_depth_frame() is a 640(ow)x360(oh) depth image
# Align the depth frame to color frame
aligned_frames = self.align.process(frames)
# Get aligned frames
aligned_depth_frame = aligned_frames.get_depth_frame() # aligned_depth_frame is a 640x480 depth image
color_frame = aligned_frames.get_color_frame()
# Validate that both frames are valid
if not aligned_depth_frame or not color_frame:
return #continue
depth_image = np.asanyarray(aligned_depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())
# Remove background - Set pixels further than clipping_distance to grey
grey_color = 153
black_color = 0
depth_image_3d = np.dstack((depth_image,depth_image,depth_image)) #depth image is 1 channel, color is 3 channels
print("depth_image_3d[100,100]=",end="")
print(depth_image_3d[100,100])
self.bg_removed = np.where((depth_image_3d > self.clipping_distance) | (depth_image_3d <= self.clipping_near), black_color, color_image)
print("bg_removed[100,100]=",end="")
print(self.bg_removed[100,100])
# Render images:
# depth align to color on left
# depth on right
self.depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
print("bg_removed[0,0]=",end="")
print(self.bg_removed[0,0])
print("bg_removed.shape=",end="")
print(self.bg_removed.shape)
d_min=0
d_max=0
self.d_interval=(self.clipping_distance - self.clipping_near)/8
self.pic_plane=[[],[],[],[],[],[],[],[]]
first=0
xii=-1
for ii in range(0,8):
wa=copy.deepcopy(self.bg_removed)
self.pic_plane[ii]=np.zeros_like(wa)
#print("pic_plane["+str(ii)+"]=",end="")
#print(pic_plane[ii].shape)
for iw in range(0,self.ow):
for ih in range(0,self.oh):
dx=depth_image[ih,iw]
if d_min>dx:
d_min=dx
if d_max<dx:
d_max=dx
dxx=dx-self.clipping_near
pxx=dxx/self.d_interval
ipxx=int(pxx)
if 0<=ipxx and ipxx<=7:
self.pic_plane[ipxx][ih,iw]=self.bg_removed[ih,iw]
if self.panel.sending.get():
#
# sin 3D display
# w: 32.5cm, 16
# h: 50.0cm, 75
# z: 16.0cm, 8
#
# for i in each ii, for each ih,
# send "set4 ix(in ih) iy(in iw) r0,g0,b0 ... r3,g3,b3"
# to i's display's p
#
self.coms.send_command_to_all("clear plane")
for ii in range(8):
for ih in range(75):
#
# pic_plane's h:480(oh), 3d-display's h':75
# h=(480/75)*h'
#
h=(480/75)*(74-ih)
for iy in range(4):
#
# pic_plane's w:640->320, 3d-display's w w':16
# offset 160
# w=160+(320/16)*w'
#
#
# send "setpic4 ix iy r0 g0 b0 ... r3 g3 b3"
#
sending="setpic4 "+str(ih)+" "+str(iy*4)+" "
send_flag=False
for ipxx in range(4):
w=160+(320/16)*(iy*4+ipxx)
dx=depth_image[int(h),int(w)]
dxx=dx-self.clipping_near
pxx=dxx/self.d_interval
if int(pxx)==ii:
send_flag=True
print("h="+str(h)+",w="+str(w)+",pxx="+str(pxx))
print("color=",end="")
print(self.pic_plane[int(pxx)][int(h),int(w)])
print("color2=",end="")
print(self.bg_removed[int(h),int(w)])
ccxx=self.bg_removed[int(h),int(w)]
ccx=[0,0,0]
cw=ccxx[0]
ccx[0]=ccxx[2]
ccx[1]=ccxx[1]
ccx[2]=cw
else:
ccx=[0,0,0]
for cx in range(3):
sending=sending+str(ccx[cx])+" "
if send_flag:
self.panel.write_message("send "+sending+" to "+str(ii))
self.coms.send_command_to_i(ii,sending+'\n')
#else:
#self.panel.write_message("not send "+sending+" to "+str(ii))
self.coms.send_command_to_all("show plane")
print("d_min=",end="")
print(d_min)
print("d_max=",end="")
print(d_max)
if xii!=-1:
print("pic_plane["+str(xii)+"]["+str(xih)+","+str(xiw)+"]=",end="")
print(self.pic_plane[xii][xih,xiw])
print("bg_removed["+str(xih)+","+str(xiw)+"]=",end="")
print(self.bg_removed[xih,xiw])
else:
print("xii==-1")
self.display()
key = cv2.waitKey(1)
# Press esc or 'q' to close the image window
if key & 0xFF == ord('q') or key == 27:
cv2.destroyAllWindows()
self.is_quitting=True
def streaming_loop(self):
# Streaming loop
try:
while True:
print("start streaming.")
self.streaming()
if self.is_quitting:
return
finally:
print("streaming_thread,finally")
self.pipeline.stop()
def display_panel_to_canvas(self,panel,canvas):
'''画像をCanvasに表示する'''
# フレーム画像の取得
#ret, frame = self.capture.read()
# BGR→RGB変換
px = cv2.cvtColor(panel, cv2.COLOR_BGR2RGB)
# NumPyのndarrayからPillowのImageへ変換
pil_image = Image.fromarray(px)
# キャンバスのサイズを取得
canvas0_width = canvas.winfo_width()
canvas0_height = canvas.winfo_height()
# 画像のアスペクト比(縦横比)を崩さずに指定したサイズ(キャンバスのサイズ)全体に画像をリサイズする
pil_image = ImageOps.pad(pil_image, (canvas0_width, canvas0_height))
# PIL.ImageからPhotoImageへ変換する
photo_image = ImageTk.PhotoImage(image=pil_image)
self.imgs.append(photo_image)
# 画像の描画
canvas.create_image(
canvas0_width / 2, # 画像表示位置(Canvasの中心)
canvas0_height / 2,
image=photo_image # 表示画像データ
)
def display(self):
self.imgs=[]
self.display_panel_to_canvas(self.bg_removed,self.panel.bg_removed_canvas)
self.display_panel_to_canvas(self.depth_colormap,self.panel.depth_colormap_canvas)
self.display_panel_to_canvas(self.pic_plane[0],self.panel.pic_panel_0_canvas)
self.display_panel_to_canvas(self.pic_plane[1],self.panel.pic_panel_1_canvas)
self.display_panel_to_canvas(self.pic_plane[2],self.panel.pic_panel_2_canvas)
self.display_panel_to_canvas(self.pic_plane[3],self.panel.pic_panel_3_canvas)
self.display_panel_to_canvas(self.pic_plane[4],self.panel.pic_panel_4_canvas)
self.display_panel_to_canvas(self.pic_plane[5],self.panel.pic_panel_5_canvas)
self.display_panel_to_canvas(self.pic_plane[6],self.panel.pic_panel_6_canvas)
self.display_panel_to_canvas(self.pic_plane[7],self.panel.pic_panel_7_canvas)
# disp_image()を10msec後に実行する
#self.disp_id = self.panel.root.after(10, self.display)
if __name__=="__main__":
root=Tk()
coms=Coms()
p=Command_Panel(root,coms)
coms.set_message_window(p)
cs=Get_Cross_Sections(coms,p)
root.mainloop()
}}
ページ名: