wanshenmean
8 天以前 fd18eaba5e1c086a588509371f91310e7aafff9c
Code/WCS/WIDESEAWCS_Server/WIDESEAWCS_Tasks/RobotJob/RobotJob.cs
@@ -1,190 +1,251 @@
using Microsoft.Extensions.Logging;
using Quartz;
using WIDESEA_Core;
using WIDESEAWCS_Common.TaskEnum;
using WIDESEAWCS_Common;
using WIDESEAWCS_Core.Caches;
using WIDESEAWCS_Core.Helper;
using WIDESEAWCS_Core.LogHelper;
using WIDESEAWCS_ITaskInfoService;
using WIDESEAWCS_Model.Models;
using WIDESEAWCS_QuartzJob;
using WIDESEAWCS_Tasks.SocketServer;
using WIDESEAWCS_Tasks.Workflow;
using WIDESEAWCS_Tasks.Workflow.Abstractions;
namespace WIDESEAWCS_Tasks
{
    /// <summary>
    /// 机械手任务作业 - 负责协调机械手客户端连接、消息处理和任务执行
    /// 机器人任务作业(Quartz Job)- 负责调度与生命周期管理
    /// </summary>
    /// <remarks>
    /// Quartz 定时任务,每秒执行一次(默认),主要职责:
    /// 1. 从 JobDataMap 获取设备信息
    /// 2. 确保 TCP 客户端已连接并订阅消息
    /// 3. 轮询待处理的机器人任务
    /// 4. 调用工作流编排器执行任务状态机
    ///
    /// 使用 [DisallowConcurrentExecution] 禁止并发执行,确保同一设备的任务串行处理。
    /// 具体的状态机流程逻辑委托给 <see cref="RobotWorkflowOrchestrator"/> 处理。
    /// </remarks>
    [DisallowConcurrentExecution]
    public class RobotJob : IJob
    {
        private const int MaxTaskTotalNum = 48;
        /// <summary>
        /// 消息事件订阅标志
        /// </summary>
        /// <remarks>
        /// 使用原子操作确保全局只订阅一次 TCP 消息事件。
        /// 防止多个 Job 实例重复订阅导致消息被多次处理。
        /// </remarks>
        private static int _messageSubscribedFlag;
        /// <summary>
        /// 机械手客户端连接管理器
        /// </summary>
        /// <remarks>
        /// 负责管理 TCP 连接的生命周期,包括连接、断开、消息发送等。
        /// </remarks>
        private readonly RobotClientManager _clientManager;
        private readonly RobotStateManager _stateManager;
        private readonly RobotMessageHandler _messageHandler;
        private readonly RobotTaskProcessor _taskProcessor;
        private readonly IRobotTaskService _robotTaskService;
        /// <summary>
        /// 机械手状态管理器
        /// </summary>
        /// <remarks>
        /// 负责管理 Redis 缓存中的机械手状态,包括读写和并发控制。
        /// </remarks>
        private readonly RobotStateManager _stateManager;
        /// <summary>
        /// 消息路由器
        /// </summary>
        /// <remarks>
        /// 负责处理从 TCP 服务器接收到的消息,并分发给合适的处理器。
        /// 是消息处理管道的入口。
        /// </remarks>
        private readonly IRobotMessageRouter _messageRouter;
        /// <summary>
        /// 机器人任务处理器
        /// </summary>
        /// <remarks>
        /// 负责任务的下发、状态更新、与 WMS 的交互等。
        /// </remarks>
        private readonly RobotTaskProcessor _taskProcessor;
        /// <summary>
        /// 机器人工作流编排器
        /// </summary>
        /// <remarks>
        /// 负责执行任务的状态机流转,根据当前状态决定下一步动作。
        /// 这是核心的业务逻辑编排组件。
        /// </remarks>
        private readonly IRobotWorkflowOrchestrator _workflowOrchestrator;
        /// <summary>
        /// 日志记录器
        /// </summary>
        private readonly ILogger<RobotJob> _logger;
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <remarks>
        /// 采用依赖注入方式获取所需服务,并完成组件的初始化和组装。
        /// 这里体现了"控制反转"和"依赖注入"的设计原则。
        /// </remarks>
        /// <param name="tcpSocket">TCP Socket 服务器实例</param>
        /// <param name="robotTaskService">机器人任务服务</param>
        /// <param name="taskService">通用任务服务</param>
        /// <param name="cache">缓存服务</param>
        /// <param name="httpClientHelper">HTTP 客户端帮助类,用于调用 WMS 接口</param>
        /// <param name="logger">日志记录器</param>
        public RobotJob(
            TcpSocketServer tcpSocket,
            IRobotTaskService robotTaskService,
            ITaskService taskService,
            ICacheService cache,
            HttpClientHelper httpClientHelper)
            HttpClientHelper httpClientHelper,
            ILogger<RobotJob> logger)
        {
            _robotTaskService = robotTaskService;
            // 初始化状态管理器,传入缓存服务
            _stateManager = new RobotStateManager(cache, _logger);
            _logger = logger;
            // 初始化管理器
            _stateManager = new RobotStateManager(cache);
            _taskProcessor = new RobotTaskProcessor(tcpSocket, _stateManager, robotTaskService, taskService, httpClientHelper);
            _clientManager = new RobotClientManager(tcpSocket, _stateManager);
            _messageHandler = new RobotMessageHandler(tcpSocket, _stateManager, cache, robotTaskService, _taskProcessor);
            // 创建 Socket 网关,封装 TcpSocketServer 的访问
            // 后续替换通信实现时只需替换网关层
            ISocketClientGateway socketGateway = new SocketClientGateway(tcpSocket);
            // 订阅客户端管理器的事件
            // 初始化任务处理器
            _taskProcessor = new RobotTaskProcessor(socketGateway, _stateManager, robotTaskService, taskService, httpClientHelper, _logger);
            // 初始化客户端管理器
            _clientManager = new RobotClientManager(tcpSocket, _stateManager, _logger);
            // 初始化命令处理器
            // 简单命令处理器:处理状态更新等简单命令
            var simpleCommandHandler = new RobotSimpleCommandHandler(_taskProcessor, socketGateway);
            // 前缀命令处理器:处理 pickfinished、putfinished 等带参数的命令
            var prefixCommandHandler = new RobotPrefixCommandHandler(robotTaskService, _taskProcessor, _stateManager, socketGateway);
            // 初始化消息路由器
            _messageRouter = new RobotMessageHandler(socketGateway, _stateManager, cache, simpleCommandHandler, prefixCommandHandler, logger);
            // 初始化工作流编排器
            _workflowOrchestrator = new RobotWorkflowOrchestrator(_stateManager, _clientManager, _taskProcessor, robotTaskService, _logger);
            // 订阅客户端断开连接事件
            _clientManager.OnClientDisconnected += OnClientDisconnected;
            // 订阅TCP服务器的消息事件(全局只订阅一次)
            // 全局只订阅一次 TCP 消息事件(保持原有行为)
            // 使用 Interlocked.CompareExchange 实现原子操作
            if (System.Threading.Interlocked.CompareExchange(ref _messageSubscribedFlag, 1, 0) == 0)
            {
                tcpSocket.MessageReceived += _messageHandler.HandleMessageReceivedAsync;
                Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}] 机器人TCP消息事件已订阅");
                // 将消息路由器的处理方法绑定到 TCP 服务器的消息接收事件
                tcpSocket.MessageReceived += _messageRouter.HandleMessageReceivedAsync;
                _logger.LogError("机器手TCP消息事件已订阅");
                QuartzLogger.Error($"机器手TCP消息事件已订阅");
            }
        }
        /// <summary>
        /// 客户端断开连接时的处理
        /// 客户端断开连接的事件处理
        /// </summary>
        /// <remarks>
        /// 当客户端断开连接时记录日志,便于排查问题。
        /// </remarks>
        /// <param name="sender">事件发送者</param>
        /// <param name="state">断开连接的机械手状态</param>
        private void OnClientDisconnected(object? sender, RobotSocketState state)
        {
            // 可以在这里添加断开连接后的处理逻辑
            Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}] 客户端已断开连接: {state.IPAddress}");
            _logger.LogError("客户端已断开连接");
            QuartzLogger.Error($"客户端已断开连接", state.RobotCrane.DeviceName);
        }
        /// <summary>
        /// Quartz Job 的执行入口
        /// </summary>
        /// <remarks>
        /// 执行流程:
        /// 1. 从 JobDataMap 获取设备信息
        /// 2. 确保客户端已连接并订阅消息
        /// 3. 轮询待处理任务
        /// 4. 调用工作流编排器执行任务
        ///
        /// 注意:此方法可能频繁调用(每秒一次),需注意性能。
        /// </remarks>
        /// <param name="context">Quartz 作业执行上下文,包含作业详情和数据</param>
        public async Task Execute(IJobExecutionContext context)
        {
            // 从 JobDataMap 中获取作业参数
            bool flag = context.JobDetail.JobDataMap.TryGetValue("JobParams", out object? value);
            // 将参数转换为机器人设备信息
            RobotCraneDevice robotCrane = (RobotCraneDevice?)value ?? new RobotCraneDevice();
            // 如果没有获取到有效的设备信息,直接返回
            if (!flag || robotCrane.IsNullOrEmpty())
            {
                return;
            }
            // 获取设备 IP 地址,作为状态缓存的键
            string ipAddress = robotCrane.IPAddress;
            // 获取或创建状态
            // 获取或创建设备状态对象
            RobotSocketState state = _stateManager.GetOrCreateState(ipAddress, robotCrane);
            // 更新设备基础信息(以防设备信息在运行期间发生变化)
            state.RobotCrane = robotCrane;
            try
            {
                // 确保客户端已连接并订阅消息事件
                // 如果客户端未连接或订阅失败,直接返回等待下次调度
                if (!_clientManager.EnsureClientSubscribed(ipAddress, robotCrane))
                {
                    return; // 客户端未连接或订阅失败,跳过本次执行
                    return;
                }
                // 获取任务并处理
                Dt_RobotTask? task = _taskProcessor.GetTask(robotCrane);
                if (state.CurrentAction == "Picking" || state.CurrentAction == "Puting")
                {
                    return;
                }
                // 轮询获取该设备的待处理任务
                var task = _taskProcessor.GetTask(robotCrane);
                // 如果没有获取到待处理任务,且RobotArmObject为1(有物料),则获取该设备执行中的任务
                if (task == null && state.RobotArmObject == 1)
                {
                    task = _taskProcessor.GetExecutingTask(robotCrane);
                }
                // 如果有待处理任务
                if (task != null)
                {
                    // 每次判断前重新从缓存获取最新状态
                    // 获取最新的设备状态
                    var latestState = _stateManager.GetState(ipAddress);
                    if (latestState == null) return;
                    if (latestState.RobotTaskTotalNum < MaxTaskTotalNum)
                    if (latestState == null)
                    {
                        await ProcessTaskAsync(latestState, task, ipAddress);
                        // 状态不存在,可能设备未初始化
                        return;
                    }
                    // 检查任务总数是否未达到上限
                    if (latestState.RobotTaskTotalNum < RobotConst.MaxTaskTotalNum)
                    {
                        // 调用工作流编排器执行任务
                        // 编排器会根据当前状态决定下一步动作
                        await _workflowOrchestrator.ExecuteAsync(latestState, task, ipAddress);
                    }
                }
            }
            catch (Exception)
            catch (Exception ex)
            {
                // 异常处理已在各组件中处理
            }
        }
        /// <summary>
        /// 处理机械手任务
        /// </summary>
        private async Task ProcessTaskAsync(RobotSocketState latestState, Dt_RobotTask task, string ipAddress)
        {
            // 处理正在执行的任务
            if (latestState.RobotRunMode == 2 && latestState.RobotControlMode == 1 && latestState.OperStatus != "Running")
            {
                // 取货完成状态处理
                if ((latestState.CurrentAction == "PickFinished" || latestState.CurrentAction == "AllPickFinished") && latestState.RobotArmObject == 1 &&
                    task.RobotTaskState == TaskRobotStatusEnum.RobotPickFinish.GetHashCode())
                {
                    await HandlePickFinishedStateAsync(latestState, task, ipAddress);
                }
                // 放货完成状态处理
                else if ((latestState.CurrentAction == "PutFinished" || latestState.CurrentAction == "AllPutFinished") && latestState.OperStatus == "Homed" &&
                    latestState.RobotArmObject == 0 &&
                    (task.RobotTaskState == TaskRobotStatusEnum.RobotPutFinish.GetHashCode() ||
                    task.RobotTaskState != TaskRobotStatusEnum.RobotExecuting.GetHashCode()))
                {
                    await HandlePutFinishedStateAsync(latestState, task, ipAddress);
                }
            }
        }
        /// <summary>
        /// 处理取货完成状态
        /// </summary>
        private async Task HandlePickFinishedStateAsync(RobotSocketState latestState, Dt_RobotTask task, string ipAddress)
        {
            string taskString = $"Putbattery,{task.RobotTargetAddress}";
            bool result = await _clientManager.SendToClientAsync(ipAddress, taskString);
            if (result)
            {
                task.RobotTaskState = TaskRobotStatusEnum.RobotExecuting.GetHashCode();
                // 重新获取最新状态并更新
                var stateToUpdate = _stateManager.GetState(ipAddress);
                if (stateToUpdate != null)
                {
                    stateToUpdate.CurrentTask = task;
                    if (_stateManager.TryUpdateStateSafely(ipAddress, stateToUpdate))
                        await _robotTaskService.UpdateRobotTaskAsync(task);
                }
            }
        }
        /// <summary>
        /// 处理放货完成状态
        /// </summary>
        private async Task HandlePutFinishedStateAsync(RobotSocketState latestState, Dt_RobotTask task, string ipAddress)
        {
            // 重新获取最新状态
            var stateForUpdate = _stateManager.GetState(ipAddress);
            if (stateForUpdate == null) return;
            if (!stateForUpdate.IsSplitPallet && !stateForUpdate.IsGroupPallet)
            {
                stateForUpdate.IsSplitPallet = task.RobotTaskType == RobotTaskTypeEnum.SplitPallet.GetHashCode();
                stateForUpdate.IsGroupPallet = task.RobotTaskType == RobotTaskTypeEnum.GroupPallet.GetHashCode() ||
                    task.RobotTaskType == RobotTaskTypeEnum.ChangePallet.GetHashCode();
            }
            if (task.RobotTaskType == RobotTaskTypeEnum.GroupPallet.GetHashCode())
            {
                string prefix = "TRAY";
                // 生成两个托盘条码
                string trayBarcode1 = RobotBarcodeGenerator.GenerateTrayBarcode(prefix);
                string trayBarcode2 = RobotBarcodeGenerator.GenerateTrayBarcode(prefix);
                if (!trayBarcode1.IsNullOrEmpty() && !trayBarcode2.IsNullOrEmpty())
                {
                    stateForUpdate.CellBarcode.Add(trayBarcode1);
                    stateForUpdate.CellBarcode.Add(trayBarcode2);
                    await _taskProcessor.SendSocketRobotPickAsync(task, stateForUpdate);
                }
            }
            else // 任务开始执行直接发送取货地址
            {
                await _taskProcessor.SendSocketRobotPickAsync(task, stateForUpdate);
                // 异常处理已在组件内部进行,Job 层保持兜底语义
                // 记录异常而不是静默吞掉,便于排查问题
                _logger?.LogError(ex, "RobotJob执行异常,IP: {IpAddress}", ipAddress);
                QuartzLogger.Error($"RobotJob执行异常,IP: {ipAddress}", state.RobotCrane.DeviceName, ex);
            }
        }
    }
}
}