NodeJs 专题
专题目录
您的位置:Js应用 > NodeJs专题 > Node.js 集群
Node.js 集群
作者:--    发布时间:2019-11-20
稳定性: 2 - 不稳定

单个node.js实例在单线程中运行,在某些情况下,它可能出现负载,因此为了能够更好的利用多核系统的能力,你可以使用node.js内置的集群(cluster)功能来处理负载。

在集群模块里很容易就能创建一个共享所有服务器接口的进程。

var cluster = require('cluster');
var http = require('http');
var numcpus = require('os').cpus().length;

if (cluster.ismaster) {
  // fork workers.
  for (var i = 0; i < numcpus; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // workers can share any tcp connection
  // in this case its a http server
  http.createserver(function(req, res) {
    res.writehead(200);
    res.end("hello world\n");
  }).listen(8000);
}

运行node后,将会在所有工作进程里共享8000端口。

% node_debug=cluster node server.js
23521,master worker 23524 online
23521,master worker 23526 online
23521,master worker 23523 online
23521,master worker 23528 online

这个特性是最近才引入的,大家可以试试并提供反馈。

还要注意,在windows系统里还不能在工作进程中创建一个被命名的管道服务器。

如何工作

child_process.fork方法派生工作进程,所以它能通过ipc和父进程通讯,并相互传递句柄。

集群模块通过以下的两种分发模式来处理连接:

第一种(默认方法,除了windows平台)为循环式。主进程监听一个端口,接收新的连接,再轮流的分发给工作进程。

第二种,主进程监听socket,并发送给感兴趣的工作进程,工作进程直接接收连接。

比较上述两种方法,第二种方法理论上性能最高。实际上,由于操作系统各式各样,分配往往分配不均。比如,70%的连接终止于2个进程,实际上共有8个进程。

因为server.listen()将大部分工作交给了主进程,所以一个普通的node.js进程和一个集群工作进程会在三种情况下有所区别:

  1. server.listen({fd: 7})由于消息被传回主进程,所以将会监听主进程里的文件描述符,而不是其他工作进程里的文件描述符 7。
  2. server.listen(handle)监听一个明确地句柄,会使得工作进程使用指定句柄,而不是与主进程通讯。如果工作进程已经拥有了该句柄,前提是您知道在做什么。
  3. server.listen(0)通常它会让服务器随机监听端口。然而在集群里每个工作进程listen(0)时会收到相同的端口。实际上仅第一次是随机的,之后是可预测的。如果你想监听一个特定的端口,可以根据集群的工作进程的id生产一个端口id 。

在node.js或你的程序里没有路由逻辑,工作进程见也没有共享状态。因此,像登录和会话这样的工作,不要设计成过度依赖内存里的对象。

因为工作线程都是独立的,你可以根据需求来杀死或者派生而不会影响其他进程。只要仍然有工作进程,服务器还会接收连接。node不会自动管理工作进程的数量,这是你的责任,你可以根据自己需求来管理。

cluster.schedulingpolicy

调度策略cluster.sched_rr表示轮流制,cluster.sched_none表示操作系统处理。这是全局性的设定,一旦你通过cluster.setupmaster()派生了第一个工作进程,它就不可更改了。

sched_rr是除windows外所有系统的默认设置。只要libuv能够有效地分配iocp句柄并且不产生巨大的性能损失,windows也会改为sched_rr方式。

cluster.schedulingpolicy也可通过环境变量node_cluster_sched_policy来更改。有效值为"rr""none"

cluster.settings

  • {object}
    • execargv {array} 传给可执行的node的参数列表(默认=process.execargv)
    • exec {string} 执行文件的路径。 (默认=process.argv[1])
    • args {array} 传给工作进程的参数列表(默认=process.argv.slice(2))
    • silent {boolean}是否将输出发送给父进程的stdio。(默认=false)
    • uid {number} 设置用户进程的id。 (参考setuid(2)。)
    • gid {number} 设置进程组的id。 (参考setgid(2)。)

调用.setupmaster()(或.fork())方法后,这个settings对象会包含设置内容,包括默认值。

设置后会立即冻结,因为.setupmaster()只能调用一次。

这个对象不应该被手动改变或设置。

cluster.ismaster

  • {boolean}

如果是主进程,返回true。如果process.env.node_unique_id未定义,则ismastertrue

cluster.isworker

  • {boolean}

如果不是主进程返回true(和cluster.ismaster相反)。

事件: 'fork'

  • worker {worker object}

当一个新的工作进程被分支出来,集群模块会产生'fork'事件。它可用于记录工作进程,并创建自己的超时管理。

var timeouts = [];
function errormsg() {
  console.error("something must be wrong with the connection ...");
}

cluster.on('fork', function(worker) {
  timeouts[worker.id] = settimeout(errormsg, 2000);
});
cluster.on('listening', function(worker, address) {
  cleartimeout(timeouts[worker.id]);
});
cluster.on('exit', function(worker, code, signal) {
  cleartimeout(timeouts[worker.id]);
  errormsg();
});

事件: 'online'

  • worker {worker object}

分支出一个新的工作进程后,它会响应在线消息。当主线程接收到在线消息后,它会触发这个事件。'fork'和'online'之间的区别在于,主进程分支一个工作进程后会调用 fork,而工作进程运行后会调用emitted。

cluster.on('online', function(worker) {
  console.log("yay, the worker responded after it was forked");
});

事件: 'listening'

  • worker {worker object}
  • address {object}

工作进程调用listen()时,服务器会触发'listening'事件,同时也会在主进程的集群里触发。

事件处理函数有两个参数,worker包含工作进程对象,address包含以下属性:address, portaddresstype。如果工作进程监听多个地址的时候,这些东西非常有用。

cluster.on('listening', function(worker, address) {
  console.log("a worker is now connected to " + address.address + ":" + address.port);
});

addresstype是以下内容:

  • 4 (tcpv4)
  • 6 (tcpv6)
  • -1 (unix domain socket)
  • "udp4" 或者"udp6"(udp v4或者v6)*

事件: 'disconnect'

  • worker {worker object}

当一个工作进程的ipc通道关闭时会触发这个事件。当工作进程正常退出,被杀死,或者手工关闭(例如worker.disconnect())时会调用。

disconnectexit事件间可能存在延迟。 这些事件可以用来检测进程是否卡在清理过程中,或者存在长连接。

cluster.on('disconnect', function(worker) {
  console.log('the worker #' + worker.id + ' has disconnected');
});

事件: 'exit'

  • worker {worker object}
  • code {number} 如果正常退出,则为退出代码.
  • signal {string} 使得进程被杀死的信号名 (比如,'sighup')

当任意一个工作进程终止的时候,集群模块会触发'exit'事件。

可以调用.fork()重新启动工作进程。

cluster.on('exit', function(worker, code, signal) {
  console.log('worker %d died (%s). restarting...',
    worker.process.pid, signal || code);
  cluster.fork();
});

参见child_process event: 'exit'.

事件: 'setup'

  • settings{object}

调用.setupmaster()后会被触发。

settings对象就是cluster.settings对象。

详细内容参见cluster.settings

cluster.setupmaster([settings])

  • settings {object}
    • exec {string} 执行文件的路径。 (默认=process.argv[1])
    • args {array}传给工作进程的参数列表(默认=process.argv.slice(2))
    • silent {boolean} 是否将输出发送给父进程的stdio.

setupmaster用来改变默认的'fork' 。 一旦调用,settings值将会出现在cluster.settings里。

你需要注意如下事项:

  • 改变任何设置,仅会对未来的工作进程产生影响,不会影响对目前已经运行的进程
  • 工作进程里,仅能改变传递给.fork()env属性。
  • 以上的默认值,仅在第一次调用的时候有效,之后的默认值是调用cluster.setupmaster()后的值。

例如:

var cluster = require('cluster');
cluster.setupmaster({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true
});
cluster.fork(); // https worker
cluster.setupmaster({
  args: ['--use', 'http']
});
cluster.fork(); // http worker

仅能在主进程里调用。

cluster.fork([env])

  • env{object} 添加到子进程环境变量中的键值。
  • return {worker object}

派生一个新的工作进程。

仅能在主进程里调用。

cluster.disconnect([callback])

  • callback {function} 当所有工作进程都断开连接,并且关闭句柄后被调用。

cluster.workers里的每个工作进程可调用.disconnect()关闭。

关闭所有的内部句柄连接,并且没有任何等待处理的事件时,允许主进程优雅的退出。

这个方法有一个可选参数,会在完成时被调用。

仅能在主进程里调用。

cluster.worker

  • {object}

对当前工作进程对象的引用。主进程中不可用。

var cluster = require('cluster');

if (cluster.ismaster) {
  console.log('i am master');
  cluster.fork();
  cluster.fork();
} else if (cluster.isworker) {
  console.log('i am worker #' + cluster.worker.id);
}

cluster.workers

  • {object}

存储活跃工作对象的哈希表,主键是id,能方便的遍历所有工作进程,仅在主进程可用。

当工作进程关闭连接并退出后,将会从cluster.workers里移除。这两个事件的次序无法确定,仅能保证从cluster.workers移除会发生在'disconnect''exit'之后。

// go through all workers
function eachworker(callback) {
  for (var id in cluster.workers) {
    callback(cluster.workers[id]);
  }
}
eachworker(function(worker) {
  worker.send('big announcement to all workers');
});

如果希望通过通讯通道引用工作进程,那么使用工作进程的 id 来查询最简单。

socket.on('data', function(id) {
  var worker = cluster.workers[id];
});

class: worker

一个worker对象包含工作进程所有公开的信息和方法。在主进程里可用通过cluster.workers来获取,在工作进程里可以通过cluster.worker来获取。

worker.id

  • {string}

每一个新的工作进程都有独立的唯一标示,它就是id

当工作进程可用时,id就是cluster.workers里的主键。

worker.process

  • {childprocess object}

所有工作进程都是通用child_process.fork()创建的,该函数返回的对象被储存在process中。

参见: child process module

注意:当process.suicide不是true的时候,会触发'disconnect'事件,并使得工作进程调用process.exit(0)。它会保护意外的连接关闭。

worker.suicide

  • {boolean}

调用.kill().disconnect()后设置,在这之前是undefined

worker.suicide能让你区分出是自愿的还是意外退出,主进程可以根据这个值,来决定是否是重新派生成工作进程。

cluster.on('exit', function(worker, code, signal) {
  if (worker.suicide === true) {
    console.log('oh, it was just suicide\' – no need to worry').
  }
});

// kill worker
worker.kill();

worker.send(message[, sendhandle])

  • message {object}
  • sendhandle {handle object}

这个函数和child_process.fork()提供的send方法相同。主进程里你必须使用这个函数给指定工作进程发消息。

在工作进程里,你也可以用process.send(message)

这个例子会回应所有来自主进程的消息:

if (cluster.ismaster) {
  var worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isworker) {
  process.on('message', function(msg) {
    process.send(msg);
  });
}

worker.kill([signal='sigterm'])

  • signal{string}发送给工作进程的杀死信号的名称

这个函数会杀死工作进程。在主进程里,它会关闭worker.process,一旦关闭会发送杀死信号。在工作进程里,关闭通道,退出,返回代码0

会导致.suicide被设置。

为了保持兼容性,这个方法的别名是worker.destroy()

注意,在工作进程里有process.kill(),于此不同。

worker.disconnect()

在工作进程里,这个函数会关闭所有服务器,等待 'close' 事件,关闭ipc通道。

在主进程里,发给工作进程一个内部消息,用来调用.disconnect()

会导致.suicide被设置。

注意,服务器关闭后,不再接受新的连接,但可以接受新的监听。已经存在的连接允许正常退出。当连接为空得时候,工作进程的ipc通道运行优雅的退出。

以上仅能适用于服务器的连接,客户端的连接由工作进程关闭。

注意,在工作进程里,存在process.disconnect,但并不是这个函数,它是disconnect。

由于长连接可能会阻塞进程关闭连接,有一个较好的办法是发消息给应用,这样应用会想办法关闭它们。超时管理也是不错,如果超过一定时间后还没有触发 disconnect事件,将会杀死进程。

if (cluster.ismaster) {
  var worker = cluster.fork();
  var timeout;

  worker.on('listening', function(address) {
    worker.send('shutdown');
    worker.disconnect();
    timeout = settimeout(function() {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', function() {
    cleartimeout(timeout);
  });

} else if (cluster.isworker) {
  var net = require('net');
  var server = net.createserver(function(socket) {
    // connections never end
  });

  server.listen(8000);

  process.on('message', function(msg) {
    if(msg === 'shutdown') {
      // initiate graceful close of any connections to server
    }
  });
}

worker.isdead()

工作进程结束,返回true, 否则返回false

worker.isconnected()

当工作进程通过ipc通道连接主进程时,返回true ,否则false。工作进程创建后会连接到主进程。当disconnect事件触发后会关闭连接。

事件: 'message'

  • message{object}

该事件和child_process.fork()所提供的一样。在主进程中您应当使用该事件,而在工作进程中您也可以使用process.on('message')

例如,有一个集群使用消息系统在主进程中统计请求的数量:

var cluster = require('cluster');
var http = require('http');

if (cluster.ismaster) {

  // keep track of http requests
  var numreqs = 0;
  setinterval(function() {
    console.log("numreqs =", numreqs);
  }, 1000);

  // count requestes
  function messagehandler(msg) {
    if (msg.cmd && msg.cmd == 'notifyrequest') {
      numreqs += 1;
    }
  }

  // start workers and listen for messages containing notifyrequest
  var numcpus = require('os').cpus().length;
  for (var i = 0; i < numcpus; i++) {
    cluster.fork();
  }

  object.keys(cluster.workers).foreach(function(id) {
    cluster.workers[id].on('message', messagehandler);
  });

} else {

  // worker processes have a http server.
  http.server(function(req, res) {
    res.writehead(200);
    res.end("hello world\n");

    // notify master about the request
    process.send({ cmd: 'notifyrequest' });
  }).listen(8000);
}

事件: 'online'

cluster.on('online')事件类似, 仅能在特定工作进程里触发。

cluster.fork().on('online', function() {
  // worker is online
});

不会在工作进程里触发。

事件: 'listening'

  • address{object}

cluster.on('listening')事件类似, 仅能在特定工作进程里触发。

cluster.fork().on('listening', function(address) {
  // worker is listening
});

不会在工作进程里触发。

事件: 'disconnect'

cluster.on('disconnect')事件类似, 仅能在特定工作进程里触发。

cluster.fork().on('disconnect', function() {
  // worker has disconnected
});

事件: 'exit'

  • code {number} 正常退出时的退出代码.
  • signal {string} 使得进程被终止的信号的名称(比如sighup)。

cluster.on('exit')事件类似, 仅能在特定工作进程里触发。

var worker = cluster.fork();
worker.on('exit', function(code, signal) {
  if( signal ) {
    console.log("worker was killed by signal: "+signal);
  } else if( code !== 0 ) {
    console.log("worker exited with error code: "+code);
  } else {
    console.log("worker success!");
  }
});

事件: 'error'

child_process.fork()事件类似。

工作进程里,你也可以用process.on('error')

网站声明:
本站部分内容来自网络,如您发现本站内容
侵害到您的利益,请联系本站管理员处理。
联系站长
373515719@qq.com
关于本站:
编程参考手册