Мне очень понравилась библиотека soket.io. С помощью нее можно просто реализовать realtime приложения. Она сама выбирает протокол в зависимости от браузера, если надо то создает WebSoket.
Это конечно все хорошо, но не стоит забывать о том, что Node.js работает в одном потоке. Для этого у нас есть отличный инструмент Cluset.
Передо мной встал вопрос, а можно ли сделать приложения с кластеризацией, но не использовать для этого REDIS? Также для удобства используем EXPRESS.
Создадим простой пример (пока без кластера):
Пример из официальной документации.
var app = require('express')(); var server = require('http').Server(app); var io = require('socket.io')(server); server.listen(3038); app.get('/', function (req, res) { res.sendfile(__dirname + '/index.html'); }); io.on('connection', function (socket) { socket.emit('news', { hello: 'world' }); socket.on('my other event', function (data) { console.log(data); }); });
Пока ничего сложного, все понятно, вешаем сервер на порт 3038 с express и soket.io. Клиентский код будет неизменным:
<html> <head> <title>Test socket.io</title> <script src="socket.io.js"></script> <script> var connect_error_count = 0; var socket = io.connect('http://localhost:3038/', { 'reconnectionDelay': 10 // defaults to 500 } ); socket.on('connect_error', function(){ console.log('Connection Failed'); //Если более 5 попыток переподключения, то отключаем подключение connect_error_count++; if (connect_error_count>=5){ socket.disconnect(); console.log("stop reconection"); } }); socket.on('reconnect', function(){ console.log('reconnect'); connect_error_count=0; }); socket.on('news', function (data) { console.log(data); }); </script> </head> <body> </body> </html>
Немного пояснений по клиентскому коду. Создаем объект soket с настройками (порт 3038, таймаут подключения 10мс). В случае ошибок подключения более 5 раз, отключаем работу soket, если менее, то обнуляем счетчик. А также подключаемся к комнате news.
Следующим этапом будет создание кластера. Т.к. сокет нельзя создать на одном порту, то каждый новый worker будет создавать soket.io на новом порту:
var cluster = require('cluster');//Включаем cluster var cpuCount = require('os').cpus().length;//Количество ядер процессора var io = []; //В мастере создаем worker'ов равное количеству ядер процессоров if (cluster.isMaster) { for (var i = 0; i < cpuCount; i += 1) { var worker = cluster.fork(); } } //В воркере if (cluster.isWorker) { var worker_id = cluster.worker.id; var express = require('express'); var app = express(); var server = require('http').Server(app); io[worker_id] = require('socket.io')(server); server.listen(3030+worker_id); io[worker_id].on('connection', function (socket) { console.log( socket.id ); console.log( "WORKER ID :"+worker_id ); socket.emit('news', { hello: 'world' }); socket.on('my other event', function (data) { console.log(data); }); }); var app_express = express(); app_express.listen(8081); app_express.use(express.static('public'));//отдаем статичные данные (см. код клиента) app_express.get('/', function (request, response) { response.send('Hello from Worker '+worker_id); console.log( '------' ); }); app_express.get('/get_port', function (request, response) { response.send(3030+worker_id); console.log( 'get_port' ); }); } app_express.get('/api', function (request, response) { response.setHeader('Content-Type', 'application/json'); var id = request.param('id'); var msg = request.param('msg'); var port = request.param('port'); var JSON_DATA = { "worker_id":worker_id ,"id":id ,"msg":msg ,"port":port }; });
Схематично покажем как все это выглядит:
Когда мы открываем в браузере localhost:8081 cluster сервирует нас на worker по своему правилу. У нас есть вызов:
app_express.get('/api', function (request, response){...}
В этом вызове при открытии localhost:8081/api?msg=test&port=3032&id=100500 мы хотим передать клиенту подключенному по порту 3032 id=100500 сообщение с msg=teest.
Самый простой способ, создать клиента и открыть soket.io по определенному порту
var http = require('http'); var client = http.createClient(3032 , "localhost"); request = client.request(); request.on('response', function( res ) { res.on('data', function( data ) { console.log( data ); } ); } ); request.end();
Но к примеру нам нужно отправить broadcast сообщение, и нам неизвестен порт на котором сервируется клиента. Проблема в следующем, мы достоверно не знаем что при открытии localhost:8081/api?msg=test&port=3032&id=100500 мы будем сервироваться на worker 2.
На рисунке показано что при отправки broadcast сообщения с worker1 мы не имеем возможности отправить на прямую клиентам soket.io 2, soket.io 3, soket.io 4, soket.io…
Для решения этой задачи мы можем воспользоваться методами в cluster. В cluster'e мы можем передавать сообщения из master ? worker и из worker ? master. Для наглядности изобразим это схематично.
Пример работы с отправкой master ? worker и worker ? master представлен ниже:
// Кластерное приложение var cluster = require('cluster'); var io = []; var cpuCount = require('os').cpus().length; var workers = []; if (cluster.isMaster) { // Count the machine's CPUs // Create a worker for each CPU for (var i = 0; i < cpuCount; i += 1) { var worker = cluster.fork(); //Слушаем сообщение от workera worker.on('message', function(data) { //отправляем всем worker'ам сообщение for (var j in workers) {workers[j].send(data);} }); //Добавляем объект worker в массив workers.push(worker); } } if (cluster.isWorker) { //------------------------------------------------------------------------------------// var worker_id = cluster.worker.id; var express = require('express'); var app = express(); var server = require('http').Server(app); io[worker_id] = require('socket.io')(server); server.listen(3030+worker_id); io[worker_id].on('connection', function (socket) { console.log( socket.id ); console.log( "WORKER ID :"+worker_id ); socket.emit('news', { hello: 'world' }); socket.on('my other event', function (data) { console.log(data); }); }); //------------------------------------------------------------------------------------// var app_express = express(); app_express.listen(8081); app_express.use(express.static('public'));//отдаем статичные данные app_express.get('/', function (request, response) { response.send('Hello from Worker '+worker_id); console.log( '------' ); }); app_express.get('/get_port', function (request, response) { response.send(3030+worker_id); console.log( 'get_port' ); }); app_express.get('/api', function (request, response) { //process.send("----------------------"); response.setHeader('Content-Type', 'application/json'); var id = request.param('id'); var msg = request.param('msg'); var JSON_DATA = { "worker_id":worker_id ,"id":id ,"msg":msg }; io[port-3030].to(msg.id).emit('news', msg.msg); response.send( JSON.stringify(JSON_DATA) ); //Отправляем всем процессам данные process.send(JSON_DATA); }); //Обработка сообщений от worker// process.on('message', function(msg){ console.log(worker_id); console.log(msg.id); console.log(msg.msg); io[worker_id].to(msg.id).emit('news', msg.msg); }); }
Надеюсь статья будет полезна для новичков, чтобы познакомиться с cluster и общением между master ? worker и worker ? master. Кратко рассмотрен soket.io. И самое главное не используется redis.