From 002e7214ce62adf9b719545287ab7596623e04a8 Mon Sep 17 00:00:00 2001 From: Kubbo <390378816@qq.com> Date: Thu, 10 Apr 2025 07:23:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E6=B7=BB=E5=8A=A0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=A4=84=E7=90=86=E6=A8=A1=E5=9D=97=E5=B9=B6=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E6=9C=8D=E5=8A=A1=E5=99=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 DataProcesser 类,用于处理数据包和管理会话 - 在 GateServer 中添加客户端管理、广播消息和服务器停止功能 - 优化 GateServer 的连接处理逻辑,增加客户端 ID 和信号处理 --- GateServer/DataProcess.js | 70 +++++++++++++++++++++++++++++++++++++++ GateServer/index.js | 43 ++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 GateServer/DataProcess.js diff --git a/GateServer/DataProcess.js b/GateServer/DataProcess.js new file mode 100644 index 0000000..883faf8 --- /dev/null +++ b/GateServer/DataProcess.js @@ -0,0 +1,70 @@ +class DataProcesser { + constructor(maxSessionCount) { + this.maxSessionCount = maxSessionCount; + this.activeUser = 0; + this.sessions = new Array(maxSessionCount).fill(null); + this.sendThreads = []; + this.recvQueue = []; + this.serverBuf = { buffer: null, size: 0, offset: 0 }; + this.userVerify = 1; + this.processRecvSize = 0; + this.waitSendUserSize = 0; + this.waitSendQueueSize = 0; + this.sendUserSize = 0; + this.recvSeverSize = 0; + this.lastProcUsrMsgTime = 0; + this.lastProcSrvMsgTime = 0; + this.lastRecvSrvMsgTime = 0; + this.procSrvThreadSleep = 0; + this.sendQueueSize = 0; + this.ignoreDataPacket = 0; + this.sndThreadSleepTime = 20; + } + + initSessions() { + for (let i = 0; i < this.maxSessionCount; i++) { + this.sessions[i] = { + socket: null, + serverIdx: 0, + packetIdx: 0, + packetError: 0, + recvPacketCount: 0, + sendPacketCount: 0, + markToClose: false, + remoteClosed: false, + sendAvaliable: true, + sendTimeout: 0, + closeTick: 0, + connectTick: Date.now(), + clientMsgTick: Date.now(), + serverMsgTick: Date.now(), + verifyIdx: this.userVerify++, + recvBuf: { buffer: null, size: 0, offset: 0 }, + sendBuf: { buffer: null, size: 0, offset: 0 } + }; + } + } + + processUserRecvPacket(session, buffer, bufferSize) { + // 处理用户接收到的数据包 + // 这里需要实现具体的数据处理逻辑 + } + + sendServerMessage(ident, sessionIdx, socket, serverIdx, buffer, bufferSize) { + // 发送消息到服务器 + // 这里需要实现具体的消息发送逻辑 + } + + startup() { + this.initSessions(); + // 启动数据处理线程 + // 这里需要实现具体的线程启动逻辑 + } + + stop() { + // 停止数据处理线程 + // 这里需要实现具体的线程停止逻辑 + } +} + +module.exports = DataProcesser; \ No newline at end of file diff --git a/GateServer/index.js b/GateServer/index.js index 7f8d565..32b94ae 100644 --- a/GateServer/index.js +++ b/GateServer/index.js @@ -7,16 +7,32 @@ class GateServer extends EventEmitter { this.port = port; this.host = host; this.server = net.createServer(this.handleConnection.bind(this)); + this.clients = new Map(); // 用于存储连接的客户端 + this.gateEngineRunning = true; } start() { this.server.listen(this.port, this.host, () => { console.log(`GateServer is listening on ${this.host}:${this.port}`); }); + + // 处理信号 + process.on('SIGINT', () => this.signalHandler('SIGINT')); + process.on('SIGTERM', () => this.signalHandler('SIGTERM')); + } + + signalHandler(signal) { + if (signal === 'SIGINT' || signal === 'SIGTERM') { + this.gateEngineRunning = false; + console.log(`[SIGNAL] ${signal} received, shutting down...`); + this.stop(); + } } handleConnection(socket) { - console.log('New client connected'); + const clientId = `${socket.remoteAddress}:${socket.remotePort}`; + console.log(`New client connected: ${clientId}`); + this.clients.set(clientId, socket); // 存储客户端连接 this.emit('connection', socket); socket.on('data', (data) => { @@ -24,15 +40,36 @@ class GateServer extends EventEmitter { }); socket.on('end', () => { - console.log('Client disconnected'); + console.log(`Client disconnected: ${clientId}`); + this.clients.delete(clientId); // 移除断开的客户端 this.emit('end', socket); }); socket.on('error', (err) => { - console.error('Socket error:', err); + console.error(`Socket error for client ${clientId}:`, err); + this.clients.delete(clientId); // 移除出错的客户端 this.emit('error', err); }); } + + // 新增方法:获取当前连接的客户端数量 + getClientCount() { + return this.clients.size; + } + + // 新增方法:向所有客户端广播消息 + broadcast(message) { + this.clients.forEach((client) => { + client.write(message); + }); + } + + // 新增方法:停止服务器 + stop() { + this.server.close(() => { + console.log('GateServer has been stopped.'); + }); + } } module.exports = GateServer; \ No newline at end of file