背景


  工作中经常会有定时任务的需求,常见的做法可以使用Timer、Quartz、Hangfire等组件,这次想尝试下新的思路,使用RabbitMQ死信队列的机制来实现定时任务,同时帮助再次了解RabbitMQ的死信队列。

 

交互流程

  

 

  1. 用户创建定时任务

  2. 往死信队列插入一条消息,并设置过期时间为首个任务执行时间

  3. 死信队列中的消息过期后,消息流向工作队列

  4. 任务执行消费者监听工作队列,工作队列向消费者推送消息

  5. 消费者查询数据库,读取任务信息

  6. 消费者确认任务有效(未被撤销),执行任务

  7. 消费者确认有下个任务,再往死信队列插入一条消息,并设置过期时间为任务执行时间

  8. 重复2-7的步骤,直到所有任务执行完成或任务撤销

 

环境准备

  请自行完成MongoDB和RabbitMQ的安装,Windows、Linux、Docker皆可,以下提供Windows的安装方法:

  MongoDB:https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/
<https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/>

  RabbitMQ:https://www.rabbitmq.com/install-windows.html
<https://www.rabbitmq.com/install-windows.html>

 

核心代码

  1. (WebApi)创建任务,并根据设置创建子任务,把任务数据写入数据库
var task = new Task { Name = form.Name, StartTime = form.StartTime, EndTime =
form.EndTime, Interval= form.Interval, SubTasks = new List<SubTask>() }; var
startTime = task.StartTime; var endTime = task.EndTime; while ((endTime -
startTime).TotalMinutes >=0) { var sendTime = startTime; if (sendTime <=
endTime && sendTime > DateTime.UtcNow) { task.SubTasks.Add(new SubTask { Id =
ObjectId.GenerateNewId(), SendTime = sendTime }); } startTime =
startTime.AddMinutes(task.Interval); }await
_mongoDbContext.Collection<Task>().InsertOneAsync(task);
 

  2. (WebApi)往死信队列中写入消息
var timeFlag = task.SubTasks[0].SendTime.ToString("yyyy-MM-dd HH:mm:ssZ"); var
exchange ="Task"; var queue = "Task"; var index = 0; var pendingExchange = "
PendingTask"; var pendingQueue = $"PendingTask|Task:{task.Id}_{index}_{timeFlag}
"; using (var channel = _rabbitConnection.CreateModel()) {
channel.ExchangeDeclare(exchange,"direct", true); channel.QueueDeclare(queue,
true, false, false); channel.QueueBind(queue, exchange, queue); var retryDic =
new Dictionary<string, object> { {"x-dead-letter-exchange", exchange}, {"
x-dead-letter-routing-key", queue} }; channel.ExchangeDeclare(pendingExchange, "
direct", true); channel.QueueDeclare(pendingQueue, true, false, false,
retryDic); channel.QueueBind(pendingQueue, pendingExchange, pendingQueue);var
properties = channel.CreateBasicProperties(); properties.Headers = new
Dictionary<string, object> { ["index"] = index, ["id"] = task.Id.ToString(), ["
sendtime"] = timeFlag }; properties.Expiration = ((int)(task.SubTasks[0
].SendTime -
DateTime.UtcNow).TotalMilliseconds).ToString(CultureInfo.InvariantCulture);
channel.BasicPublish(pendingExchange, pendingQueue, properties,
Encoding.UTF8.GetBytes(string.Empty)); }
  其中:

  PendingTask为死信队列Exchange,死信队列的队列名(Queue
Name)会包含Task、index、timeFlag的信息,帮助跟踪队列和子任务,同时也起到唯一标识的作用。

  task.id为任务Id

  index为子任务下标

  timeFlag为子任务执行时间

 

  3. (消费者)处理消息
var exchange = "Task"; var queue = "Task"; _channel.ExchangeDeclare(exchange, "
direct", true); _channel.QueueDeclare(queue, true, false, false);
_channel.QueueBind(queue, exchange, queue);var consumer = new
EventingBasicConsumer(_channel);
   //监听处理 consumer.Received += (model, ea) => {
     //获取消息头信息 var index = (int)ea.BasicProperties.Headers["index"]; var id =
(ea.BasicProperties.Headers["id"] as byte[]).BytesToString(); var timeFlag =
(ea.BasicProperties.Headers["sendtime"] as byte[]).BytesToString();

   //删除临时死信队列 _channel.QueueDelete($"PendingTask|Task:{id}_{index}_{timeFlag}",
false, true); var taskId = new ObjectId(id); var task =
_mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefault();

     //撤销或已完成的任务不执行 if (task == null || task.Status != TaskStatus.Normal) {
_channel.BasicAck(ea.DeliveryTag,false); return; }
     //执行任务 _logger.LogInformation($"[{DateTime.UtcNow}]执行任务...");
//设置子任务已完成 task.SubTasks[index].IsSent = true; if (task.SubTasks.Count >
index +1) //还有未完成的子任务,把下个子任务的信息写入死信队列 { PublishPendingMsg(_channel, task, index
+1); } else { task.Status = TaskStatus.Finished; //所有子任务执行完毕,设置任务状态为完成 }
_mongoDbContext.Collection<Task>().ReplaceOne(n => n.Id == taskId, task);
//更新任务状态 _channel.BasicAck(ea.DeliveryTag,false); };
_channel.BasicConsume(queue,false, consumer);
 

  4. (WebApi)撤销任务,更新任务状态即可
var taskId = new ObjectId(id); var task = await
_mongoDbContext.Collection<Task>().Find(n => n.Id ==
taskId).SingleOrDefaultAsync();if (task == null) { return NotFound(new {
message ="任务不存在!" }); } task.Status = TaskStatus.Canceled; await
_mongoDbContext.Collection<Task>().FindOneAndReplaceAsync(n => n.Id == taskId,
task);
 

效果展示

   1. 先使用控制台把消费者启动起来。

  

 

  2. 创建任务


  启动WebApi,创建一个任务,开始时间为2019-07-16T07:55:00.000Z,结束时间为2019-07-16T07:59:00.000Z,执行时间间隔1分钟:

  

 

  任务与相应的子任务也写入了MongoDB,这里假设子任务可能是邮件发送任务:

  

     

  创建了一个临时死信队列,队列名称包含任务Id,子任务下标、以及子任务执行时间,并往其写入一条消息:

  

 

  3. 执行(子)任务

  从日志内容可以看出,(子)任务正常执行:

  

  子任务状态也标注为已发送

  

 

  同时也往消息队列写入了下一个子任务的消息:

  

  

  4. 撤销任务

   

 

  任务状态被置为已撤销:

  

 

  任务没再继续往下执行:

  

   

  消息队列中的临时队列被删除,消息也被消费完

  

 

源码地址

  https://github.com/ErikXu/rabbit-scheduler
<https://github.com/ErikXu/rabbit-scheduler>

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信