using Microsoft.Extensions.Logging; using Quartz; using WIDESEA_Core; using WIDESEAWCS_Common; using WIDESEAWCS_Core.Caches; using WIDESEAWCS_Core.Helper; using WIDESEAWCS_Core.LogHelper; using WIDESEAWCS_ITaskInfoService; using WIDESEAWCS_QuartzJob; using WIDESEAWCS_Tasks.SocketServer; using WIDESEAWCS_Tasks.Workflow; using WIDESEAWCS_Tasks.Workflow.Abstractions; namespace WIDESEAWCS_Tasks { /// /// 机器人任务作业(Quartz Job)- 负责调度与生命周期管理 /// /// /// Quartz 定时任务,每秒执行一次(默认),主要职责: /// 1. 从 JobDataMap 获取设备信息 /// 2. 确保 TCP 客户端已连接并订阅消息 /// 3. 轮询待处理的机器人任务 /// 4. 调用工作流编排器执行任务状态机 /// /// 使用 [DisallowConcurrentExecution] 禁止并发执行,确保同一设备的任务串行处理。 /// 具体的状态机流程逻辑委托给 处理。 /// [DisallowConcurrentExecution] public class RobotJob : IJob { /// /// 消息事件订阅标志 /// /// /// 使用原子操作确保全局只订阅一次 TCP 消息事件。 /// 防止多个 Job 实例重复订阅导致消息被多次处理。 /// private static int _messageSubscribedFlag; /// /// 机械手客户端连接管理器 /// /// /// 负责管理 TCP 连接的生命周期,包括连接、断开、消息发送等。 /// private readonly RobotClientManager _clientManager; /// /// 机械手状态管理器 /// /// /// 负责管理 Redis 缓存中的机械手状态,包括读写和并发控制。 /// private readonly RobotStateManager _stateManager; /// /// 消息路由器 /// /// /// 负责处理从 TCP 服务器接收到的消息,并分发给合适的处理器。 /// 是消息处理管道的入口。 /// private readonly IRobotMessageRouter _messageRouter; /// /// 机器人任务处理器 /// /// /// 负责任务的下发、状态更新、与 WMS 的交互等。 /// private readonly RobotTaskProcessor _taskProcessor; /// /// 机器人工作流编排器 /// /// /// 负责执行任务的状态机流转,根据当前状态决定下一步动作。 /// 这是核心的业务逻辑编排组件。 /// private readonly IRobotWorkflowOrchestrator _workflowOrchestrator; /// /// 日志记录器 /// private readonly ILogger _logger; /// /// 构造函数 /// /// /// 采用依赖注入方式获取所需服务,并完成组件的初始化和组装。 /// 这里体现了"控制反转"和"依赖注入"的设计原则。 /// /// TCP Socket 服务器实例 /// 机器人任务服务 /// 通用任务服务 /// 缓存服务 /// HTTP 客户端帮助类,用于调用 WMS 接口 /// 日志记录器 public RobotJob( TcpSocketServer tcpSocket, IRobotTaskService robotTaskService, ITaskService taskService, ICacheService cache, HttpClientHelper httpClientHelper, ILogger logger) { // 初始化状态管理器,传入缓存服务 _stateManager = new RobotStateManager(cache, _logger); _logger = logger; // 创建 Socket 网关,封装 TcpSocketServer 的访问 // 后续替换通信实现时只需替换网关层 ISocketClientGateway socketGateway = new SocketClientGateway(tcpSocket); // 初始化任务处理器 _taskProcessor = new RobotTaskProcessor(socketGateway, _stateManager, robotTaskService, taskService, httpClientHelper, _logger); // 初始化客户端管理器 _clientManager = new RobotClientManager(tcpSocket, _stateManager, _logger); // 初始化命令处理器 // 简单命令处理器:处理状态更新等简单命令 var simpleCommandHandler = new RobotSimpleCommandHandler(_taskProcessor, socketGateway); // 前缀命令处理器:处理 pickfinished、putfinished 等带参数的命令 var prefixCommandHandler = new RobotPrefixCommandHandler(robotTaskService, _taskProcessor, _stateManager, socketGateway); // 初始化消息路由器 _messageRouter = new RobotMessageHandler(socketGateway, _stateManager, cache, simpleCommandHandler, prefixCommandHandler, logger); // 初始化工作流编排器 _workflowOrchestrator = new RobotWorkflowOrchestrator(_stateManager, _clientManager, _taskProcessor, robotTaskService, _logger); // 订阅客户端断开连接事件 _clientManager.OnClientDisconnected += OnClientDisconnected; // 全局只订阅一次 TCP 消息事件(保持原有行为) // 使用 Interlocked.CompareExchange 实现原子操作 if (System.Threading.Interlocked.CompareExchange(ref _messageSubscribedFlag, 1, 0) == 0) { // 将消息路由器的处理方法绑定到 TCP 服务器的消息接收事件 tcpSocket.MessageReceived += _messageRouter.HandleMessageReceivedAsync; _logger.LogError("机器手TCP消息事件已订阅"); QuartzLogger.Error($"机器手TCP消息事件已订阅"); } } /// /// 客户端断开连接的事件处理 /// /// /// 当客户端断开连接时记录日志,便于排查问题。 /// /// 事件发送者 /// 断开连接的机械手状态 private void OnClientDisconnected(object? sender, RobotSocketState state) { _logger.LogError("客户端已断开连接"); QuartzLogger.Error($"客户端已断开连接", state.RobotCrane.DeviceName); } /// /// Quartz Job 的执行入口 /// /// /// 执行流程: /// 1. 从 JobDataMap 获取设备信息 /// 2. 确保客户端已连接并订阅消息 /// 3. 轮询待处理任务 /// 4. 调用工作流编排器执行任务 /// /// 注意:此方法可能频繁调用(每秒一次),需注意性能。 /// /// Quartz 作业执行上下文,包含作业详情和数据 public async Task Execute(IJobExecutionContext context) { // 从 JobDataMap 中获取作业参数 bool flag = context.JobDetail.JobDataMap.TryGetValue("JobParams", out object? value); // 将参数转换为机器人设备信息 RobotCraneDevice robotCrane = (RobotCraneDevice?)value ?? new RobotCraneDevice(); // 如果没有获取到有效的设备信息,直接返回 if (!flag || robotCrane.IsNullOrEmpty()) { return; } // 获取设备 IP 地址,作为状态缓存的键 string ipAddress = robotCrane.IPAddress; // 获取或创建设备状态对象 RobotSocketState state = _stateManager.GetOrCreateState(ipAddress, robotCrane); // 更新设备基础信息(以防设备信息在运行期间发生变化) state.RobotCrane = robotCrane; try { // 确保客户端已连接并订阅消息事件 // 如果客户端未连接或订阅失败,直接返回等待下次调度 if (!_clientManager.EnsureClientSubscribed(ipAddress, robotCrane)) { return; } if (state.CurrentAction == "Picking" || state.CurrentAction == "Puting") { return; } // 轮询获取该设备的待处理任务 var task = _taskProcessor.GetTask(robotCrane); // 如果没有获取到待处理任务,且RobotArmObject为1(有物料),则获取该设备执行中的任务 if (task == null && state.RobotArmObject == 1) { task = _taskProcessor.GetExecutingTask(robotCrane); } // 如果有待处理任务 if (task != null) { // 获取最新的设备状态 var latestState = _stateManager.GetState(ipAddress); if (latestState == null) { // 状态不存在,可能设备未初始化 return; } // 检查任务总数是否未达到上限 if (latestState.RobotTaskTotalNum < RobotConst.MaxTaskTotalNum) { // 调用工作流编排器执行任务 // 编排器会根据当前状态决定下一步动作 await _workflowOrchestrator.ExecuteAsync(latestState, task, ipAddress); } } } catch (Exception ex) { // 异常处理已在组件内部进行,Job 层保持兜底语义 // 记录异常而不是静默吞掉,便于排查问题 _logger?.LogError(ex, "RobotJob执行异常,IP: {IpAddress}", ipAddress); QuartzLogger.Error($"RobotJob执行异常,IP: {ipAddress}", state.RobotCrane.DeviceName, ex); } } } }