如何使用 KCP ?
前言
最近在考虑帧同步的通信协议的选型,阅览了相关资料。很多联机对战都是通过 UDP 去实现。但是 UDP 没 TCP 那么可靠,在项目比较赶的情况下,自己封装一套可靠的 UDP ,会导致延期。于是我打算用 KCP 协议去实现。
官方文档:Introduction | KCP(KCP.org)
Node KCP-X 文档:Introduction |Node KCP X(node-kcp-x.org)
KCP 作者的介绍
KCP是一个快速可靠协议,能以比 TCP 浪费 10%-20% 的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发,需要使用者自己定义下层数据包的发送方式,以 callback的方式提供给 KCP。 连时钟都需要外部传递进来,内部不会有任何一次系统调用。
协议参赛介绍
以下是 KCP 作者所描述的,搬运过来是为了能快速理解参数
工作模式:
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
- nodelay :是否启用 nodelay模式,0不启用;1启用。
- interval :协议内部工作的 interval,单位毫秒,比如 10ms或者 20ms
- resend :快速重传模式,默认0关闭,可以设置2(2次ACK跨越将会直接重传)
- nc :是否关闭流控,默认是0代表不关闭,1代表关闭。
- 普通模式: ikcp_nodelay(kcp, 0, 40, 0, 0);
- 极速模式: ikcp_nodelay(kcp, 1, 10, 2, 1);
最大窗口:
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
该调用将会设置协议的最大发送窗口和最大接收窗口大小,默认为32. 这个可以理解为 TCP的 SND_BUF 和 RCV_BUF,只不过单位不一样 SND/RCV_BUF 单位是字节,这个单位是包。
-
最大传输单元:
纯算法协议并不 负责探测 MTU,默认 mtu是1400字节,可以使用ikcp_setmtu来设置该值。该值将会影响数据包归并及分片时候的最大传输单元。
-
最小RTO:
不管是 TCP还是 KCP计算 RTO时都有最小 RTO的限制,即便计算出来RTO为40ms,由于默认的 RTO是100ms,协议只有在100ms后才能检测到丢包,快速模式下为30ms,可以手动更改该值:
kcp->rx_minrto = 10;
如何使用
本次采用 node-kcp-x 去实现的。看了一下源码,如果我们要自己构建可以去 KCP 的项目,把 ikcp.h, ikcp.c 两个源文件,放到 node-kcp-x 对应的目录里从新打包即可。
node-gyp build
本次采用的框架 TSRPC。
Dome 的目录结构
├── src
│ ├── common # 公共文件、常量业务报错等
│ ├── config # 配置文件
│ ├── db # 数据库连接
│ ├── middleware # 中间件
│ ├── shared # 前后端共享代码
│ │ ├── protocols # 协议定义
│ │ ├── api # API 实现
│ ├── index.ts # 启动文件
│ ├── tsrpc.config.ts # TSRPC 项目配置文件
定义 KCP 的配置
./config/kcpConfig.ts
export const kcpConfig = {
port: 23333,
conv: 231206,
nodelay: 1,
interval: 10,
resend: 2,
nc: 1,
mtu: 1400,
sndWnd: 1024,
rcvWnd: 1024,
checkTime: 10,
heartbeatInterval: 3000,
timeoutInterval: 10000,
};
export const clientConfig = {
maxRetries: 10,
pingInterval: 3000,
pongTimeout: 1000,
isBreakLine: 0,
reconnectInterval: 3000,
maxRecInterval: 30000,
reconnectGap: 2,
};
封装 KCP 为中间件
./config/kcpConfig.ts
import { KCP } from 'node-kcp-x';
import * as dgram from 'dgram';
import { kcpConfig } from '../config/kcpConfig';
import { Logger, WsServer } from 'tsrpc';
import { ServiceType } from '../shared/protocols/serviceProto';
import { EventEmitter } from 'events';
interface ClientContext {
address: string;
port: number;
}
interface Group {
clientKey: string;
}
interface Client extends KCP {
lastHeartbeat: number;
}
interface EventActionContent extends Group {
groupId: string;
}
class KcpServer extends EventEmitter {
private socketServer: dgram.Socket;
private clients: { [key: string]: Client } = {};
private groups: { [groupId: string]: string[] } = {};
port: number;
server: WsServer<ServiceType>;
logger: Logger;
heartbeatInterval: number;
timeoutInterval: number;
constructor(server: WsServer<ServiceType>) {
super();
this.server = server;
this.logger = server?.logger;
this.port = kcpConfig.port;
this.heartbeatInterval = kcpConfig.heartbeatInterval;
this.timeoutInterval = kcpConfig.timeoutInterval;
this.socketServer = dgram.createSocket('udp4');
this.socketServer.on('error', this.handleError.bind(this));
this.socketServer.on('message', this.handleMessage.bind(this));
this.socketServer.on('listening', this.handleListening.bind(this));
this.startHeartbeatCheck();
this.on('onKcpPing', this.handleKcpPing.bind(this));
this.on('onAddGroup', this.handleAddGroup.bind(this));
}
private handleError(err: Error) {
this.logger?.error(
'Kcp connected error',
`port:${this.port},err:${err}`
);
this.socketServer.close();
}
private handleMessage(msg: Buffer, rinfo: dgram.RemoteInfo) {
const key = `${rinfo.address}_${rinfo.port}`;
if (!this.clients[key]) {
const context: ClientContext = {
address: rinfo.address,
port: rinfo.port,
};
const { nodelay, interval, resend, nc, conv, mtu, sndWnd, rcvWnd } =
kcpConfig;
const kcpObj = new KCP(conv, context) as Client;
kcpObj.nodelay(nodelay, interval, resend, nc);
kcpObj.setmtu(mtu);
kcpObj.wndsize(sndWnd, rcvWnd);
kcpObj.output(this.output.bind(this));
kcpObj.lastHeartbeat = Date.now();
this.clients[key] = kcpObj;
this.check(kcpObj);
}
const kcpObj = this.clients[key];
kcpObj.input(msg);
const size = kcpObj.peeksize();
if (size > 0) {
const buffer = kcpObj.recv();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const { action, data }: { action: string; data: any } = JSON.parse(
buffer.toString()
);
data.clientKey = key;
this.logger?.log(
`recv: ${buffer} from ${JSON.stringify(kcpObj.context())}`
);
kcpObj.send(Buffer.from(buffer));
this.emit(action, data);
kcpObj.flush();
}
}
private handleListening() {
const address = this.socketServer.address();
this.logger?.log(`server listening ${address.address}:${address.port}`);
}
private output(data: Buffer, size: number, context: ClientContext) {
this.socketServer.send(data, 0, size, context.port, context.address);
}
private check(kcpObj: KCP) {
if (!kcpObj) {
return;
}
const now = Date.now();
kcpObj.update(now);
setTimeout(() => this.check(kcpObj), kcpConfig.checkTime);
}
// 心跳定义,采用 ping pong
private handleKcpPing(data: EventActionContent) {
if (!data || !data?.clientKey) {
return;
}
const client = this.clients[data.clientKey];
client.lastHeartbeat = Date.now();
client.send(Buffer.from('PONG'));
client.flush();
}
// 封装组,类似房间
private handleAddGroup(data: EventActionContent) {
if (!data || !data?.groupId || !data?.clientKey) {
return;
}
const { clientKey, groupId } = data;
if (!this.groups[groupId]) {
this.groups[groupId] = [];
}
const groupMap = new Set(this.groups[groupId]);
if (groupMap.has(clientKey)) {
return;
}
this.groups[groupId].push(clientKey);
}
// 心跳检查
private startHeartbeatCheck() {
setInterval(() => {
const now = Date.now();
for (const [key, client] of Object.entries(this.clients)) {
if (now - client.lastHeartbeat > this.timeoutInterval) {
delete this.clients[key];
this.handleClientDisconnect(key);
}
}
}, this.heartbeatInterval);
}
// 断开操作
private handleClientDisconnect(clientKey: string) {
this.logger.log(`Client ${clientKey} disconnected.`);
}
public start() {
this.socketServer.bind(this.port);
}
public stop() {
this.socketServer.close();
}
}
export default KcpServer;
在程序入口中调用
import * as path from 'path';
import { WsServer } from 'tsrpc';
import { serviceProto } from './shared/protocols/serviceProto';
import { Global } from './db/global';
import KcpServer from './middleware/kcp';
export const server = new WsServer(serviceProto, {
port: 3000,
json: false,
});
export const KcpInstance = new KcpServer(server); // 创建 KCP 实例
async function init() {
await server.autoImplementApi(path.resolve(__dirname, 'api'));
await Global.initRedisDb(server.logger);
KcpInstance.start();
}
async function main() {
await init();
await server.start();
}
main();
其他
本例子的 Dome push-server
UDP 如何使用 教程
建议和交流
如果大家有更好的方法和意见可以留下评论呀。欢迎 👏🏻👏🏻👏🏻