1. 目的

有时候,我们需要数据在不同的服务之间进行流通,跨服务通讯;这个时候我们就需要将数据进行服务间的转发。

2. 应用场景

可能的应用场景有:

  • APP通过HTTP请求发送消息给服务器,服务器通过websocket推送给web客户端
  • Web客户端在web端进行操作(HTTP或websocket),操作数据通过TCP由服务器推送到移动端
  • ……

在做物联网相关的项目,可能会用得比较多。

3. 实现方法

不同的业务场景会有不同的解决方案,这里我们就不谈那些服务器crontab定时消费,用Redis队列来消费了。现在就来看看用workerman来实现,服务间数据的消费。

3.1 进程上再增加端口监听(一个进程监听两个端口协议)

通过Worker::listen()在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。这是最方便的方法,将本来进程间的通讯,转化为进程内的操作。

代码实例:

<?php
use Workerman\Worker;
require_once './Workerman/Autoloader.php';

//监听一个http端口
$worker = new Worker('websocket://0.0.0.0:1234');

// 4个进程
$worker->count = 4;

// 每个进程启动后在当前进程新增一个Worker监听
$worker->onWorkerStart = function($worker)
{
    $inner_worker = new Worker('http://0.0.0.0:2016');

    // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    $inner_worker->reusePort = true;

    $inner_worker->onMessage = function($connection, $data)
    {
        global $worker;

        // 转发到所有的ws连接
        $message = $data;
        foreach($worker->conections as $con){
            $con->send($message);
        }
    };

    // 执行监听。正常监听不会报错
    $inner_worker->listen();
};

$worker->onMessage = function($connection, $data)
{
    $message = $data;
    $connection->send($message);
};

// 运行worker
Worker::runAll();

同样地我们也可以从外部监听端口处转发里面发来的消息,只需要将要调用的worker定为global来共享就可以。

需要注意的是,如果PHP版本小于7.0,则不支持在多个子进程中实例化相同端口的Worker,即端口复用,只有PHP版本大于7.0才能使用:$inner_worker->reusePort = true;

3.2 用Channel分布式通讯组件订阅发布

这种方式是非阻塞式IO的。Channel组件需要到官网下载,是额外的拓展包。

代码实例如下:
首先需要开启一个Channel服务,:

<?php
use Workerman\Worker;

require_once __DIR__ . '/vendor/workerman/Autoloader.php';
require_once __DIR__ . '/vendor/channel/src/Server.php'; // 引人channel服务端文件

// 初始化一个Channel服务端
$channel_server = new Channel\Server('0.0.0.0', 2206);

// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

定义Websocket服务,在worker启动时定义一个广播事件:

<?php
use \Workerman\Worker;
use \Workerman\Lib\Timer;

require_once __DIR__ . '/vendor/workerman/Autoloader.php';
require_once __DIR__ . '/vendor/channel/src/Client.php';


$ws_worker = new Worker(DWS_IP . ':' . DWS_PORT);
$ws_worker->count = 1;
$ws_worker->name = 'pusher';

$ws_worker->onWorkerStart = function($worker)
{
    // Channel客户端连接到Channel服务端
    Channel\Client::connect('127.0.0.1', 2206);

    // 订阅广播事件
    $event_name = 'pusher';
    // 收到广播事件后向当前进程内所有客户端连接发送广播数据
    Channel\Client::on($event_name, function($event_data)use($worker){
        $output = $event_data['content'];

        foreach($worker->connections as $connection)
        {
            $connection->send(json_encode($output));
        }
    });
};

$ws_worker->onConnect = function($connection){};

$ws_worker->onMessage = function($connection, $data){};

// 运行worker
Worker::runAll();

定义监听HTTP端口,收到信息时调用广播事件:

<?php
use \Workerman\Worker;

require_once __DIR__ . '/vendor/workerman/Autoloader.php';
require_once __DIR__ . '/vendor/channel/src/Client.php';

// 创建一个Worker监听8080端口,使用http协议通讯
$http_worker = new Worker('http://:127.0.0.1:8080');
$http_worker->name = 'receiver';
$http_worker->count = 4;

$http_worker->onWorkerStart = function()
{
    // 连接到channel
    Channel\Client::connect('127.0.0.1', 2206);
};


$http_worker->onConnect = function($connection){};

$http_worker->onMessage = function($connection, $data){

    // TODO 业务逻辑

    // 通过广播事件向websocket服务推送广播数据
    $event_name = 'pusher';
    Channel\Client::publish($event_name, array(
        'content'          => $new_file
    ));

    $output = json_encode(['status' => 'success', 'message' => '新文件保存成功', 'data' => '' ]);

    $connection->send($output);
};

// 运行worker
Worker::runAll();

这是一个全局广播例子,我们还可以定义成指定对象广播。通过Channel我们都可以写一个简单的点对点聊天例子了。