完整功能体验
查看源代码,有完整的通讯流程实现。 https://iyuu.cn/
什么是Workerman
workerman是一个高性能的PHP socket 服务器框架,workerman基于PHP多进程以及libevent事件轮询库,PHP开发者只要实现一两个接口,便可以开发出自己的网络应用,例如Rpc服务、聊天室服务器、手机游戏服务器等。
workerman的目标是让PHP开发者更容易的开发出基于socket的高性能的应用服务,而不用去了解PHP socket以及PHP多进程细节。 workerman本身是一个PHP多进程服务器框架,具有PHP进程管理以及socket通信的模块,所以不依赖php-fpm、nginx或者apache等这些容器便可以独立运行。
说明
此功能是利用微信公众号带参数二维码,实现扫码识别用户,并且实时通知前端扫码状态,并非ajax轮询!从而进行后续的其他业务逻辑。
php后端及时推送消息给客户端,原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
工作流程详解
1、 用workerman框架,编写websocket服务后端监听2129端口
,进程启动同时再监听一个内部通讯5678端口
,2129端口
等待前端页面发起连接:https://iyuu.cn/usr/index.html
;
2、 用户进入前端页面,自动连接wss://iyuu.cn:2129
;
3、 用户点击获取二维码
,请求二维码生成接口:https://iyuu.cn/qrcode
,返回二维码参数:
{"ticket":"gQH47zwAAAAAAAAAAS5odHRwOi8vd2VpeGluLnFxLmNvbS9xLzAycTMtdzlMVEhlYzIxcF9jQU50MWsAAgQHjGRdAwR4AAAA","expire_seconds":120}
4、 把二维码参数
,转发到websocket服务wss://iyuu.cn:2129
,websocket服务保存转发来的信息建立映射关系;
5、 显示二维码:https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket={data.ticket},用户扫码;
6、 微信开发者接口会收到扫码结果,获取到场景值ID;
7、 根据场景值ID从Redis缓存取出ticket校验通过,执行业务逻辑(登录、绑定、解绑、积分等等),并通过5678端口
实时通知用户扫码后的处理结果。
完整Websocket服务代码
<?php
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
/**
* 单进程的WebSocket服务,支持WSS协议
* Class WebSocketServer
*/
class WebSocketServer extends Worker
{
/**
* 心跳间隔40秒,过期未通信,主动断开连接
*/
const HEARTBEAT_TIME = 40;
/**
* 内部通信密钥
*/
const SECRET = 'f1342dc1ee636bd3dda035d2b299f1cdabf5d0fdef89114153d98231ace243a38bb4b0d4';
/**
* 进程名字
* @var string
*/
public $name = 'WebSocketServer';
/**
* 内部监听协议与端口,方便内部系统推送数据,Text协议格式 文本+换行符
* @var string
*/
public $innerListen = 'text://127.0.0.1:5678';
/**
* 新增加一个属性,用来保存uid到connection的映射(uid是用户id或者客户端唯一标识)
* @var array
*/
public $uidConnections = [];
/**
* 记录在线用户数
* @var int
*/
public $lastOnlineCount = 0;
/**
* 进程启动时间
*
* @var int
*/
protected $_startTime = 0;
/**
* 构造函数
* - 如果设置证书,会使用SSL协议
* @param string $socket_name
* @param array $context_option
*/
public function __construct($socket_name = '', array $context_option = array())
{
if (!empty($context_option)) {
$this->transport = 'ssl';
}
// 运行父方法
parent::__construct($socket_name, $context_option);
}
/**
* Run worker instance.
*/
public function run()
{
// 设置 onWorkerStart 回调
$this->onWorkerStart = array($this, 'onWorkerStart');
// 设置 onConnect 连接回调
$this->onConnect = array($this, 'onConnect');
// 设置 onMessage 回调
$this->onMessage = array($this, 'onMessage');
// 设置 onClose 回调
$this->onClose = array($this, 'onClose');
// 记录进程启动的时间
$this->_startTime = time();
// 强制单进程
$this->count = 1;
// 运行父方法
parent::run();
}
/**
* 设置Worker子进程启动时的回调函数
*
* @param WebSocketServer $worker
* @throws Exception
*/
public function onWorkerStart($worker)
{
/**
* 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
*/
$inner_text_worker = new Worker($this->innerListen);
/**
* @param TcpConnection $connection
* @param $buffer
*/
$inner_text_worker->onMessage = function ($connection, $buffer) use ($worker) {
// JSON格式
$data = json_decode($buffer, true);
if (empty($data)) {
$connection->send('empty');
}
// 优先处理事件
if (!empty($data['event']) && is_string($data['event'])) {
/**
* 处理内部推送过来的事件
*/
$worker->innerEventHandler($connection, $data);
// 事件处理内已回复,这里直接返回
return;
} else {
/**
* 处理内部向用户推送的消息(两种推送消息的方式,任选其一即可)
* - 优先级高:uid
* - 优先级低:client_id + signHash
*/
$ret = $worker->innerSendHandler($data);
$connection->send($ret ? 'ok' : 'fail');
}
};
// 执行内部监听
$inner_text_worker->listen();
/**
* 进程启动后设置定时器,关闭超时的链接
*/
Timer::add(2, function () use ($worker) {
$time_now = time();
foreach ($worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > static::HEARTBEAT_TIME) {
if (isset($connection->uid)) {
// 连接断开时删除映射
$worker->unbindUid($connection->id, $connection->uid);
}
$connection->close();
}
}
});
/**
* 进程启动后设置一个每天重启进程的定时器
*/
Timer::add(86400, function () use ($worker) {
Worker::stopAll();
});
}
/**
* 当客户端建立连接时(TCP三次握手完成后)触发的回调函数
*
* @param TcpConnection $connection
*/
public function onConnect($connection)
{
// 增加在线人数
$this->lastOnlineCount++;
/**
* 向所有在线连接推送数据
*/
$rs = [
'cmd' => 'online',
'online' => $this->lastOnlineCount
];
$this->sendToAll(json_encode($rs, JSON_UNESCAPED_UNICODE), null);
/**
* 响应当前连接
*/
$time = time();
$resp = [
'event' => 'onConnect',
'client_id' => $connection->id,
'timestamp' => $time,
'signHash' => md5(self::SECRET . $connection->id . $time . self::SECRET)
];
$connection->send(json_encode($resp, JSON_UNESCAPED_UNICODE));
/**
* 给connection临时设置一个signHash属性,表示当前连接的签名
* - 供内部系统绑定uid使用(绑定uid时,必须校验此参数,避免wss服务重启或者宕机引起绑定关系错乱)
*/
if (!isset($connection->signHash)) {
$connection->signHash = $resp['signHash'];
}
}
/**
* 当客户端通过连接发来数据时,触发的回调函数
*
* @param TcpConnection $connection
* @param $data
*/
public function onMessage($connection, $data)
{
// 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
$connection->lastMessageTime = time();
// 客户端传递的是json数据
$msg = json_decode($data, true);
if (empty($msg)) {
return;
}
/**
* 业务分拣
*/
if (isset($msg['cmd']) && $msg['cmd']) {
/**
* 命令消息,根据命令执行不同的业务
*/
switch ($msg['cmd']) {
case 'ping':
// 发送当前在线人数
$ret = [
'cmd' => 'online',
'online' => $this->lastOnlineCount
];
$connection->send(json_encode($ret, JSON_UNESCAPED_UNICODE));
break;
default:
$connection->send('{"cmd":"default"}');
break;
}
return;
} else {
if (!empty($msg['ticket'])) {
/**
* 关联客户端ticket
* - 用户从内部应用获取客户端ticket,主动上报给wss服务与client_id绑定。
*/
if (isset($connection->ticket)) {
//覆盖过期的ticket
$connection->ticket = $msg['ticket'];
} else {
//首次关联,带参数二维码ticket
$connection->ticket = $msg['ticket'];
}
$connection->send('{"cmd":"bind","success":"ticket"}');
return;
} else {
/**
* 没有ticket
*/
$connection->send('{"message":"miss"}');
return;
}
}
}
/**
* 当客户端连接断开时触发的回调函数
*
* @param TcpConnection $connection
*/
public function onClose($connection)
{
if (isset($connection->uid)) {
// 连接断开时删除映射
$this->unbindUid($connection->id, $connection->uid);
}
// 减少在线人数
$this->lastOnlineCount and $this->lastOnlineCount--;
$ret = [
'cmd' => 'online',
'online' => $this->lastOnlineCount,
'event' => 'onClose',
'client_id' => $connection->id
];
// 向所有链接用户推送数据
$this->sendToAll(json_encode($ret, JSON_UNESCAPED_UNICODE), $connection->id);
}
/**
* 进程关闭时触发的回调函数
*
* @param Worker $worker
*/
public function onWorkerStop($worker)
{
//通知运维人员
#todo
}
/**
* 向uid绑定的所有在线client_id发送数据
* - 默认uid与client_id是一对多的关系,如果当前uid下绑定了多个client_id,则多个client_id对应的客户端都会收到消息,这类似于PC QQ和手机QQ同时在线接收消息。
*
* @param string|int $uid 用户uid
* @param string $message 要发送的数据(字符串类型)
* @return bool
*/
public function sendToUid($uid, $message)
{
if (isset($this->uidConnections[$uid])) {
/**
* @param TcpConnection $connection
*/
array_walk($this->uidConnections[$uid], function ($connection, $k) use (&$message) {
$connection->send($message);
});
return true;
}
return false;
}
/**
* 针对客户端id推送数据
*
* @param int $id 客户端ID
* @param string $message 要发送的数据(字符串类型)
* @return bool
*/
public function sendToClient($id, $message)
{
if (isset($this->connections[$id])) {
/**
* @param TcpConnection $connection
*/
$connection = $this->connections[$id];
$connection->send($message);
return true;
}
return false;
}
/**
* 向所有链接用户推送数据
*
* @param string $message 要发送的数据(字符串类型)
* @param int $exclude_client_id 客户端ID将被排除在外,不会收到本次发的消息
*/
public function sendToAll($message, $exclude_client_id = null)
{
/**
* @param TcpConnection $connection
*/
foreach ($this->connections as $connection) {
if ($connection->id !== $exclude_client_id) {
$connection->send($message);
}
}
}
/**
* 将client_id与uid绑定,以便通过sendToUid($uid)发送数据
* - 一个client_id只能绑定一个uid,如果绑定多次uid,则只有最后一次绑定有效
*
* @param int $id 客户端ID
* @param string|int $uid 用户的uid
* @return bool
*/
public function bindUid(int $id, $uid):bool
{
if (isset($this->connections[$id])) {
$connection = $this->connections[$id];
if (isset($connection->uid) && $connection->uid) {
/**
* 已经绑定过,解除绑定并删除映射
*/
if ($connection->uid !== $uid) {
$this->unbindUid($id, $uid);
}
}
// 绑定uid
$connection->uid = $uid;
if (!isset($this->uidConnections[$uid])) {
$this->uidConnections[$uid] = [];
}
/**
* uid与client_id是一对多的关系,系统允许一个uid下有多个client_id
*/
$this->uidConnections[$uid][$id] = $connection;
return true;
}
return false;
}
/**
* 将client_id与uid解绑
* - 当client_id下线(连接断开)时会自动与uid解绑,开发者无需在onClose事件调用
*
* @param int $id
* @param $uid
* @return bool
*/
public function unbindUid(int $id, $uid):bool
{
if (isset($this->uidConnections[$uid][$id])) {
unset($this->uidConnections[$uid][$id]);
if (empty($this->uidConnections[$uid])) {
unset($this->uidConnections[$uid]);
}
return true;
}
return false;
}
/**
* 返回值为与uid绑定的所有在线的client_id数组
*
* @param string|int $uid
* @return array
*/
public function getClientIdByUid($uid):array
{
if (isset($this->uidConnections[$uid])) {
if (empty($this->uidConnections[$uid])) {
unset($this->uidConnections[$uid]);
return [];
} else {
return array_column($this->uidConnections[$uid], 'id');
}
}
return [];
}
/**
* 断开与client_id对应的客户端的连接
*
* @param int $id 客户端ID
*/
public function closeClient(int $id)
{
if (isset($this->connections[$id])) {
$connection = $this->connections[$id];
if (isset($connection->uid) && $connection->uid) {
/**
* 已经绑定过
* - 删除映射解除绑定
*/
$this->unbindUid($id, $connection->uid);
}
// 关闭连接
$connection->close();
}
}
/**
* 内部推送事件的处理函数
*
* @param TcpConnection $connection
* @param array $data
* @return bool
*/
public function innerEventHandler($connection, array $data):bool
{
try {
switch ($data['event']) {
case 'bindUid':
/**
* 凭借(client_id、signHash)绑定uid
*/
$ret = false;
if (!empty($data['client_id']) && !empty($data['uid']) && !empty($data['signHash'])) {
$uid = $data['uid'];
$client_id = (int)$data['client_id'];
$signHash = $data['signHash'];
if ($this->verifySignHash($client_id, $signHash)) {
// 绑定成功,通知websocket客户端
if ($ret = $this->bindUid($client_id, $uid)) {
$response = [
'event' => 'bindUid',
'client_id' => $client_id,
'uid' => $uid,
'uid_online'=> count($this->uidConnections[$uid])
];
$this->sendToUid($uid, json_encode($response, JSON_UNESCAPED_UNICODE));
}
}
}
$connection->send($ret ? 'ok' : 'fail');
break;
default:
$connection->send('event.default');
break;
}
} catch (\Exception $ex) {
$connection->send($ex->getMessage());
}
return true;
}
/**
* 内部凭借signHash推送消息的处理函数
* - client_id + signHash双重安全验证(防止wss服务重启或宕机可能导致的混乱)
*
* @param array $data 内部推送过来的数据
* @return bool
*/
public function innerSendHandler(array $data):bool
{
// 优先uid发送
if (isset($data['uid']) && $data['uid']) {
return $this->sendToUid($data['uid'], json_encode($data, JSON_UNESCAPED_UNICODE));
}
// 凭借(client_id、ticket)或者(client_id、signHash)发送
if (isset($data['client_id']) && $data['client_id']) {
$client_id = (int)$data['client_id'];
$signHash = $data['signHash'];
if ($this->verifySignHash($client_id, $signHash)) {
unset($data['client_id'], $data['signHash']);
// 向客户端id的页面推送数据
return $this->sendToClient($client_id, json_encode($data, JSON_UNESCAPED_UNICODE));
}
}
return false;
}
/**
* 验证客户端signHash
* @param int $id 客户端ID
* @param string $signHash 客户端当前连接的签名signHash
* @return bool
*/
public function verifySignHash(int $id, string $signHash = ''):bool
{
if (empty($signHash)) {
return false;
}
if (isset($this->connections[$id])) {
$conn = $this->connections[$id];
$conn_signHash = isset($conn->signHash) && $conn->signHash ? $conn->signHash : '';
if ($signHash === $conn_signHash) {
return true;
}
}
return false;
}
/**
* 获取服务状态(AccessToken服务、SendMessage服务、WSS服务)
*/
public function getServiceStatus()
{
}
}
前端页面关键代码
请查看http://iyuu.cn
首页源码
执行方法:
/磁盘/路径/php /路径/start_wss.php start -d