using Microsoft.Extensions.Logging;
|
using Quartz;
|
using System.Net;
|
using WIDESEA_Core;
|
using WIDESEAWCS_Core.Caches;
|
using WIDESEAWCS_Core.Helper;
|
using WIDESEAWCS_Core.LogHelper;
|
using WIDESEAWCS_ITaskInfoService;
|
using WIDESEAWCS_QuartzJob;
|
using WIDESEAWCS_Tasks.SocketServer;
|
using WIDESEAWCS_Tasks.Workflow;
|
using WIDESEAWCS_Tasks.Workflow.Abstractions;
|
|
namespace WIDESEAWCS_Tasks
|
{
|
/// <summary>
|
/// 机器人任务作业(Quartz Job)- 负责调度与生命周期管理
|
/// </summary>
|
/// <remarks>
|
/// Quartz 定时任务,每秒执行一次(默认),主要职责:
|
/// 1. 从 JobDataMap 获取设备信息
|
/// 2. 确保 TCP 客户端已连接并订阅消息
|
/// 3. 轮询待处理的机器人任务
|
/// 4. 调用工作流编排器执行任务状态机
|
///
|
/// 使用 [DisallowConcurrentExecution] 禁止并发执行,确保同一设备的任务串行处理。
|
/// 具体的状态机流程逻辑委托给 <see cref="RobotWorkflowOrchestrator"/> 处理。
|
/// </remarks>
|
[DisallowConcurrentExecution]
|
public class RobotJob : IJob
|
{
|
/// <summary>
|
/// 任务总数上限
|
/// </summary>
|
/// <remarks>
|
/// 当机器人处理的货物数量达到此上限时,不再下发新任务。
|
/// 防止机器人过度劳累或系统过载。
|
/// </remarks>
|
private const int MaxTaskTotalNum = 48;
|
|
/// <summary>
|
/// 消息事件订阅标志
|
/// </summary>
|
/// <remarks>
|
/// 使用原子操作确保全局只订阅一次 TCP 消息事件。
|
/// 防止多个 Job 实例重复订阅导致消息被多次处理。
|
/// </remarks>
|
private static int _messageSubscribedFlag;
|
|
/// <summary>
|
/// 机械手客户端连接管理器
|
/// </summary>
|
/// <remarks>
|
/// 负责管理 TCP 连接的生命周期,包括连接、断开、消息发送等。
|
/// </remarks>
|
private readonly RobotClientManager _clientManager;
|
|
/// <summary>
|
/// 机械手状态管理器
|
/// </summary>
|
/// <remarks>
|
/// 负责管理 Redis 缓存中的机械手状态,包括读写和并发控制。
|
/// </remarks>
|
private readonly RobotStateManager _stateManager;
|
|
/// <summary>
|
/// 消息路由器
|
/// </summary>
|
/// <remarks>
|
/// 负责处理从 TCP 服务器接收到的消息,并分发给合适的处理器。
|
/// 是消息处理管道的入口。
|
/// </remarks>
|
private readonly IRobotMessageRouter _messageRouter;
|
|
/// <summary>
|
/// 机器人任务处理器
|
/// </summary>
|
/// <remarks>
|
/// 负责任务的下发、状态更新、与 WMS 的交互等。
|
/// </remarks>
|
private readonly RobotTaskProcessor _taskProcessor;
|
|
/// <summary>
|
/// 机器人工作流编排器
|
/// </summary>
|
/// <remarks>
|
/// 负责执行任务的状态机流转,根据当前状态决定下一步动作。
|
/// 这是核心的业务逻辑编排组件。
|
/// </remarks>
|
private readonly IRobotWorkflowOrchestrator _workflowOrchestrator;
|
|
/// <summary>
|
/// 日志记录器
|
/// </summary>
|
private readonly ILogger<RobotJob> _logger;
|
|
/// <summary>
|
/// 构造函数
|
/// </summary>
|
/// <remarks>
|
/// 采用依赖注入方式获取所需服务,并完成组件的初始化和组装。
|
/// 这里体现了"控制反转"和"依赖注入"的设计原则。
|
/// </remarks>
|
/// <param name="tcpSocket">TCP Socket 服务器实例</param>
|
/// <param name="robotTaskService">机器人任务服务</param>
|
/// <param name="taskService">通用任务服务</param>
|
/// <param name="cache">缓存服务</param>
|
/// <param name="httpClientHelper">HTTP 客户端帮助类,用于调用 WMS 接口</param>
|
/// <param name="logger">日志记录器</param>
|
public RobotJob(
|
TcpSocketServer tcpSocket,
|
IRobotTaskService robotTaskService,
|
ITaskService taskService,
|
ICacheService cache,
|
HttpClientHelper httpClientHelper,
|
ILogger<RobotJob> logger)
|
{
|
// 初始化状态管理器,传入缓存服务
|
_stateManager = new RobotStateManager(cache);
|
_logger = logger;
|
|
// 创建 Socket 网关,封装 TcpSocketServer 的访问
|
// 后续替换通信实现时只需替换网关层
|
ISocketClientGateway socketGateway = new SocketClientGateway(tcpSocket);
|
|
// 初始化任务处理器
|
_taskProcessor = new RobotTaskProcessor(socketGateway, _stateManager, robotTaskService, taskService, httpClientHelper);
|
|
// 初始化客户端管理器
|
_clientManager = new RobotClientManager(tcpSocket, _stateManager);
|
|
// 初始化命令处理器
|
// 简单命令处理器:处理状态更新等简单命令
|
var simpleCommandHandler = new RobotSimpleCommandHandler(_taskProcessor);
|
// 前缀命令处理器:处理 pickfinished、putfinished 等带参数的命令
|
var prefixCommandHandler = new RobotPrefixCommandHandler(robotTaskService, _taskProcessor, _stateManager, socketGateway);
|
|
// 初始化消息路由器
|
_messageRouter = new RobotMessageHandler(socketGateway, _stateManager, cache, simpleCommandHandler, prefixCommandHandler, logger);
|
|
// 初始化工作流编排器
|
_workflowOrchestrator = new RobotWorkflowOrchestrator(_stateManager, _clientManager, _taskProcessor, robotTaskService);
|
|
// 订阅客户端断开连接事件
|
_clientManager.OnClientDisconnected += OnClientDisconnected;
|
|
// 全局只订阅一次 TCP 消息事件(保持原有行为)
|
// 使用 Interlocked.CompareExchange 实现原子操作
|
if (System.Threading.Interlocked.CompareExchange(ref _messageSubscribedFlag, 1, 0) == 0)
|
{
|
// 将消息路由器的处理方法绑定到 TCP 服务器的消息接收事件
|
tcpSocket.MessageReceived += _messageRouter.HandleMessageReceivedAsync;
|
_logger.LogError("机器手TCP消息事件已订阅");
|
QuartzLogger.Error($"机器手TCP消息事件已订阅");
|
}
|
}
|
|
/// <summary>
|
/// 客户端断开连接的事件处理
|
/// </summary>
|
/// <remarks>
|
/// 当客户端断开连接时记录日志,便于排查问题。
|
/// </remarks>
|
/// <param name="sender">事件发送者</param>
|
/// <param name="state">断开连接的机械手状态</param>
|
private void OnClientDisconnected(object? sender, RobotSocketState state)
|
{
|
_logger.LogError("客户端已断开连接");
|
QuartzLogger.Error($"客户端已断开连接", state.RobotCrane.DeviceName);
|
}
|
|
/// <summary>
|
/// Quartz Job 的执行入口
|
/// </summary>
|
/// <remarks>
|
/// 执行流程:
|
/// 1. 从 JobDataMap 获取设备信息
|
/// 2. 确保客户端已连接并订阅消息
|
/// 3. 轮询待处理任务
|
/// 4. 调用工作流编排器执行任务
|
///
|
/// 注意:此方法可能频繁调用(每秒一次),需注意性能。
|
/// </remarks>
|
/// <param name="context">Quartz 作业执行上下文,包含作业详情和数据</param>
|
public async Task Execute(IJobExecutionContext context)
|
{
|
// 从 JobDataMap 中获取作业参数
|
bool flag = context.JobDetail.JobDataMap.TryGetValue("JobParams", out object? value);
|
|
// 将参数转换为机器人设备信息
|
RobotCraneDevice robotCrane = (RobotCraneDevice?)value ?? new RobotCraneDevice();
|
|
// 如果没有获取到有效的设备信息,直接返回
|
if (!flag || robotCrane.IsNullOrEmpty())
|
{
|
return;
|
}
|
|
// 获取设备 IP 地址,作为状态缓存的键
|
string ipAddress = robotCrane.IPAddress;
|
|
// 获取或创建设备状态对象
|
RobotSocketState state = _stateManager.GetOrCreateState(ipAddress, robotCrane);
|
|
// 更新设备基础信息(以防设备信息在运行期间发生变化)
|
state.RobotCrane = robotCrane;
|
|
try
|
{
|
// 确保客户端已连接并订阅消息事件
|
// 如果客户端未连接或订阅失败,直接返回等待下次调度
|
if (!_clientManager.EnsureClientSubscribed(ipAddress, robotCrane))
|
{
|
return;
|
}
|
|
// 轮询获取该设备的待处理任务
|
var task = _taskProcessor.GetTask(robotCrane);
|
|
// 如果有待处理任务
|
if (task != null)
|
{
|
// 获取最新的设备状态
|
var latestState = _stateManager.GetState(ipAddress);
|
if (latestState == null)
|
{
|
// 状态不存在,可能设备未初始化
|
return;
|
}
|
|
// 检查任务总数是否未达到上限
|
if (latestState.RobotTaskTotalNum < MaxTaskTotalNum)
|
{
|
// 调用工作流编排器执行任务
|
// 编排器会根据当前状态决定下一步动作
|
await _workflowOrchestrator.ExecuteAsync(latestState, task, ipAddress);
|
}
|
}
|
}
|
catch (Exception ex)
|
{
|
// 异常处理已在组件内部进行,Job 层保持兜底语义
|
// 记录异常而不是静默吞掉,便于排查问题
|
_logger?.LogError(ex, "RobotJob执行异常,IP: {IpAddress}", ipAddress);
|
QuartzLogger.Error($"RobotJob执行异常,IP: {ipAddress}", state.RobotCrane.DeviceName, ex);
|
}
|
}
|
}
|
}
|