Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

6.3 通信机制实现

进程间通信(IPC)

Dora提供多种IPC机制适应不同场景:

通信方式适用场景性能特点
共享内存大数据传输极高零拷贝,需同步机制
Unix域套接字控制指令低延迟,可靠
TCP/IP分布式节点跨机器通信
消息队列异步处理中高解耦生产消费

项目IPC机制:

# motor/main.py中的IPC处理
for event in node:
    if event["type"] == "INPUT" and event["id"] == "move":
        # 将pyarrow Array转换为MoveData对象
        data = event["value"]
        move_data = MoveData.from_arrow_array(data)
        # 控制电机
        car_controller.Control(move_data)

共享内存优化

共享内存传输实现零拷贝:

# untils/untils.py中的图像处理优化
def translate_image(data, metadata):
    np_array = data.to_numpy()
    if metadata["encoding"] in ["jpeg", "png"]:
        # 避免复制大型图像数据
        byte_data = np_array.tobytes()
        return cv2.imdecode(np.frombuffer(byte_data, np.uint8), cv2.IMREAD_COLOR)
    elif encoding in ["uint8"]:
        height = metadata["height"]
        width = metadata["width"]
        image = np_array.reshape((height, width))
    else:
        return None
    return image

分布式部署方案

Dora支持分布式部署,关键特性包括:

  1. 节点发现:基于mDNS自动发现局域网节点
  2. 数据路由:跨机器数据自动路由
  3. 负载均衡:动态分配计算任务
  4. 容错机制:节点故障自动恢复

在项目中支持分布式节点部署:

# control.py中的WebSocket通信
@socketio.on("control")
def handle_control(data):
    direction = data.get("direction")
    if direction in ["advance", "back", "turn_left", "turn_right"]:
        if global_flag:
            emit("response", {"status": "Moving " + direction})