Skip to main content

如何使用 KCP ?

前言

最近在考虑帧同步的通信协议的选型,阅览了相关资料。很多联机对战都是通过 UDP 去实现。但是 UDP 没 TCP 那么可靠,在项目比较赶的情况下,自己封装一套可靠的 UDP ,会导致延期。于是我打算用 KCP 协议去实现。

KCP 介绍 官方文档:Introduction | KCP(KCP.org)

Node KCP-X 文档:Introduction |Node KCP X(node-kcp-x.org)

KCP 作者的介绍

KCP是一个快速可靠协议,能以比 TCP 浪费 10%-20% 的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发,需要使用者自己定义下层数据包的发送方式,以 callback的方式提供给 KCP。 连时钟都需要外部传递进来,内部不会有任何一次系统调用。

协议参赛介绍

以下是 KCP 作者所描述的,搬运过来是为了能快速理解参数

工作模式:

int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
  • nodelay :是否启用 nodelay模式,0不启用;1启用。
  • interval :协议内部工作的 interval,单位毫秒,比如 10ms或者 20ms
  • resend :快速重传模式,默认0关闭,可以设置2(2次ACK跨越将会直接重传)
  • nc :是否关闭流控,默认是0代表不关闭,1代表关闭。
  • 普通模式: ikcp_nodelay(kcp, 0, 40, 0, 0);
  • 极速模式: ikcp_nodelay(kcp, 1, 10, 2, 1);

最大窗口:

int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);

该调用将会设置协议的最大发送窗口和最大接收窗口大小,默认为32. 这个可以理解为 TCP的 SND_BUF 和 RCV_BUF,只不过单位不一样 SND/RCV_BUF 单位是字节,这个单位是包。

  1. 最大传输单元:

    纯算法协议并不负责探测 MTU,默认 mtu是1400字节,可以使用ikcp_setmtu来设置该值。该值将会影响数据包归并及分片时候的最大传输单元。

  2. 最小RTO:

    不管是 TCP还是 KCP计算 RTO时都有最小 RTO的限制,即便计算出来RTO为40ms,由于默认的 RTO是100ms,协议只有在100ms后才能检测到丢包,快速模式下为30ms,可以手动更改该值:

    kcp->rx_minrto = 10;

如何使用

本次采用 node-kcp-x 去实现的。看了一下源码,如果我们要自己构建可以去 KCP 的项目,把 ikcp.h, ikcp.c 两个源文件,放到 node-kcp-x 对应的目录里从新打包即可。

node-gyp build

本次采用的框架 TSRPC

Dome 的目录结构

├──  src
│ ├── common # 公共文件、常量业务报错等
│ ├── config # 配置文件
│ ├── db # 数据库连接
│ ├── middleware # 中间件
│ ├── shared # 前后端共享代码
│ │ ├── protocols # 协议定义
│ │ ├── api # API 实现
│ ├── index.ts # 启动文件
│ ├── tsrpc.config.ts # TSRPC 项目配置文件

定义 KCP 的配置

./config/kcpConfig.ts

export const kcpConfig = {
port: 23333,
conv: 231206,
nodelay: 1,
interval: 10,
resend: 2,
nc: 1,
mtu: 1400,
sndWnd: 1024,
rcvWnd: 1024,
checkTime: 10,
heartbeatInterval: 3000,
timeoutInterval: 10000,
};

export const clientConfig = {
maxRetries: 10,
pingInterval: 3000,
pongTimeout: 1000,
isBreakLine: 0,
reconnectInterval: 3000,
maxRecInterval: 30000,
reconnectGap: 2,
};

封装 KCP 为中间件

./config/kcpConfig.ts
import { KCP } from 'node-kcp-x';
import * as dgram from 'dgram';
import { kcpConfig } from '../config/kcpConfig';
import { Logger, WsServer } from 'tsrpc';
import { ServiceType } from '../shared/protocols/serviceProto';
import { EventEmitter } from 'events';

interface ClientContext {
address: string;
port: number;
}

interface Group {
clientKey: string;
}

interface Client extends KCP {
lastHeartbeat: number;
}

interface EventActionContent extends Group {
groupId: string;
}

class KcpServer extends EventEmitter {
private socketServer: dgram.Socket;
private clients: { [key: string]: Client } = {};
private groups: { [groupId: string]: string[] } = {};
port: number;
server: WsServer<ServiceType>;
logger: Logger;
heartbeatInterval: number;
timeoutInterval: number;
constructor(server: WsServer<ServiceType>) {
super();
this.server = server;
this.logger = server?.logger;
this.port = kcpConfig.port;
this.heartbeatInterval = kcpConfig.heartbeatInterval;
this.timeoutInterval = kcpConfig.timeoutInterval;
this.socketServer = dgram.createSocket('udp4');
this.socketServer.on('error', this.handleError.bind(this));
this.socketServer.on('message', this.handleMessage.bind(this));
this.socketServer.on('listening', this.handleListening.bind(this));
this.startHeartbeatCheck();
this.on('onKcpPing', this.handleKcpPing.bind(this));
this.on('onAddGroup', this.handleAddGroup.bind(this));
}

private handleError(err: Error) {
this.logger?.error(
'Kcp connected error',
`port:${this.port},err:${err}`
);
this.socketServer.close();
}

private handleMessage(msg: Buffer, rinfo: dgram.RemoteInfo) {
const key = `${rinfo.address}_${rinfo.port}`;
if (!this.clients[key]) {
const context: ClientContext = {
address: rinfo.address,
port: rinfo.port,
};
const { nodelay, interval, resend, nc, conv, mtu, sndWnd, rcvWnd } =
kcpConfig;
const kcpObj = new KCP(conv, context) as Client;
kcpObj.nodelay(nodelay, interval, resend, nc);
kcpObj.setmtu(mtu);
kcpObj.wndsize(sndWnd, rcvWnd);
kcpObj.output(this.output.bind(this));
kcpObj.lastHeartbeat = Date.now();
this.clients[key] = kcpObj;
this.check(kcpObj);
}

const kcpObj = this.clients[key];
kcpObj.input(msg);

const size = kcpObj.peeksize();
if (size > 0) {
const buffer = kcpObj.recv();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const { action, data }: { action: string; data: any } = JSON.parse(
buffer.toString()
);
data.clientKey = key;
this.logger?.log(
`recv: ${buffer} from ${JSON.stringify(kcpObj.context())}`
);
kcpObj.send(Buffer.from(buffer));
this.emit(action, data);
kcpObj.flush();
}
}

private handleListening() {
const address = this.socketServer.address();
this.logger?.log(`server listening ${address.address}:${address.port}`);
}

private output(data: Buffer, size: number, context: ClientContext) {
this.socketServer.send(data, 0, size, context.port, context.address);
}

private check(kcpObj: KCP) {
if (!kcpObj) {
return;
}
const now = Date.now();
kcpObj.update(now);
setTimeout(() => this.check(kcpObj), kcpConfig.checkTime);
}
// 心跳定义,采用 ping pong
private handleKcpPing(data: EventActionContent) {
if (!data || !data?.clientKey) {
return;
}
const client = this.clients[data.clientKey];
client.lastHeartbeat = Date.now();
client.send(Buffer.from('PONG'));
client.flush();
}
// 封装组,类似房间
private handleAddGroup(data: EventActionContent) {
if (!data || !data?.groupId || !data?.clientKey) {
return;
}
const { clientKey, groupId } = data;
if (!this.groups[groupId]) {
this.groups[groupId] = [];
}
const groupMap = new Set(this.groups[groupId]);
if (groupMap.has(clientKey)) {
return;
}
this.groups[groupId].push(clientKey);
}
// 心跳检查
private startHeartbeatCheck() {
setInterval(() => {
const now = Date.now();
for (const [key, client] of Object.entries(this.clients)) {
if (now - client.lastHeartbeat > this.timeoutInterval) {
delete this.clients[key];
this.handleClientDisconnect(key);
}
}
}, this.heartbeatInterval);
}
// 断开操作
private handleClientDisconnect(clientKey: string) {
this.logger.log(`Client ${clientKey} disconnected.`);
}

public start() {
this.socketServer.bind(this.port);
}

public stop() {
this.socketServer.close();
}
}

export default KcpServer;

在程序入口中调用

import * as path from 'path';
import { WsServer } from 'tsrpc';
import { serviceProto } from './shared/protocols/serviceProto';
import { Global } from './db/global';
import KcpServer from './middleware/kcp';

export const server = new WsServer(serviceProto, {
port: 3000,
json: false,
});

export const KcpInstance = new KcpServer(server); // 创建 KCP 实例

async function init() {
await server.autoImplementApi(path.resolve(__dirname, 'api'));
await Global.initRedisDb(server.logger);
KcpInstance.start();
}

async function main() {
await init();
await server.start();
}
main();

其他

本例子的 Dome push-server

UDP 如何使用 教程

建议和交流

如果大家有更好的方法和意见可以留下评论呀。欢迎 👏🏻👏🏻👏🏻