1. 目的
有时候,我们需要数据在不同的服务之间进行流通,跨服务通讯;这个时候我们就需要将数据进行服务间的转发。
2. 应用场景
可能的应用场景有:
- APP通过HTTP请求发送消息给服务器,服务器通过websocket推送给web客户端
- Web客户端在web端进行操作(HTTP或websocket),操作数据通过TCP由服务器推送到移动端
- ……
在做物联网相关的项目,可能会用得比较多。
3. 实现方法
不同的业务场景会有不同的解决方案,这里我们就不谈那些服务器crontab定时消费,用Redis队列来消费了。现在就来看看用workerman来实现,服务间数据的消费。
3.1 进程上再增加端口监听(一个进程监听两个端口协议)
通过Worker::listen()
在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。这是最方便的方法,将本来进程间的通讯,转化为进程内的操作。
代码实例:
<?php
use Workerman\Worker;
require_once './Workerman/Autoloader.php';
//监听一个http端口
$worker = new Worker('websocket://0.0.0.0:1234');
// 4个进程
$worker->count = 4;
// 每个进程启动后在当前进程新增一个Worker监听
$worker->onWorkerStart = function($worker)
{
$inner_worker = new Worker('http://0.0.0.0:2016');
// 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
$inner_worker->reusePort = true;
$inner_worker->onMessage = function($connection, $data)
{
global $worker;
// 转发到所有的ws连接
$message = $data;
foreach($worker->conections as $con){
$con->send($message);
}
};
// 执行监听。正常监听不会报错
$inner_worker->listen();
};
$worker->onMessage = function($connection, $data)
{
$message = $data;
$connection->send($message);
};
// 运行worker
Worker::runAll();
同样地我们也可以从外部监听端口处转发里面发来的消息,只需要将要调用的worker定为global来共享就可以。
需要注意的是,如果PHP版本小于7.0,则不支持在多个子进程中实例化相同端口的Worker,即端口复用,只有PHP版本大于7.0才能使用:$inner_worker->reusePort = true;
。
3.2 用Channel分布式通讯组件订阅发布
这种方式是非阻塞式IO的。Channel组件需要到官网下载,是额外的拓展包。
代码实例如下:
首先需要开启一个Channel服务,:
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/workerman/Autoloader.php';
require_once __DIR__ . '/vendor/channel/src/Server.php'; // 引人channel服务端文件
// 初始化一个Channel服务端
$channel_server = new Channel\Server('0.0.0.0', 2206);
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
定义Websocket服务,在worker启动时定义一个广播事件:
<?php
use \Workerman\Worker;
use \Workerman\Lib\Timer;
require_once __DIR__ . '/vendor/workerman/Autoloader.php';
require_once __DIR__ . '/vendor/channel/src/Client.php';
$ws_worker = new Worker(DWS_IP . ':' . DWS_PORT);
$ws_worker->count = 1;
$ws_worker->name = 'pusher';
$ws_worker->onWorkerStart = function($worker)
{
// Channel客户端连接到Channel服务端
Channel\Client::connect('127.0.0.1', 2206);
// 订阅广播事件
$event_name = 'pusher';
// 收到广播事件后向当前进程内所有客户端连接发送广播数据
Channel\Client::on($event_name, function($event_data)use($worker){
$output = $event_data['content'];
foreach($worker->connections as $connection)
{
$connection->send(json_encode($output));
}
});
};
$ws_worker->onConnect = function($connection){};
$ws_worker->onMessage = function($connection, $data){};
// 运行worker
Worker::runAll();
定义监听HTTP端口,收到信息时调用广播事件:
<?php
use \Workerman\Worker;
require_once __DIR__ . '/vendor/workerman/Autoloader.php';
require_once __DIR__ . '/vendor/channel/src/Client.php';
// 创建一个Worker监听8080端口,使用http协议通讯
$http_worker = new Worker('http://:127.0.0.1:8080');
$http_worker->name = 'receiver';
$http_worker->count = 4;
$http_worker->onWorkerStart = function()
{
// 连接到channel
Channel\Client::connect('127.0.0.1', 2206);
};
$http_worker->onConnect = function($connection){};
$http_worker->onMessage = function($connection, $data){
// TODO 业务逻辑
// 通过广播事件向websocket服务推送广播数据
$event_name = 'pusher';
Channel\Client::publish($event_name, array(
'content' => $new_file
));
$output = json_encode(['status' => 'success', 'message' => '新文件保存成功', 'data' => '' ]);
$connection->send($output);
};
// 运行worker
Worker::runAll();
这是一个全局广播例子,我们还可以定义成指定对象广播。通过Channel我们都可以写一个简单的点对点聊天例子了。
0 条评论
来做第一个留言的人吧!