6.4 框架应用实践

节点开发规范

Dora节点开发遵循统一接口:

class CustomNode:
    def __init__(self, config):
        """初始化配置"""
        self.config = config
    
    def on_event(self, event):
        """事件处理入口"""
        if event['type'] == 'INPUT':
            self.handle_input(event)
        elif event['type'] == 'TIMER':
            self.handle_timer(event)
    
    def handle_input(self, event):
        """处理输入数据"""
        data = event['data']
        # 处理逻辑...
        self.send_output('result', processed_data)
    
    def destroy(self):
        """资源清理"""
        pass

数据流拓扑构建

Dora使用YAML定义数据流拓扑:

# car_cv.yaml
nodes:
  - id: video_capture
    path: nodes/video.py
    inputs:
      tick: dora/timer/millis/33  # 30FPS
    outputs:
      - image
      
  - id: ball_detector
    path: nodes/detector.py
    inputs:
      image: video_capture/image
    outputs:
      - position
      
  - id: controller
    path: nodes/controller.py
    inputs:
      position: ball_detector/position
    outputs:
      - motor_cmd

性能优化技巧

  1. 批处理优化:合并小数据包减少通信开销

    def on_event(self, event):
        if event['type'] == 'TIMER':
            # 定时批量处理
            self.process_batch()
    
  2. 数据压缩:对图像等大数据启用压缩

    self.send_output('image', frame, compress='jpeg', quality=80)
    
  3. 零拷贝共享:使用共享内存传递大数据

    # 创建共享内存缓冲区
    shm = dora.create_shared_buffer('video_frame', 1024 * 1024)
    
    # 写入数据
    with shm.write_lock() as buffer:
        buffer[:len(frame)] = frame
    
  4. 异步处理:非关键任务异步执行

    async def process_frame_async(frame):
        # 异步处理
        result = await heavy_computation(frame)
        return result
    

小结

Dora-RS框架是网球捡拾小车系统的核心基础设施。本章详细解析了其数据流驱动的架构设计、核心组件实现原理、高效通信机制以及实际应用技巧。通过Dora框架,我们实现了:

  • 视觉采集、识别、控制模块的解耦
  • 系统资源的高效利用
  • 实时性能的保障
  • 系统的可扩展性和可维护性