wanshenmean
2026-03-26 8e42d0c1b7ae36cff2e7c69999117911a4b6f300
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
using Microsoft.Extensions.Logging;
using Quartz;
using System.Net;
using WIDESEA_Core;
using WIDESEAWCS_Core.Caches;
using WIDESEAWCS_Core.Helper;
using WIDESEAWCS_Core.LogHelper;
using WIDESEAWCS_ITaskInfoService;
using WIDESEAWCS_QuartzJob;
using WIDESEAWCS_Tasks.SocketServer;
using WIDESEAWCS_Tasks.Workflow;
using WIDESEAWCS_Tasks.Workflow.Abstractions;
 
namespace WIDESEAWCS_Tasks
{
    /// <summary>
    /// 机器人任务作业(Quartz Job)- 负责调度与生命周期管理
    /// </summary>
    /// <remarks>
    /// Quartz 定时任务,每秒执行一次(默认),主要职责:
    /// 1. 从 JobDataMap 获取设备信息
    /// 2. 确保 TCP 客户端已连接并订阅消息
    /// 3. 轮询待处理的机器人任务
    /// 4. 调用工作流编排器执行任务状态机
    ///
    /// 使用 [DisallowConcurrentExecution] 禁止并发执行,确保同一设备的任务串行处理。
    /// 具体的状态机流程逻辑委托给 <see cref="RobotWorkflowOrchestrator"/> 处理。
    /// </remarks>
    [DisallowConcurrentExecution]
    public class RobotJob : IJob
    {
        /// <summary>
        /// 任务总数上限
        /// </summary>
        /// <remarks>
        /// 当机器人处理的货物数量达到此上限时,不再下发新任务。
        /// 防止机器人过度劳累或系统过载。
        /// </remarks>
        private const int MaxTaskTotalNum = 48;
 
        /// <summary>
        /// 消息事件订阅标志
        /// </summary>
        /// <remarks>
        /// 使用原子操作确保全局只订阅一次 TCP 消息事件。
        /// 防止多个 Job 实例重复订阅导致消息被多次处理。
        /// </remarks>
        private static int _messageSubscribedFlag;
 
        /// <summary>
        /// 机械手客户端连接管理器
        /// </summary>
        /// <remarks>
        /// 负责管理 TCP 连接的生命周期,包括连接、断开、消息发送等。
        /// </remarks>
        private readonly RobotClientManager _clientManager;
 
        /// <summary>
        /// 机械手状态管理器
        /// </summary>
        /// <remarks>
        /// 负责管理 Redis 缓存中的机械手状态,包括读写和并发控制。
        /// </remarks>
        private readonly RobotStateManager _stateManager;
 
        /// <summary>
        /// 消息路由器
        /// </summary>
        /// <remarks>
        /// 负责处理从 TCP 服务器接收到的消息,并分发给合适的处理器。
        /// 是消息处理管道的入口。
        /// </remarks>
        private readonly IRobotMessageRouter _messageRouter;
 
        /// <summary>
        /// 机器人任务处理器
        /// </summary>
        /// <remarks>
        /// 负责任务的下发、状态更新、与 WMS 的交互等。
        /// </remarks>
        private readonly RobotTaskProcessor _taskProcessor;
 
        /// <summary>
        /// 机器人工作流编排器
        /// </summary>
        /// <remarks>
        /// 负责执行任务的状态机流转,根据当前状态决定下一步动作。
        /// 这是核心的业务逻辑编排组件。
        /// </remarks>
        private readonly IRobotWorkflowOrchestrator _workflowOrchestrator;
 
        /// <summary>
        /// 日志记录器
        /// </summary>
        private readonly ILogger<RobotJob> _logger;
 
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <remarks>
        /// 采用依赖注入方式获取所需服务,并完成组件的初始化和组装。
        /// 这里体现了"控制反转"和"依赖注入"的设计原则。
        /// </remarks>
        /// <param name="tcpSocket">TCP Socket 服务器实例</param>
        /// <param name="robotTaskService">机器人任务服务</param>
        /// <param name="taskService">通用任务服务</param>
        /// <param name="cache">缓存服务</param>
        /// <param name="httpClientHelper">HTTP 客户端帮助类,用于调用 WMS 接口</param>
        /// <param name="logger">日志记录器</param>
        public RobotJob(
            TcpSocketServer tcpSocket,
            IRobotTaskService robotTaskService,
            ITaskService taskService,
            ICacheService cache,
            HttpClientHelper httpClientHelper,
            ILogger<RobotJob> logger)
        {
            // 初始化状态管理器,传入缓存服务
            _stateManager = new RobotStateManager(cache);
            _logger = logger;
 
            // 创建 Socket 网关,封装 TcpSocketServer 的访问
            // 后续替换通信实现时只需替换网关层
            ISocketClientGateway socketGateway = new SocketClientGateway(tcpSocket);
 
            // 初始化任务处理器
            _taskProcessor = new RobotTaskProcessor(socketGateway, _stateManager, robotTaskService, taskService, httpClientHelper);
 
            // 初始化客户端管理器
            _clientManager = new RobotClientManager(tcpSocket, _stateManager);
 
            // 初始化命令处理器
            // 简单命令处理器:处理状态更新等简单命令
            var simpleCommandHandler = new RobotSimpleCommandHandler(_taskProcessor);
            // 前缀命令处理器:处理 pickfinished、putfinished 等带参数的命令
            var prefixCommandHandler = new RobotPrefixCommandHandler(robotTaskService, _taskProcessor, _stateManager, socketGateway);
 
            // 初始化消息路由器
            _messageRouter = new RobotMessageHandler(socketGateway, _stateManager, cache, simpleCommandHandler, prefixCommandHandler, logger);
 
            // 初始化工作流编排器
            _workflowOrchestrator = new RobotWorkflowOrchestrator(_stateManager, _clientManager, _taskProcessor, robotTaskService);
 
            // 订阅客户端断开连接事件
            _clientManager.OnClientDisconnected += OnClientDisconnected;
 
            // 全局只订阅一次 TCP 消息事件(保持原有行为)
            // 使用 Interlocked.CompareExchange 实现原子操作
            if (System.Threading.Interlocked.CompareExchange(ref _messageSubscribedFlag, 1, 0) == 0)
            {
                // 将消息路由器的处理方法绑定到 TCP 服务器的消息接收事件
                tcpSocket.MessageReceived += _messageRouter.HandleMessageReceivedAsync;
                _logger.LogError("机器手TCP消息事件已订阅");
                QuartzLogger.Error($"机器手TCP消息事件已订阅");
            }
        }
 
        /// <summary>
        /// 客户端断开连接的事件处理
        /// </summary>
        /// <remarks>
        /// 当客户端断开连接时记录日志,便于排查问题。
        /// </remarks>
        /// <param name="sender">事件发送者</param>
        /// <param name="state">断开连接的机械手状态</param>
        private void OnClientDisconnected(object? sender, RobotSocketState state)
        {
            _logger.LogError("客户端已断开连接");
            QuartzLogger.Error($"客户端已断开连接", state.RobotCrane.DeviceName);
        }
 
        /// <summary>
        /// Quartz Job 的执行入口
        /// </summary>
        /// <remarks>
        /// 执行流程:
        /// 1. 从 JobDataMap 获取设备信息
        /// 2. 确保客户端已连接并订阅消息
        /// 3. 轮询待处理任务
        /// 4. 调用工作流编排器执行任务
        ///
        /// 注意:此方法可能频繁调用(每秒一次),需注意性能。
        /// </remarks>
        /// <param name="context">Quartz 作业执行上下文,包含作业详情和数据</param>
        public async Task Execute(IJobExecutionContext context)
        {
            // 从 JobDataMap 中获取作业参数
            bool flag = context.JobDetail.JobDataMap.TryGetValue("JobParams", out object? value);
 
            // 将参数转换为机器人设备信息
            RobotCraneDevice robotCrane = (RobotCraneDevice?)value ?? new RobotCraneDevice();
 
            // 如果没有获取到有效的设备信息,直接返回
            if (!flag || robotCrane.IsNullOrEmpty())
            {
                return;
            }
 
            // 获取设备 IP 地址,作为状态缓存的键
            string ipAddress = robotCrane.IPAddress;
 
            // 获取或创建设备状态对象
            RobotSocketState state = _stateManager.GetOrCreateState(ipAddress, robotCrane);
 
            // 更新设备基础信息(以防设备信息在运行期间发生变化)
            state.RobotCrane = robotCrane;
 
            try
            {
                // 确保客户端已连接并订阅消息事件
                // 如果客户端未连接或订阅失败,直接返回等待下次调度
                if (!_clientManager.EnsureClientSubscribed(ipAddress, robotCrane))
                {
                    return;
                }
 
                // 轮询获取该设备的待处理任务
                var task = _taskProcessor.GetTask(robotCrane);
 
                // 如果有待处理任务
                if (task != null)
                {
                    // 获取最新的设备状态
                    var latestState = _stateManager.GetState(ipAddress);
                    if (latestState == null)
                    {
                        // 状态不存在,可能设备未初始化
                        return;
                    }
 
                    // 检查任务总数是否未达到上限
                    if (latestState.RobotTaskTotalNum < MaxTaskTotalNum)
                    {
                        // 调用工作流编排器执行任务
                        // 编排器会根据当前状态决定下一步动作
                        await _workflowOrchestrator.ExecuteAsync(latestState, task, ipAddress);
                    }
                }
            }
            catch (Exception ex)
            {
                // 异常处理已在组件内部进行,Job 层保持兜底语义
                // 记录异常而不是静默吞掉,便于排查问题
                _logger?.LogError(ex, "RobotJob执行异常,IP: {IpAddress}", ipAddress);
                QuartzLogger.Error($"RobotJob执行异常,IP: {ipAddress}", state.RobotCrane.DeviceName, ex);
            }
        }
    }
}