如何实现主体类型的消息队列 ?
什么是队列
我们可以理解成一群人在排队打饭。先排的人,先打饭。本质就是先进先出。
前言
关于队列,相信大家应该很熟悉。先进先出就是它的特点。最近在实现某些业务场景会出现并发的情况,目前采用的数据库是没有事务控制的,无原子性的说法。存在高吞吐。其实我想用 Kafka 和 MQ,但是出于某种原因,没能用上。于是打算自己造一个主题类的消息队列。
实现队列
class Queue {
constructor() {
this.q1 = [];
this.q2 = [];
}
push(value) {
return this.q1.push(value);
}
shift() {
let q2 = this.q2;
if (q2.length === 0) {
const q1 = this.q1;
if (q1.length === 0) {
return;
}
this.q1 = q2;
q2 = this.q2 = q1.reverse();
}
return q2.pop();
}
isEmpty() {
if (this.q1.length === 0 && this.q2.length === 0) {
return true;
}
return false;
}
}
module.exports = Queue;
实现延迟任务
class DelayedTask {
constructor(resolve, fn, args) {
this.resolve = resolve;
this.fn = fn;
this.args = args;
}
}
module.exports = DelayedTask;
延迟任务池子
const Queue = require('./queue');
const DelayedTask = require('./delayedTask');
class TaskPool {
constructor() {
this.queues = {};
this.sizes = {};
}
ensureQueue(topic, size = 1) {
if (!this.queues[topic]) {
this.queues[topic] = new Queue();
this.sizes[topic] = size;
}
}
addTask(topic, fn) {
this.ensureQueue(topic);
return (...args) => {
return new Promise(resolve => {
this.queues[topic].push(new DelayedTask(resolve, fn, args));
this.pullTask(topic);
});
};
}
pullTask(topic) {
if (this.queues[topic].isEmpty() || this.sizes[topic] <= 0) {
return;
}
this.sizes[topic]--;
const { resolve, fn, args } = this.queues[topic].shift();
resolve(this.runTask(topic, fn, args));
}
runTask(topic, fn, args) {
const result = Promise.resolve(fn(...args));
result
.then(() => {
this.sizes[topic]++;
this.pullTask(topic);
})
.catch(() => {
this.sizes[topic]++;
this.pullTask(topic);
});
return result;
}
getQueueLength(topic) {
if (this.queues[topic]) {
return this.queues[topic].length;
}
return 0;
}
getQueueContents(topic) {
if (this.queues[topic]) {
return this.queues[topic].map(task => {
return {
function: task.fn.name,
arguments: task.args,
};
});
}
return [];
}
}
module.exports = TaskPool;
用法
const TaskPool = require('../util/queue/taskPool');
const numberOfConcurrent = 1;
const taskQueue = new TaskPool(numberOfConcurrent);
const updateTask = async (type, roleId, count) => {
console.log(`更新角色任务: type=${type}, roleId=${roleId}, count=${count}`);
};
const updateRoleDailyTasks = async (type, roleId, count = 1) => {
const topic = 'update-role-org-daily-task';
taskQueue.ensureQueue(topic);
const task = taskQueue.addTask(topic, () => // 添加任务到队列,要使用箭头函数,不然直接调用会被立即执行
updateTask(type, roleId, count)
);
await task();// 执行该主题的任务
};