问题3及解决
-
存在问题03-识别耗时太长
这个方案中存在的问题就是YOLO执行速度较慢,在电脑端测的时候还感受不出来,但是在飞腾派开发板测试时候差距比较明显。不算模型记载时间,如果电脑端识别一张图片用时90ms左右时,则在飞腾派识别一张图片要用时900ms左右。这个延迟已经严重影响到飞腾派对小车进行实时控制了,例如在0ms时上一张图片识别结果出来,对小车发送控制指令并拍摄新图片进行识别,900ms后新图片识别结果出来,对小车发出新的控制指令,但是这个指令对应的状态应该是900ms前的状态,900ms后小车可能已经移动到了新的位置,因此再执行这个指令就不准确了,表现就是小车在一个球前一直左右摇摆。
-
改进方案03
-
降低小车移动速度
这就是比较简单的适配飞腾派识别速度的方案,但是实际测试下来效果并不理想,速度太慢时候有的轮子可能会转不动,并且测试不同速度也并不能适配YOLO的识别速度,因此这个方案不可行。
-
换香橙派开发板
香橙派开发板具有NPU,并且四个核的性能也比飞腾派要高,飞腾派也是四核,但是是两个大核两个小核,性能并不是很好。换用香橙派之后,在并未启用NPU时候识别速度可以达到500ms左右,但是接上小车之后,这个速度还是有点慢,依然存在小车左右摇摆的情况。
Orange Pi AIpro(20T)硬件规格参数
飞腾派 硬件规格书 模块规格 -
多进程调度优化
主要思路是根据开发板有的CPU核心数开启多个子进程,每个进程强制使用不同的核心,这样进程的推理不会互相抢占核心。这样可以用调度程序来干预进程之间识别的时间间隔,假如进程1和进程2之间开始识别相差200ms,且识别一张图片速度是一样的,这样就可以得到进程1识别结果后的200ms就可以得到进程2的识别结果,就可以让前后两次识别的时间间隔控制在
200ms
。先说结论,还是存在问题,假如说识别一张图的时间是500ms,那么得到第一张图的结果是在拍摄之后的500ms,因此就算图与图之间的识别间隔缩短了,但是得到这张图对应的结果及小车的控制指令还是在500ms之后,所以实验下来依然存在左右摇摆的情况。
下面是关于多进程调度的开发过程
多进程的开发目前先在飞腾派上做实验。
首先在使用多进程前,需要先看一下在没有强制一个进程使用一个核的时候,每个核的使用率情况。因为开发板是
Linux
系统,因此可以使用htop
命令查看CPU核心情况。这是
htop
指令的界面,下面是正在运行的进程信息,最上面几行标有“0”、“1”,“2”,“3”的就是CPU核心的使用率,下面还有内存的使用率“Mem”。可以看到没运行程序的时候使用率还是比较低的。下面这张就是还没有强制进程使用固定核心的使用率。
从使用率可以看到,进程是四个核都使用了,因为是只有一个进程,所以使用率还没有占满,从旁边的运行时间可以看到大约是900ms识别一张图。
接着测试一下只是用一个核心的使用率及识别速度。这里可以使用
python
的psutil
库。pip install psutil
import os import psutil # 强制进程使用的cpu核心 # 通过pid获取操作进程的权柄 p=psutil.Process(os.getpid()) # 指定进程可以使用的CPU核心序号 p.cpu_affinity([0])
下面是使用0号核心的使用率及识别速度。
下面是使用1号核心的使用率及识别速度。
下面是使用2号核心的使用率及识别速度。
下面是使用3号核心的使用率及识别速度。
可以看到单用一个核心的情况下,使用率都是拉满的。但是值得注意的是,在2、3核心上的识别时间要比0、1核心上的识别时间短。这就说明2、3核心是所谓的大核,而0、1核心是小核。大小核性能差距还是比较大的,大核上时间大约是1.6秒,小核上时间大约是4秒。
以上是CPU核心的测试结果,下面开始开发多进程的使用及调度。
需要用到
multiprocessing
库,这个库在Python3.4
以上版本是自带的,因此无需安装。import multiprocessing def func(): print('多进程开启') p=multiprocessing.Process(target=func, args=()) p.start()
多进程的开启方法和线程不太一样,不需要继承库中的类,而是需要直接写好一个函数,在
multiprocessing.Process
方法中传入这个函数及需要的参数,target
即函数名;args
是需要的参数,类型是元组类型。因此只需要把YOLO识别部分的代码写到一个函数中就可以了。但是这个时候还涉及到一个问题,怎么传入需要识别的图像,首先想到的是在外部定义一个全局变量,然后主进程获取到图像后直接赋值给这个变量,子进程就在函数内直接访问。下面是一个例子,先说结论,这是错误的。
frame = None data = None def detect(): global frame global data # 先假定下面这个函数是封装的YOLO识别函数 data = process_img(frame) # 先使用上面说到的多线程获取摄像头图像的方法来获取最新图像 frame = cap.getFrame() p = multiprocessing.Process(target = detect, args = ()) p.start()
报错原因是进程与进程之间无法共享内存,查阅了一下找到了
multiprocessing
库提供进程间共享数据的方法。import multiprocessing # 创建一个管理器对象,该对象可以创建能在多个进程间共享的列表和字典 manger=multiprocessing.Manager() # 创建一个共享的字典对象 shared_dict = manger.dict() # 初始化两个需要用到的元素 shared_dict['frame'] = None shared_dict['data'] = None # 但是还是不能直接在进程的函数中直接使用这个对象,需要通过函数的参数传入 def detect(sd): # 先假定下面这个函数是封装的YOLO识别函数 sd['data'] = process_img(sd['frame']) # 使用前面定义过的管理摄像头的线程获取当前图像 frame = cap.getFrame() p = multiprocessing.Process(target = detect, args = (shared_dict)) p.start()
测试以上代码后,是可以正常运行的。那么下面就可以开始开发多进程代码了。
def detect(sd, process_id, cpu_core): # 这里有必要说一下为什么在这里才引用识别的库 # 因为识别的库process.py是自己写的,所以是在项目的目录下存放的 # 之前也和其他库一样放在开头,但是运行到这报错,大概意思也是多进程无法调用 # 因此把这个自己的库就放到了多进程中去引用,相当于每个进程都需要引用一次 from src.process import process_img # 强制当前进程使用固定的CPU核心 process=psutil.Process(os.getpid()) process.cpu_affinity([cpu_core]) print('start') # 共享字典中的run元素是布尔类型,用来标志是否需要结束 # 方便主进程优雅的结束子进程 while sd['run']: # 在共享字典中根据每个进程的id储存图像帧,如sd['frame0'] # 因为每个元素初始化时候为None,为防止主进程还没有加载好摄像头 # 因此这里用循环判断去等待元素不为None while sd['frame'+str(process_id)] is None: # 等待一段时间,防止访问过快 time.sleep(0.009) frame = sd['frame'+str(process_id)] # 将当前子进程对应的帧在共享字典中清空 # 用于子进程判断下一帧是不是新的,对应前面那个while循环 sd['frame'+str(process_id)] = None # 如果当前要处理的帧是空,则退出循环,也可以用continue跳过当前循环 if frame is None: break data = process_img(frame) # 可以对帧进行处理用于显示,这里为了不占用处理图像的时间就注释了 # 也可以把这个放在主进程处理 # for v in data: # cv2.rectangle(frame, (v[0], v[1]), (v[0] + v[2], v[1] + v[3]), (0, 255, 0), 2) # 把处理过后需要显示的帧存入子进程对应的process元素,如sd['process0'] sd['process'+str(process_id)] = frame # 同理在对应的data元素存入识别后的数据,如sd['data0'] sd['data'+str(process_id)] = data # 用于测试子进程是否结束 print('end',process_id)
以上是子进程用于获取各自需要识别的帧,以及返回各自对应的结果的函数,每个子进程单独存放需要识别的帧以及识别的结果,是防止两个进程识别过快时,后一个进程将前一个的结果覆盖,因此分开存放。下面需要对主进程的调度进行开发。
manger=multiprocessing.Manager() shared_dict=manger.dict() # 其他有序号的元素在新建子进程时候一起初始化 shared_dict['run']=True shared_dict['frame']=None # 用于显示当前画面的帧,因为可能有图像处理,因此显示帧单独存放一个元素 shared_dict['process'] = None class capThread(threading.Thread): #继承父类threading.Thread def __init__(self): threading.Thread.__init__(self) self.frame=None self.runFlag=True def getFrame(self): return self.frame def stop_run(self): self.runFlag=False def run(self): cap = cv2.VideoCapture(0) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M','J','P','G')) if not cap.isOpened(): print("无法打开摄像头") exit() # 给多进程用的结束标志这里也可以用上,两种方法都可以结束这个线程 while self.runFlag and shared_dict['run']: ret, frame = cap.read() # 如果获取摄像头图像失败,则结束所有进程 if not ret: print("读取摄像头画面失败") self.runFlag = False shared_dict['run']=False # 图像翻转,根据情况使用 self.frame = cv2.flip(frame, -1) # 这里也在共享字典中加一项用于储存摄像头当前的最新图像帧 shared_dict['frame']=self.frame # 当显示帧元素不为空,就进行展示 # 因为cv2.imshow函数需要和开启摄像头在同一个线程中,因此在这里进行显示 if shared_dict['process'] is not None: cv2.imshow('process',shared_dict['process']) time.sleep(0.050) print('#',end='') if cv2.waitKey(1) & 0xFF == ord("q"): shared_dict['run']=False self.runFlag = False break # 用于获取当前时间戳,毫秒级的 def now(): return time.time()*1000 # 两个变量,用于计算识别一次的时间间隔 last_time = int(now()) # micro second now_time = int(now()) # 新建摄像头管理的多线程对象,并启动 cap = capThread() cap.start() # 等待摄像头初始化,并存入第一帧图像 while shared_dict['frame'] is None: time.sleep(0.05) # 用于记录总时间 count_time = 0 # 用于记录最长识别时间 max_time = 0 # 用于记录最短识别时间 min_time = now() # 用于记录识别次数 count = 0 # 强制主线程使用的cpu核心 # 因为飞腾派的0,1核是小核,因此让主线程使用0,1核 p=psutil.Process(os.getpid()) p.cpu_affinity([0,1]) # 用于存放子进程对象 process_list=[] # 用于标记每个进程使用的CPU核序号,也是根据这个决定开启的子进程数量的 # 因为飞腾派小核性能差距很大,因此这里不使用0,1小核,使用2,3大核 # 而一个大核上可以多开几个进程,但是测试下来每个核两个进程和三个进程速度差不多 cpu_core_list = [2,2,3,3] # 子进程数量 processNum = len(cpu_core_list)#3 # 这个是希望子进程识别间隔的时间,单位是毫秒ms,需要是float类型,后面用到再讲 gip_time = 200.0 # 子进程初始化 for v in range(processNum): # 初始化子进程需要用到的带序号的元素 shared_dict['frame'+str(v)]=None shared_dict['data'+str(v)]=None # 建立子进程对象,传入需要的参数:共享字典,进程序号,使用CPU核心序号 p=multiprocessing.Process(target=detect, args=(shared_dict, v, cpu_core_list[v])) p.start() # 传入当前帧,子进程将进行第一次识别 # 这里copy函数是拷贝图像到一个新的地址空间 # 是为了防止frame元素更新时候不更新地址,只更新地址内的图像 shared_dict['frame'+str(v)] = shared_dict['frame'].copy() # 添加到子进程列表,用于后面等待子进程结束时使用 process_list.append(p) # 将这时的时间戳最为第一个识别的开始 last_time = int(now()) # 这里的循环是为了等待所有进程初始化 # 因为第一次运行时加载模型时间一般是比较长的,并且时间不是很固定 # 因此先等待所有进程都完成第一次识别,后续的识别时间差距就比较小了 for v in range(processNum): while shared_dict['data'+str(v)] is None: time.sleep(0.0009) shared_dict['data'+str(v)] = None # 这里循环就是开始第一次识别传入图像 for v in range(processNum): shared_dict['frame'+str(v)] = shared_dict['frame'].copy() # 根据希望子进程间隔时间进行等待 # 因为是第一次启动子进程,因此需要人工等待一段时间来实现进程与进程间识别的间隔 time.sleep(gip_time / 1000.0) last_time = int(now()) while shared_dict['run']: # 将所有进程循环一遍作为一轮循环,也是为了控制各进程的先后顺序 for v in range(processNum): while shared_dict['data'+str(v)] is None: time.sleep(0.0009) data=shared_dict['data'+str(v)] # 记录当前完成识别的时间 now_time = int(now()) run_time = now_time - last_time print(v,' :',run_time,'ms') last_time=now_time count_time += run_time if run_time > max_time: max_time = run_time if run_time < min_time: min_time = run_time count += 1 ''' 这里主要讲识别的优化,因此机器人的控制逻辑这里就略过了 ''' execute_cmd(cmd) # 如果识别时间短于希望的间隔时间,则根据差值进行等待 # 这里就是人工进行调度上的干预 if run_time < gip_time: print('wait ',(gip_time*1.0-run_time)/1000.0) time.sleep((gip_time*1.0-run_time)/1000.0) shared_dict['frame'+str(v)] = shared_dict['frame'] shared_dict['data'+str(v)] = None # 取出要显示的图像帧,并进行处理 image = shared_dict['process'+str(v)] for vv in data: cv2.rectangle(image, (vv[0], vv[1]), (vv[0] + vv[2], vv[1] + vv[3]), (0, 0, 255), 2) n=int(now()) cv2.imwrite('./frame/'+str(n)+'.jpg',image) # 存入摄像头管理线程要显示的图像帧中 shared_dict['process'] = image # 结束运行,停止所有机器人活动 execute_cmd('stop') # 等待摄像头管理线程的运行结束 capThread.join() # 等待所有进程运行结束 for v in process_list: v.join() # 输出时间信息 print('avg time: ','%.2f'%(count_time/count),'ms') print('max time: ','%.2f'%max_time,'ms') print('min time: ','%.2f'%min_time,'ms')
另外就是如果不使用
cv2.imshow()
进行图像显示,这个时候就没法通过键盘按q
进行退出了,可以控制台输入Ctrl + C
结束,但实际使用时候,需要多次Ctrl + C
才能结束,并且有时候无法彻底结束,也不会输出最后的时间统计信息,因此可以使用对Ctrl + C
进行拦截,替换成自己的结束函数。需要使用自带库signal
,用法如下。import signal # 定义自己的结束函数,这里的参数应该是signal规定的,照抄即可 def signal_handler(signal, frame): global shared_dict print ('\nSignal Catched! You have just type Ctrl+C!') # 使用共享字典中的运行标志来结束所有进程及线程 shared_dict['run']=False # 将Ctrl + C时发出的结束信号对应的处理函数替换为自己的 # Ctrl + C实际时系统对python运行的进程发出了一个结束的信号 # 因此这里替换的是收到对应信号时候的处理函数 signal.signal(signal.SIGINT, signal_handler)
-