Skip to main content

如何实现主体类型的消息队列 ?

什么是队列

我们可以理解成一群人在排队打饭。先排的人,先打饭。本质就是先进先出。

前言

关于队列,相信大家应该很熟悉。先进先出就是它的特点。最近在实现某些业务场景会出现并发的情况,目前采用的数据库是没有事务控制的,无原子性的说法。存在高吞吐。其实我想用 KafkaMQ,但是出于某种原因,没能用上。于是打算自己造一个主题类的消息队列。

实现队列

class Queue {
constructor() {
this.q1 = [];
this.q2 = [];
}

push(value) {
return this.q1.push(value);
}

shift() {
let q2 = this.q2;
if (q2.length === 0) {
const q1 = this.q1;
if (q1.length === 0) {
return;
}
this.q1 = q2;
q2 = this.q2 = q1.reverse();
}

return q2.pop();
}

isEmpty() {
if (this.q1.length === 0 && this.q2.length === 0) {
return true;
}
return false;
}
}

module.exports = Queue;

实现延迟任务

class DelayedTask {
constructor(resolve, fn, args) {
this.resolve = resolve;
this.fn = fn;
this.args = args;
}
}
module.exports = DelayedTask;

延迟任务池子

const Queue = require('./queue');
const DelayedTask = require('./delayedTask');

class TaskPool {
constructor() {
this.queues = {};
this.sizes = {};
}

ensureQueue(topic, size = 1) {
if (!this.queues[topic]) {
this.queues[topic] = new Queue();
this.sizes[topic] = size;
}
}

addTask(topic, fn) {
this.ensureQueue(topic);
return (...args) => {
return new Promise(resolve => {
this.queues[topic].push(new DelayedTask(resolve, fn, args));
this.pullTask(topic);
});
};
}

pullTask(topic) {
if (this.queues[topic].isEmpty() || this.sizes[topic] <= 0) {
return;
}
this.sizes[topic]--;
const { resolve, fn, args } = this.queues[topic].shift();
resolve(this.runTask(topic, fn, args));
}

runTask(topic, fn, args) {
const result = Promise.resolve(fn(...args));
result
.then(() => {
this.sizes[topic]++;
this.pullTask(topic);
})
.catch(() => {
this.sizes[topic]++;
this.pullTask(topic);
});
return result;
}

getQueueLength(topic) {
if (this.queues[topic]) {
return this.queues[topic].length;
}
return 0;
}

getQueueContents(topic) {
if (this.queues[topic]) {
return this.queues[topic].map(task => {
return {
function: task.fn.name,
arguments: task.args,
};
});
}
return [];
}
}

module.exports = TaskPool;

用法

const TaskPool = require('../util/queue/taskPool');
const numberOfConcurrent = 1;
const taskQueue = new TaskPool(numberOfConcurrent);

const updateTask = async (type, roleId, count) => {
console.log(`更新角色任务: type=${type}, roleId=${roleId}, count=${count}`);
};

const updateRoleDailyTasks = async (type, roleId, count = 1) => {
const topic = 'update-role-org-daily-task';
taskQueue.ensureQueue(topic);
const task = taskQueue.addTask(topic, () => // 添加任务到队列,要使用箭头函数,不然直接调用会被立即执行
updateTask(type, roleId, count)
);
await task();// 执行该主题的任务
};