const express = require('express');
const http = require('http');
const socketIO = require('socket.io');
const { createClient } = require('redis');
// HTTP stack: an Express app wrapped in a plain Node HTTP server, with
// Socket.IO attached to that same server.
const app = express();
const server = http.createServer(app);

// CORS is opened to every origin for the Socket.IO endpoint.
const io = socketIO(server, { cors: { origin: '*' } });
const { v4: uuidv4 } = require('uuid');
// Generate a UUID that identifies this server process
const SERVERID = uuidv4();
console.log(SERVERID); // Print the generated UUID
// Redis adapter setup
const redisAdapter = require('socket.io-redis');
// Redis client used for both data commands and heartbeat publishing.
const redisClient = createClient({ url: 'redis://localhost:6379' });

// Log client-level errors; further error handling can be added here.
redisClient.on('error', (err) => {
  console.error('Redis connection error:', err);
});

// Kick off the connection immediately and report the outcome.
(async () => {
  try {
    await redisClient.connect();
    console.log('Connected to Redis server');
  } catch (err) {
    // Further error handling can be added here.
    console.error('Failed to connect to Redis server:', err);
  }
})();
// Store this server's UUID in Redis under the 'server_id' key.
// The returned promise is handled explicitly so that a failed write is
// logged instead of surfacing as an unhandled rejection.
redisClient.set('server_id', SERVERID).catch((error) => {
  console.error('Failed to store server id in Redis:', error);
});
// NOTE(review): the socket.io-redis documentation describes passing a URI or an
// options object (e.g. { host, port } or { pubClient, subClient }); handing it a
// client instance directly is not a documented form -- confirm against the
// installed adapter version.
io.adapter(redisAdapter(redisClient));
// Heartbeat cadence, in milliseconds (5 seconds).
const PING_INTERVAL = 5000;
// A client is considered late after two missed heartbeat periods.
const CLIENT_TIMEOUT = 2 * PING_INTERVAL;
// Redis pub/sub channel for heartbeats, namespaced by this server's id.
const HEARTBEAT_CHANNEL = `${SERVERID}_heartbeat`;
/**
 * Publishes a single 'ping' message on HEARTBEAT_CHANNEL.
 *
 * A failed publish is logged rather than thrown: this function is used as a
 * timer callback, where a rejected promise would otherwise go unhandled.
 *
 * @returns {Promise<void>} Resolves once the publish attempt has finished.
 */
async function publishHeartbeat() {
  try {
    await redisClient.publish(HEARTBEAT_CHANNEL, 'ping');
  } catch (error) {
    console.error('Failed to publish heartbeat:', error);
  }
}
// Start heartbeat interval: publishHeartbeat runs every PING_INTERVAL milliseconds
setInterval(publishHeartbeat, PING_INTERVAL);
/**
 * Sweeps the 'clients' hash in Redis and evicts clients registered by this
 * server whose last pong is older than three times CLIENT_TIMEOUT.
 *
 * Ownership is decided by comparing each entry's serverId with the local
 * SERVERID constant. The 'server_id' key in Redis is not consulted because it
 * is a single shared key: whichever process wrote it last would win.
 *
 * For every evicted client the local socket (if still present) is
 * disconnected and the hash entry is deleted. Entries that cannot be parsed
 * are logged and skipped so that one bad record does not stop the sweep.
 * Redis failures are logged rather than thrown, since this runs from a timer.
 *
 * @returns {Promise<void>} Resolves when the sweep has finished.
 */
async function checkAndRemoveZombieClients() {
  console.log('Checking for zombie clients');
  try {
    const clients = await redisClient.hGetAll('clients');
    const now = Date.now();
    for (const [socketId, clientJson] of Object.entries(clients)) {
      let client;
      try {
        client = JSON.parse(clientJson);
      } catch (error) {
        console.error(`Skipping unparsable client entry: ${socketId}`, error);
        continue;
      }
      // Only handle clients that were registered by this server.
      if (!client || client.serverId !== SERVERID) {
        continue;
      }
      const lastPong = client.lastPong || 0;
      if (now - lastPong <= CLIENT_TIMEOUT * 3) {
        continue;
      }
      console.log(`Removing zombie client: ${socketId}`);
      // Disconnect the zombie client's socket if it still exists locally.
      const socket = io.sockets.sockets.get(socketId);
      if (socket) {
        socket.disconnect(true);
      }
      // Remove the client's record from Redis.
      await redisClient.hDel('clients', socketId);
    }
  } catch (error) {
    console.error('Zombie client check failed:', error);
  }
}
// Start zombie client check interval
setInterval(checkAndRemoveZombieClients, PING_INTERVAL * 2); // Runs every PING_INTERVAL * 2 milliseconds
/**
 * Records a pong for the given socket by refreshing its lastPong timestamp in
 * the 'clients' hash in Redis.
 *
 * If the previous pong is older than CLIENT_TIMEOUT, a 'timeout-warning'
 * event is emitted to that socket. The previous timestamp is read before it
 * is overwritten; comparing against the freshly written value would always
 * yield zero elapsed time.
 *
 * @param {string} socketId - Socket id, used as the field name in the hash.
 * @returns {Promise<void>} Resolves after the record is written, or
 *   immediately when no record exists for the socket.
 */
async function updateClientTimeout(socketId) {
  console.log("updateClientTimeout call! : ", socketId)
  const now = Date.now();
  const clientJson = await redisClient.hGet('clients', socketId);
  if (!clientJson) return; // Client not found, possibly already disconnected
  const client = JSON.parse(clientJson);
  // A record without a usable timestamp is treated as "just seen".
  const previousPong = typeof client.lastPong === 'number' ? client.lastPong : now;
  client.lastPong = now; // Update last pong time
  // Check if client was in timeout state before this pong
  if (now - previousPong > CLIENT_TIMEOUT) {
    console.log(`Client ${socketId} was in timeout state, but responded now.`);
    // Notify the client that it was close to being disconnected
    io.to(socketId).emit('timeout-warning', 'You were about to be disconnected due to inactivity.');
  }
  // Update client info in Redis
  await redisClient.hSet('clients', socketId, JSON.stringify(client));
}
// Per-connection setup:
//  - registers the client in the 'clients' hash in Redis,
//  - relays heartbeat pings from HEARTBEAT_CHANNEL to the socket,
//  - refreshes the client's record on 'pong',
//  - forwards 'bid' messages to every registered client with the same itemseq,
//  - releases the heartbeat subscriber and the Redis record on disconnect.
//
// Socket listeners are attached before any await so that an event arriving
// while the Redis calls are still pending (notably 'disconnect') is not missed.
io.on('connection', async (socket) => {
  console.log('A new client has connected.');
  const { user, itemseq } = socket.handshake.query;
  console.log('Welcome!!!', user, itemseq);

  // Dedicated Redis connection for this socket's heartbeat subscription.
  // Stays null until the subscription is established.
  let subscriber = null;
  let disconnected = false;

  // Unsubscribes and closes the heartbeat subscriber, if one is active.
  const closeSubscriber = async () => {
    if (!subscriber) return;
    const closing = subscriber;
    subscriber = null;
    try {
      try {
        await closing.unsubscribe(HEARTBEAT_CHANNEL);
      } finally {
        // Always close the connection, even when unsubscribe failed.
        await closing.quit();
      }
    } catch (error) {
      console.error('Failed to close heartbeat subscriber:', error);
    }
  };

  socket.on('heartbeat', async (msg) => {
    if (msg !== 'pong') return;
    try {
      // Update client's timeout status individually
      await updateClientTimeout(socket.id);
    } catch (error) {
      console.error('Failed to update client timeout:', error);
    }
  });

  socket.on('data message', async (msg) => {
    console.log('Received message:', msg);
    // The payload comes from the client, so it may be missing or malformed.
    if (!msg || msg.type !== 'bid') {
      // Other types of messages are not handled yet.
      return;
    }
    console.log("Bid received!!! : ", msg.itemseq);
    try {
      // Get all clients from Redis and notify those watching the same item.
      const clients = await redisClient.hGetAll('clients');
      for (const clientJson of Object.values(clients)) {
        let client;
        try {
          client = JSON.parse(clientJson);
        } catch (error) {
          console.error('Skipping unparsable client entry:', error);
          continue;
        }
        if (!client || client.itemseq !== msg.itemseq) continue;
        console.log("Sending to client!", client.socketId);
        io.to(client.socketId).emit('data message', msg);
      }
    } catch (error) {
      console.error('Failed to forward bid message:', error);
    }
  });

  socket.on('disconnect', async () => {
    disconnected = true;
    console.log('Client disconnected.');
    // Unsubscribe from heartbeat channel and close the subscriber
    await closeSubscriber();
    try {
      await redisClient.hDel('clients', socket.id);
    } catch (error) {
      console.error('Failed to remove client information from Redis:', error);
    }
  });

  // Store client information in Redis
  try {
    await redisClient.hSet('clients', socket.id, JSON.stringify({ user, itemseq, socketId: socket.id, lastPong: Date.now(), serverId: SERVERID }));
  } catch (error) {
    console.error('Failed to store client information in Redis:', error);
  }

  // No point subscribing for a socket that is already gone.
  if (disconnected) return;

  // Subscribe client to heartbeat channel
  const heartbeatSubscriber = redisClient.duplicate();
  // The duplicated client gets its own error listener so its connection
  // errors are logged.
  heartbeatSubscriber.on('error', (error) => {
    console.error('Heartbeat subscriber error:', error);
  });
  try {
    await heartbeatSubscriber.connect();
    await heartbeatSubscriber.subscribe(HEARTBEAT_CHANNEL, (message) => {
      if (message === 'ping') {
        socket.emit('heartbeat', 'ping');
      }
    });
    subscriber = heartbeatSubscriber;
    // The socket may have disconnected while the subscription was being set
    // up; in that case the disconnect handler found nothing to close.
    if (disconnected) {
      await closeSubscriber();
    }
  } catch (error) {
    console.error('Failed to subscribe client to heartbeat channel:', error);
    // Best-effort close so a half-initialised connection is not left open.
    try {
      await heartbeatSubscriber.quit();
    } catch (quitError) {
      console.error('Failed to close heartbeat subscriber:', quitError);
    }
  }
});
// Start the server on the port given as the first command-line argument,
// or default to 8985.
const port = process.argv[2] || 8985;
server.listen(port, async () => {
  // Delete the whole 'clients' hash on startup.
  // NOTE(review): this removes every entry in the hash, including any written
  // by other server processes sharing this Redis instance -- confirm intended.
  try {
    await redisClient.del('clients');
  } catch (error) {
    // Logged instead of thrown: nothing awaits this callback, so a rejection
    // here would otherwise go unhandled.
    console.error('Failed to clear client registry in Redis:', error);
  }
  console.log(`Server is running on port ${port}.`);
});