workerman实现微信公众号带参数二维码扫码识别用户

爱语飞飞 2019-09-01 PM 1334℃ 0条

完整功能体验

查看源代码,有完整的通讯流程实现。 https://iyuu.cn/

什么是Workerman

workerman是一个高性能的PHP socket 服务器框架,workerman基于PHP多进程以及libevent事件轮询库,PHP开发者只要实现一两个接口,便可以开发出自己的网络应用,例如Rpc服务、聊天室服务器、手机游戏服务器等。
workerman的目标是让PHP开发者更容易的开发出基于socket的高性能的应用服务,而不用去了解PHP socket以及PHP多进程细节。 workerman本身是一个PHP多进程服务器框架,具有PHP进程管理以及socket通信的模块,所以不依赖php-fpm、nginx或者apache等这些容器便可以独立运行。

说明

此功能是利用微信公众号带参数二维码,实现扫码识别用户,并且实时通知前端扫码状态,并非ajax轮询!从而进行后续的其他业务逻辑。

php后端及时推送消息给客户端,原理:

1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送

工作流程详解

1、 用workerman框架,编写websocket服务后端监听2129端口,进程启动同时再监听一个内部通讯5678端口,2129端口等待前端页面发起连接:https://iyuu.cn/usr/index.html
2、 用户进入前端页面,自动连接wss://iyuu.cn:2129;
3、 用户点击获取二维码,请求二维码生成接口:https://iyuu.cn/qrcode,返回二维码参数:

{"ticket":"gQH47zwAAAAAAAAAAS5odHRwOi8vd2VpeGluLnFxLmNvbS9xLzAycTMtdzlMVEhlYzIxcF9jQU50MWsAAgQHjGRdAwR4AAAA","expire_seconds":120}

4、 把二维码参数,转发到websocket服务wss://iyuu.cn:2129,websocket服务保存转发来的信息建立映射关系;
5、 显示二维码:https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket={data.ticket},用户扫码;
6、 微信开发者接口会收到扫码结果,获取到场景值ID;
7、 根据场景值ID从Redis缓存取出ticket校验通过,执行业务逻辑(登录、绑定、解绑、积分等等),并通过5678端口实时通知用户扫码后的处理结果。

完整Websocket服务代码

<?php

use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;

/**
 * 单进程的WebSocket服务,支持WSS协议
 * Class WebSocketServer
 */
class WebSocketServer extends Worker
{
    /**
     * 心跳间隔40秒,过期未通信,主动断开连接
     */
    const HEARTBEAT_TIME = 40;

    /**
     * 内部通信密钥
     */
    const SECRET = 'f1342dc1ee636bd3dda035d2b299f1cdabf5d0fdef89114153d98231ace243a38bb4b0d4';

    /**
     * 进程名字
     * @var string
     */
    public $name = 'WebSocketServer';

    /**
     * 内部监听协议与端口,方便内部系统推送数据,Text协议格式 文本+换行符
     * @var string
     */
    public $innerListen = 'text://127.0.0.1:5678';

    /**
     * 新增加一个属性,用来保存uid到connection的映射(uid是用户id或者客户端唯一标识)
     * @var array
     */
    public $uidConnections = [];

    /**
     * 记录在线用户数
     * @var int
     */
    public $lastOnlineCount = 0;

    /**
     * 进程启动时间
     *
     * @var int
     */
    protected $_startTime = 0;

    /**
     * 构造函数
     * - 如果设置证书,会使用SSL协议
     * @param string $socket_name
     * @param array  $context_option
     */
    public function __construct($socket_name = '', array $context_option = array())
    {
        if (!empty($context_option)) {
            $this->transport = 'ssl';
        }

        // 运行父方法
        parent::__construct($socket_name, $context_option);
    }

    /**
     * Run worker instance.
     */
    public function run()
    {
        // 设置 onWorkerStart 回调
        $this->onWorkerStart = array($this, 'onWorkerStart');

        // 设置 onConnect 连接回调
        $this->onConnect = array($this, 'onConnect');

        // 设置 onMessage 回调
        $this->onMessage = array($this, 'onMessage');

        // 设置 onClose 回调
        $this->onClose = array($this, 'onClose');

        // 记录进程启动的时间
        $this->_startTime = time();

        // 强制单进程
        $this->count = 1;

        // 运行父方法
        parent::run();
    }

    /**
     * 设置Worker子进程启动时的回调函数
     *
     * @param WebSocketServer $worker
     * @throws Exception
     */
    public function onWorkerStart($worker)
    {
        /**
         * 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
         */
        $inner_text_worker = new Worker($this->innerListen);
        /**
         * @param TcpConnection $connection
         * @param $buffer
         */
        $inner_text_worker->onMessage = function ($connection, $buffer) use ($worker) {
            // JSON格式
            $data = json_decode($buffer, true);
            if (empty($data)) {
                $connection->send('empty');
            }

            // 优先处理事件
            if (!empty($data['event']) && is_string($data['event'])) {
                /**
                 * 处理内部推送过来的事件
                 */
                $worker->innerEventHandler($connection, $data);
                // 事件处理内已回复,这里直接返回
                return;
            } else {
                /**
                 * 处理内部向用户推送的消息(两种推送消息的方式,任选其一即可)
                 * - 优先级高:uid
                 * - 优先级低:client_id + signHash
                 */
                $ret = $worker->innerSendHandler($data);
                $connection->send($ret ? 'ok' : 'fail');
            }
        };
        // 执行内部监听
        $inner_text_worker->listen();

        /**
         * 进程启动后设置定时器,关闭超时的链接
         */
        Timer::add(2, function () use ($worker) {
            $time_now = time();
            foreach ($worker->connections as $connection) {
                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                if ($time_now - $connection->lastMessageTime > static::HEARTBEAT_TIME) {
                    if (isset($connection->uid)) {
                        // 连接断开时删除映射
                        $worker->unbindUid($connection->id, $connection->uid);
                    }
                    $connection->close();
                }
            }
        });

        /**
         * 进程启动后设置一个每天重启进程的定时器
         */
        Timer::add(86400, function () use ($worker) {
            Worker::stopAll();
        });
    }

    /**
     * 当客户端建立连接时(TCP三次握手完成后)触发的回调函数
     *
     * @param TcpConnection $connection
     */
    public function onConnect($connection)
    {
        // 增加在线人数
        $this->lastOnlineCount++;

        /**
         * 向所有在线连接推送数据
         */
        $rs = [
            'cmd'    => 'online',
            'online' => $this->lastOnlineCount
        ];
        $this->sendToAll(json_encode($rs, JSON_UNESCAPED_UNICODE), null);

        /**
         * 响应当前连接
         */
        $time = time();
        $resp = [
            'event'  => 'onConnect',
            'client_id' => $connection->id,
            'timestamp' => $time,
            'signHash'   => md5(self::SECRET . $connection->id . $time . self::SECRET)
        ];
        $connection->send(json_encode($resp, JSON_UNESCAPED_UNICODE));

        /**
         * 给connection临时设置一个signHash属性,表示当前连接的签名
         * - 供内部系统绑定uid使用(绑定uid时,必须校验此参数,避免wss服务重启或者宕机引起绑定关系错乱)
         */
        if (!isset($connection->signHash)) {
            $connection->signHash = $resp['signHash'];
        }
    }

    /**
     * 当客户端通过连接发来数据时,触发的回调函数
     *
     * @param TcpConnection $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        // 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
        $connection->lastMessageTime = time();
        // 客户端传递的是json数据
        $msg = json_decode($data, true);
        if (empty($msg)) {
            return;
        }

        /**
         * 业务分拣
         */
        if (isset($msg['cmd']) && $msg['cmd']) {
            /**
             * 命令消息,根据命令执行不同的业务
             */
            switch ($msg['cmd']) {
                case 'ping':
                    // 发送当前在线人数
                    $ret = [
                        'cmd'    => 'online',
                        'online' => $this->lastOnlineCount
                    ];
                    $connection->send(json_encode($ret, JSON_UNESCAPED_UNICODE));
                    break;
                default:
                    $connection->send('{"cmd":"default"}');
                    break;
            }
            return;
        } else {
            if (!empty($msg['ticket'])) {
                /**
                 * 关联客户端ticket
                 * - 用户从内部应用获取客户端ticket,主动上报给wss服务与client_id绑定。
                 */
                if (isset($connection->ticket)) {
                    //覆盖过期的ticket
                    $connection->ticket = $msg['ticket'];
                } else {
                    //首次关联,带参数二维码ticket
                    $connection->ticket = $msg['ticket'];
                }
                $connection->send('{"cmd":"bind","success":"ticket"}');
                return;
            } else {
                /**
                 * 没有ticket
                 */
                $connection->send('{"message":"miss"}');
                return;
            }
        }
    }

    /**
     * 当客户端连接断开时触发的回调函数
     *
     * @param TcpConnection $connection
     */
    public function onClose($connection)
    {
        if (isset($connection->uid)) {
            // 连接断开时删除映射
            $this->unbindUid($connection->id, $connection->uid);
        }

        // 减少在线人数
        $this->lastOnlineCount and $this->lastOnlineCount--;

        $ret = [
            'cmd'    => 'online',
            'online' => $this->lastOnlineCount,
            'event'  => 'onClose',
            'client_id' => $connection->id
        ];
        // 向所有链接用户推送数据
        $this->sendToAll(json_encode($ret, JSON_UNESCAPED_UNICODE), $connection->id);
    }

    /**
     * 进程关闭时触发的回调函数
     *
     * @param Worker $worker
     */
    public function onWorkerStop($worker)
    {
        //通知运维人员
        #todo
    }

    /**
     * 向uid绑定的所有在线client_id发送数据
     * - 默认uid与client_id是一对多的关系,如果当前uid下绑定了多个client_id,则多个client_id对应的客户端都会收到消息,这类似于PC QQ和手机QQ同时在线接收消息。
     *
     * @param string|int $uid       用户uid
     * @param string $message       要发送的数据(字符串类型)
     * @return bool
     */
    public function sendToUid($uid, $message)
    {
        if (isset($this->uidConnections[$uid])) {
            /**
             * @param TcpConnection $connection
             */
            array_walk($this->uidConnections[$uid], function ($connection, $k) use (&$message) {
                $connection->send($message);
            });
            return true;
        }
        return false;
    }

    /**
     * 针对客户端id推送数据
     *
     * @param int    $id            客户端ID
     * @param string $message       要发送的数据(字符串类型)
     * @return bool
     */
    public function sendToClient($id, $message)
    {
        if (isset($this->connections[$id])) {
            /**
             * @param TcpConnection $connection
             */
            $connection = $this->connections[$id];
            $connection->send($message);
            return true;
        }
        return false;
    }

    /**
     * 向所有链接用户推送数据
     *
     * @param string $message               要发送的数据(字符串类型)
     * @param int $exclude_client_id        客户端ID将被排除在外,不会收到本次发的消息
     */
    public function sendToAll($message, $exclude_client_id = null)
    {
        /**
         * @param TcpConnection $connection
         */
        foreach ($this->connections as $connection) {
            if ($connection->id !== $exclude_client_id) {
                $connection->send($message);
            }
        }
    }

    /**
     * 将client_id与uid绑定,以便通过sendToUid($uid)发送数据
     * - 一个client_id只能绑定一个uid,如果绑定多次uid,则只有最后一次绑定有效
     *
     * @param int $id            客户端ID
     * @param string|int $uid    用户的uid
     * @return bool
     */
    public function bindUid(int $id, $uid):bool
    {
        if (isset($this->connections[$id])) {
            $connection = $this->connections[$id];
            if (isset($connection->uid) && $connection->uid) {
                /**
                 * 已经绑定过,解除绑定并删除映射
                 */
                if ($connection->uid !== $uid) {
                    $this->unbindUid($id, $uid);
                }
            }

            // 绑定uid
            $connection->uid = $uid;
            if (!isset($this->uidConnections[$uid])) {
                $this->uidConnections[$uid] = [];
            }
            /**
             * uid与client_id是一对多的关系,系统允许一个uid下有多个client_id
             */
            $this->uidConnections[$uid][$id] = $connection;
            return true;
        }
        return false;
    }

    /**
     * 将client_id与uid解绑
     * - 当client_id下线(连接断开)时会自动与uid解绑,开发者无需在onClose事件调用
     *
     * @param int $id
     * @param $uid
     * @return bool
     */
    public function unbindUid(int $id, $uid):bool
    {
        if (isset($this->uidConnections[$uid][$id])) {
            unset($this->uidConnections[$uid][$id]);
            if (empty($this->uidConnections[$uid])) {
                unset($this->uidConnections[$uid]);
            }
            return true;
        }
        return false;
    }

    /**
     * 返回值为与uid绑定的所有在线的client_id数组
     *
     * @param string|int $uid
     * @return array
     */
    public function getClientIdByUid($uid):array
    {
        if (isset($this->uidConnections[$uid])) {
            if (empty($this->uidConnections[$uid])) {
                unset($this->uidConnections[$uid]);
                return [];
            } else {
                return array_column($this->uidConnections[$uid], 'id');
            }
        }
        return [];
    }

    /**
     * 断开与client_id对应的客户端的连接
     *
     * @param int $id         客户端ID
     */
    public function closeClient(int $id)
    {
        if (isset($this->connections[$id])) {
            $connection = $this->connections[$id];
            if (isset($connection->uid) && $connection->uid) {
                /**
                 * 已经绑定过
                 * - 删除映射解除绑定
                 */
                $this->unbindUid($id, $connection->uid);
            }
            // 关闭连接
            $connection->close();
        }
    }

    /**
     * 内部推送事件的处理函数
     *
     * @param TcpConnection $connection
     * @param array $data
     * @return bool
     */
    public function innerEventHandler($connection, array $data):bool
    {
        try {
            switch ($data['event']) {
                case 'bindUid':
                    /**
                     * 凭借(client_id、signHash)绑定uid
                     */
                    $ret = false;
                    if (!empty($data['client_id']) && !empty($data['uid']) && !empty($data['signHash'])) {
                        $uid = $data['uid'];
                        $client_id = (int)$data['client_id'];
                        $signHash = $data['signHash'];
                        if ($this->verifySignHash($client_id, $signHash)) {
                            // 绑定成功,通知websocket客户端
                            if ($ret = $this->bindUid($client_id, $uid)) {
                                $response = [
                                    'event'     =>  'bindUid',
                                    'client_id' =>  $client_id,
                                    'uid'       =>  $uid,
                                    'uid_online'=>  count($this->uidConnections[$uid])
                                ];
                                $this->sendToUid($uid, json_encode($response, JSON_UNESCAPED_UNICODE));
                            }
                        }
                    }
                    $connection->send($ret ? 'ok' : 'fail');
                    break;
                default:
                    $connection->send('event.default');
                    break;
            }
        } catch (\Exception $ex) {
            $connection->send($ex->getMessage());
        }
        return true;
    }

    /**
     * 内部凭借signHash推送消息的处理函数
     * - client_id + signHash双重安全验证(防止wss服务重启或宕机可能导致的混乱)
     *
     * @param array $data   内部推送过来的数据
     * @return bool
     */
    public function innerSendHandler(array $data):bool
    {
        // 优先uid发送
        if (isset($data['uid']) && $data['uid']) {
            return $this->sendToUid($data['uid'], json_encode($data, JSON_UNESCAPED_UNICODE));
        }

        // 凭借(client_id、ticket)或者(client_id、signHash)发送
        if (isset($data['client_id']) && $data['client_id']) {
            $client_id = (int)$data['client_id'];
            $signHash = $data['signHash'];
            if ($this->verifySignHash($client_id, $signHash)) {
                unset($data['client_id'], $data['signHash']);
                // 向客户端id的页面推送数据
                return $this->sendToClient($client_id, json_encode($data, JSON_UNESCAPED_UNICODE));
            }
        }

        return false;
    }

    /**
     * 验证客户端signHash
     * @param int    $id        客户端ID
     * @param string $signHash  客户端当前连接的签名signHash
     * @return bool
     */
    public function verifySignHash(int $id, string $signHash = ''):bool
    {
        if (empty($signHash)) {
            return false;
        }

        if (isset($this->connections[$id])) {
            $conn = $this->connections[$id];
            $conn_signHash = isset($conn->signHash) && $conn->signHash ? $conn->signHash : '';
            if ($signHash === $conn_signHash) {
                return true;
            }
        }
        return false;
    }

    /**
     * 获取服务状态(AccessToken服务、SendMessage服务、WSS服务)
     */
    public function getServiceStatus()
    {
    }
}

前端页面关键代码

请查看http://iyuu.cn首页源码

执行方法:

/磁盘/路径/php /路径/start_wss.php start -d
标签: PHP, workerman

非特殊说明,本博所有文章均为博主原创。

评论啦~