모바일 & 앱

node.js를 이용한 분산 소켓 서버 : #4 node.js로 서버 개발(클라이언트 포함) 추가로 postman으로 테스트

Hippalus 2024. 6. 30. 23:12
반응형

서버 소스 server.js

너무 간단한 버전이라 딱히 설명할게 없다.
클라이언트가 처음 접속할 때 user 아이디와 참여하고자 하는 방번호(그룹으로 묶여 있어서 이 방번호 안에서 발생한 메세지는 해당 방 참여자에게만 전송 용도)를 보내면 이를 map으로 저장해두고 
chat 메세지가 수신되면 보내는게 고작이다.
개별 실행하려면 node server.js 8985 처럼 포트번호를 적고 실행하면 동작한다.

const express = require('express');
const http = require('http');
const socketIO = require('socket.io');
const { createClient } = require('redis');

const app = express();
const server = http.createServer(app);

const io = socketIO(server, {
    cors: {
        origin: '*',
    }
});

const { v4: uuidv4 } = require('uuid');

// UUID 생성
const SERVERID = uuidv4();

console.log(SERVERID); // 고유한 UUID 출력

// Redis adapter setup
const redisAdapter = require('socket.io-redis');
const redisClient = createClient({
    url: 'redis://localhost:6379'
});
//redisClient.connect().catch(console.error);

redisClient.on('error', (error) => {
    console.error('Redis connection error:', error);
    // 여기에서 적절한 에러 처리를 수행할 수 있습니다.
});

redisClient.connect()
    .then(() => {
        console.log('Connected to Redis server');
    })
    .catch((error) => {
        console.error('Failed to connect to Redis server:', error);
        // 여기에서 적절한 에러 처리를 수행할 수 있습니다.
    });

// Store server's UUID in Redis
redisClient.set('server_id', SERVERID);

io.adapter(redisAdapter(redisClient));

const PING_INTERVAL = 5000; // 5 seconds
const CLIENT_TIMEOUT = PING_INTERVAL * 2; // double seconds

// Heartbeat channel
const HEARTBEAT_CHANNEL = SERVERID + '_' + 'heartbeat';

// Function to publish heartbeat
async function publishHeartbeat() {
    //console.log('Boradcast heartbeat message');
    await redisClient.publish(HEARTBEAT_CHANNEL, 'ping');
}

// Start heartbeat interval
setInterval(publishHeartbeat, PING_INTERVAL);

// Function to check and remove zombie clients
async function checkAndRemoveZombieClients() {
    console.log('Checking for zombie clients');
    const serverId = await redisClient.get('server_id'); // Redis에서 서버의 고유값 가져오기
    const clients = await redisClient.hGetAll('clients');

    for (const key in clients) {
        const client = JSON.parse(clients[key]);
        if (client.serverId === serverId) { // 해당 서버의 고유값과 일치하는 클라이언트만 처리
            const now = Date.now();
            const lastPong = client.lastPong || 0;
            if (now - lastPong > CLIENT_TIMEOUT * 3) {
                console.log(`Removing zombie client: ${key}`);
                // 좀비 클라이언트의 소켓을 disconnect
                const socket = io.sockets.sockets.get(key);
                if (socket) {
                    socket.disconnect(true);
                }
                // Redis에서 클라이언트 정보 삭제
                await redisClient.hDel('clients', key);
            }
        }
    }
}



// Start zombie client check interval
setInterval(checkAndRemoveZombieClients, PING_INTERVAL * 2); // Check every CLIENT_TIMEOUT milliseconds


// Function to check and update client timeout
async function updateClientTimeout(socketId) {
    console.log("updateClientTimeout call! : ", socketId)
    const now = Date.now();
    const clientJson = await redisClient.hGet('clients', socketId);
   
    if (!clientJson) return; // Client not found, possibly already disconnected

    const client = JSON.parse(clientJson);
    client.lastPong = now; // Update last pong time

    // Check if client was in timeout state before this pong
    if (now - client.lastPong > CLIENT_TIMEOUT) {
        console.log(`Client ${socketId} was in timeout state, but responded now.`);
        // Optionally, you can emit an event to the client to notify that it was nearly disconnected
        io.to(socketId).emit('timeout-warning', 'You were about to be disconnected due to inactivity.');
    }

    // Update client info in Redis
    await redisClient.hSet('clients', socketId, JSON.stringify(client));
}

io.on('connection', async (socket) => {
    console.log('A new client has connected.');

    const user = socket.handshake.query.user;
    const itemseq = socket.handshake.query.itemseq;

    console.log('Welcome!!!', user, itemseq);

    // Store client information in Redis
    //await redisClient.hSet('clients', socket.id, JSON.stringify({ user, itemseq, socketId: socket.id, lastPong: Date.now() }));
    try {
        //await redisClient.hSet('clients', socket.id, JSON.stringify({ user, itemseq, socketId: socket.id, lastPong: Date.now() }));
        await redisClient.hSet('clients', socket.id, JSON.stringify({ user, itemseq, socketId: socket.id, lastPong: Date.now(), serverId: SERVERID }));
    } catch (error) {
        console.error('Failed to store client information in Redis:', error);
        // 여기에서 적절한 에러 처리를 수행할 수 있습니다.
    }

    // Subscribe client to heartbeat channel
    const subscriber = redisClient.duplicate();
    await subscriber.connect();
    await subscriber.subscribe(HEARTBEAT_CHANNEL, async (message) => {
        if (message === 'ping') {
            socket.emit('heartbeat', 'ping');
        }
    });

    socket.on('heartbeat', async (msg) => {
        if (msg === 'pong') {
            // Update client's timeout status individually
            await updateClientTimeout(socket.id);
        }
    });

    socket.on('data message', async (msg) => {
        console.log('Received message:', msg);

        if (msg.type === 'bid') {
            console.log("Bid received!!! : ", msg.itemseq);

            // Get all clients from Redis
            const clients = await redisClient.hGetAll('clients');
           
            const clientsToNotify = [];

            for (const key in clients) {
                const client = JSON.parse(clients[key]);

                if (client.itemseq === msg.itemseq) {
                    clientsToNotify.push(client);
                }
            }

            for (const client of clientsToNotify) {
                console.log("Sending to client!", client.socketId);
                io.to(client.socketId).emit('data message', msg);
            }
        } else {
            // Handle other types of messages
        }
    });

    socket.on('disconnect', async () => {
        console.log('Client disconnected.');
        // Unsubscribe from heartbeat channel and close the subscriber
        if (socket.subscriber) {
            await socket.subscriber.unsubscribe(HEARTBEAT_CHANNEL);
            await socket.subscriber.quit();
            delete socket.subscriber;  // Clean up the reference
        }

        await redisClient.hDel('clients', socket.id);
    });
});



// Start the server on the specified port or default to 8985
const port = process.argv[2] || 8985;
server.listen(port, async () => {
    // All redis clients delete.
    await redisClient.del('clients');

    console.log(`Server is running on port ${port}.`);
});



클라이언트 소스 

역시 별거 없다.
https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js 갖다쓰고
connect 버튼 누르면 서버에 접속하고
이후 접속되면 타입으로 전송하고 서버에서 오면 받아서 뿌리는게 끝이다.
 


<!DOCTYPE html>
<html lang="ko">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Socket.IO Chat</title>
</head>
<body>
  <ul id="messages"></ul>
  <form id="form" action="">
    <input id="user" type="text" placeholder="user" value="arosones">
    <input id="itemseq" type="text" placeholder="itemseq" value="a01">
    <button id="connectButton">connect</button>
    <input id="bidamount" type="text" autocomplete="off" placeholder="bidamount">
    <button id="sendButton" disabled>send</button>
  </form>

  <script>
    $(document).ready(function() {
       
      var socket;
 
      $('#connectButton').click(function(e) {
        e.preventDefault();

        var user = $('#user').val();
        var itemseq = $('#itemseq').val();
       
        if (user && itemseq) {
            socket = io('https://dev.kobay.co.kr:8984', {
                query: {
                  user: user,
                  itemseq: itemseq
                },

                transports: ['websocket'],
                reconnectionAttempts:2,
                reconnectionDelay: 100
              });

          socket.on('error', (error) => {
            console.log('WebSocket connection error:', error);
                // 여기에 추가적인 오류 처리 로직을 넣을 수 있습니다.
          });
         
          socket.on('connect', () => {
            console.log('WebSocket connected');
           
            $('#connectButton').prop('disabled', true);
            $('#sendButton').prop('disabled', false);
          });

          socket.on('disconnect', () => {
            console.log('WebSocket disconnected');

            $('#connectButton').prop('disabled', false);
            $('#sendButton').prop('disabled', true);
          });

          socket.on('data message', function(msg) {
            console.log("msg : ", msg);

            var item = $('<li>').text(msg.bidamount);

            $('#messages').append(item);
          });


          socket.on('heartbeat', (msg) => {
            console.log("heartbeat rcv : ", msg);
              if (msg === 'ping') {
                    // 'ping' 메시지를 받으면 즉시 'pong'으로 응답
                    socket.emit('heartbeat', 'pong');
              }
          });          
        }
      });

      $('#sendButton').click(function(e) {
        e.preventDefault();

        var bidamount = $('#bidamount').val();
        var itemseq = $('#itemseq').val();

        if (bidamount && socket.connected) {
          var message = {
            type: 'bid',
            bidamount: bidamount,
            itemseq: itemseq,
          };

          socket.emit('data message', message);

          $('#bidamount').val('');
        }
      });
    });
  </script>
</body>
</html>




다만 중요한 부분은 transports: ['websocket'] 이부분이다.
WebSocket 전송 방식만 사용하는건데 이거 빠지면 사정없이 서버에서 끊어지고 난리도 아니다.
이 부분을 몰라서 이틀을 허비했다. ㅡㅡ

추가로
포스트맨으로 테스트 하려면

 

 



반응형