如何基于Hyperf实现RabbitMQ+WebSocket消息推送【PHP教程】

!
也想出现在这里? 联系我们
信息

如何基于Hyperf实现RabbitMQ+WebSocket消息推送,第1张

概述如何基于Hyperf实现RabbitMQ+WebSocket消息推送 介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 ID。订阅发布者发布的消息针对已保存的客户端 ID 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.PHP]

<?PHPreturn [    'mode' => SWOolE_PROCESS,    'servers' => [        [            'name' => 'http',            'type' => Server::SERVER_http,            'host' => '0.0.0.0',            'port' => 11111,            'sock_type' => SWOolE_SOCK_TCP,            'callbacks' => [                SwooleEvent::ON_REQUEST => [Hyperf\\httpServer\\Server::class, 'onRequest'],            ],        ],        [            'name' => 'ws',            'type' => Server::SERVER_WEBSOCKET,            'host' => '0.0.0.0',            'port' => 12222,            'sock_type' => SWOolE_SOCK_TCP,            'callbacks' => [                SwooleEvent::ON_HAND_SHAKE => [Hyperf\\WebSocketServer\\Server::class, 'onHandShake'],                SwooleEvent::ON_MESSAGE => [Hyperf\\WebSocketServer\\Server::class, 'onMessage'],                SwooleEvent::ON_CLOSE => [Hyperf\\WebSocketServer\\Server::class, 'onClose'],            ],        ],    ],

WebSocket 服务器端代码示例

<?PHPdeclare(strict_types=1);/** * This file is part of Hyperf. * * @link     https://www.hyperf.io * @document https://doc.hyperf.io * @contact  group@hyperf.io * @license  https://github.com/hyperf-cloud/hyperf/blob/master/liCENSE */namespace App\\Controller;use Hyperf\\Contract\\OnCloseInterface;use Hyperf\\Contract\\OnMessageInterface;use Hyperf\\Contract\\OnopenInterface;use Swoole\\http\\Request;use Swoole\\Server;use Swoole\\Websocket\\Frame;use Swoole\\WebSocket\\Server as WebSocketServer;class WebSocketController extends Controller implements OnMessageInterface, OnopenInterface, OnCloseInterface{    /**     * 发送消息     * @param WebSocketServer $server     * @param Frame $frame     */    public function onMessage(WebSocketServer $server, Frame $frame): voID    {        //心跳刷新缓存        $redis = $this->container->get(\\Redis::class);        //获取所有的客户端ID        $fdList = $redis->sMembers('websocket_sjd_1');        //如果当前客户端在客户端集合中,就刷新        if (in_array($frame->fd, $fdList)) {            $redis->sAdd('websocket_sjd_1', $frame->fd);            $redis->expire('websocket_sjd_1', 7200);        }        $server->push($frame->fd, 'Recv: ' . $frame->data);    }    /**     * 客户端失去链接     * @param Server $server     * @param int $fd     * @param int $reactorID     */    public function onClose(Server $server, int $fd, int $reactorID): voID    {        //删掉客户端ID        $redis = $this->container->get(\\Redis::class);        //移除集合中指定的value        $redis->sRem('websocket_sjd_1', $fd);        var_dump('closed');    }    /**     * 客户端链接     * @param WebSocketServer $server     * @param Request $request     */    public function onopen(WebSocketServer $server, Request $request): voID    {        //保存客户端ID        $redis = $this->container->get(\\Redis::class);        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);        var_dump($res1);        $res = $redis->expire('websocket_sjd_1', 7200);        var_dump($res);        $server->push($request->fd, 'Opened');    }}

WebSocket 前端代码

    function WebSockettest() {        if ("WebSocket" in window) {            console.log("您的浏览器支持 WebSocket!");            var num = 0            // 打开一个 web socket            var ws = new WebSocket("ws://127.0.0.1:12222");            ws.onopen = function () {                // Web Socket 已连接上,使用 send() 方法发送数据                //alert("数据发送中...");                //ws.send("发送数据");            };            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开                var Ping = {"type": "Ping"};                ws.send(JsON.stringify(Ping));            }, 5000);            ws.onmessage = function (evt) {                var d = JsON.parse(evt.data);                console.log(d);                if (d.code == 300) {                    $(".address").text(d.address)                }                if (d.code == 200) {                    var v = d.data                    console.log(v);                    num++                    var str = `<div class="item">                                    <p>${v.recordOutTime}</p>                                    <p>${v.userOutname}</p>                                    <p>${v.userOutNum}</p>                                    <p>${v.doorOutname}</p>                                </div>`                    $(".tablehead").after(str)                    if (num > 7) {                        num--                        $(".table .item:nth-last-child(1)").remove()                    }                }            };            ws.error = function (e) {                console.log(e)                alert(e)            }            ws.onclose = function () {                // 关闭 websocket                alert("连接已关闭...");            };        } else {            alert("您的浏览器不支持 WebSocket!");        }    }

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.PHP]

<?PHPreturn [    'default' => [        'host' => 'localhost',        'port' => 5672,        'user' => 'guest',        'password' => 'guest',        'vhost' => '/',        'pool' => [            'min_connections' => 1,            'max_connections' => 10,            'connect_timeout' => 10.0,            'wait_timeout' => 3.0,            'heartbeat' => -1,        ],        'params' => [            'insist' => false,            'login_method' => 'AMQPLAIN',            'login_response' => null,            'locale' => 'en_US',            'connection_timeout' => 3.0,            'read_write_timeout' => 6.0,            'context' => null,            'keepalive' => false,            'heartbeat' => 3,        ],    ],];

MQ 消费者代码

<?PHPdeclare(strict_types=1);namespace App\\Amqp\\Consumer;use Hyperf\\Amqp\\Annotation\\Consumer;use Hyperf\\Amqp\\Message\\ConsumerMessage;use Hyperf\\Amqp\\Result;use Hyperf\\Server\\Server;use Hyperf\\Server\\ServerFactory;/** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */class DemoConsumer extends ConsumerMessage{    /**     * rabbmitMQ消费端代码     * @param $data     * @return string     */    public function consume($data): string    {        print_r($data);        //获取集合中所有的value        $redis = $this->container->get(\\Redis::class);        $fdList=$redis->sMembers('websocket_sjd_1');        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();        foreach($fdList as $key=>$v){            if(!empty($v)){                $server->push((int)$v, $data);            }        }        return Result::ACK;    }}

控制器代码

    /**     * test     * @return array     */    public function test()    {        $data = array(            'code' => 200,            'data' => [                'userOutname' => 'ccflow',                'userOutNum' => '9999',                'recordOutTime' => date("Y-m-d H:i:s", time()),                'doorOutname' => '教师公寓',            ]        );        $data = \\Guzzlehttp\\Json_encode($data);        $message = new DemoProducer($data);        $producer = ApplicationContext::getContainer()->get(Producer::class);        $result = $producer->produce($message);        var_dump($result);        $user = $this->request->input('user', 'Hyperf');        $method = $this->request->getmethod();        return [            'method' => $method,            'message' => "{$user}.",        ];    }

最终效果

推荐:《PHP教程》 总结

以上是内存溢出为你收集整理的如何基于Hyperf实现RabbitMQ+WebSocket消息推送全部内容,希望文章能够帮你解决如何基于Hyperf实现RabbitMQ+WebSocket消息推送所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

© 版权声明
THE END
喜欢就支持一下吧
点赞199 分享
评论 抢沙发

请登录后发表评论

    请登录后查看评论内容