Swoole

swoole文档中心

上代码:

class AcePush
{

    public $server;

    public function __construct()
    {
        $this->server = new Swoole\Websocket\Server("0.0.0.0", 9573);

        $this->server->set([
            'dispatch_mode' => 5,
            'worker_num' => 1,
            'heartbeat_check_interval' => 30,
            'heartbeat_idle_time' => 62,
        ]);

        $this->server->on('open', [$this, 'onOpen']);

        $this->server->on('message', [$this, 'onMessage']);

        $this->server->on('close', [$this, 'onClose']);

        $this->server->on('request', [$this, 'onRequest']);
    }

    public function onOpen(swoole_websocket_server $server, $request)
    {
        // get不存在或者uid和token有一项不存在,关闭当前连接
        if (!isset($request->get) || !isset($request->get['uid']) || !isset($request->get['token']) || !isset($request->get['client'])) {
            $this->server->close($request->fd);
            return false;
        }
        $token = $request->get['token'];
        $client = $request->get['client'];
        $uid = $request->get['uid'];
        if (false == $this->checkAccess($token, $client)) {
            $this->server->close($request->fd);
            return false;
        }
        // 把 client, uid , fd 存入mysql或redis,以便两个用户问互发消息
        $this->server->bind($request->fd, $uid);
        return true;
    }

    public function onMessage(Swoole\WebSocket\Server $server, \Swoole\WebSocket\Frame $frame)
    {
        echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";

        $data = json_decode($frame->data, true);
        if (!$data) {
            $this->invalidMessageResponse($frame, "Invalid data format");
            return false;
        }
        if (!isset($data['act']) || !isset($data['message']) || !isset($data['to_uid'])) {
            $this->invalidMessageResponse($frame, "Invalid data parameter");
            return false;
        }
        // 根据act 判断是否特殊处理
        switch ($data['act']) {
            case 'ping':
                $this->server->push($frame->fd, json_encode(['act' => 'pong', 'status' => 1]));
                return true;
                break;
            default:
        }

        if ($data['to_uid'] == 'all') {
            $this->broadcastAll($data['message'], $data['act']);
            return true;
        } else {
            $this->server->push($data['to_uid'], json_encode($data, JSON_UNESCAPED_UNICODE));
            return true;
        }
    }

    public function onClose($ser, $fd)
    {
        echo "client {$fd} closed\n";
    }

    public function onRequest($request, \Swoole\Http\Response $response)
    {
        if (!isset($request->get) || !isset($request->get['act'])) {
            $this->invalidHttpRequest();
            return false;
        }
        $act = $request->get['act'];
        if ($act == 'shutdown') {
            $this->server->shutdown();
        } elseif ($act == 'stats') {
            $data['connections'] = [];
            foreach ($this->server->connections as $fd) {
                if ($this->server->exist($fd)) {
                    $this->server->push($fd, $request->get['message']);
                }
                $client = $this->server->getClientInfo($fd);
                $client['is_connect'] = $this->server->isEstablished($fd);
                $data['connections'][] = $client;
            }
            $data['stats'] = $this->server->stats();
            $response->end(json_encode($data));
        } elseif ($act == 'push') {
            $act = 'message';
            $message = [
                'time' => date('Y-m-d H:i:s'),
                'message' => $request->get['message'] ?? '',
                'voice' => $request->get['voice'] ?? ''
            ];
            $this->broadcastAll($message, $act);
            $response->end('ok');
        } else {
            $response->end(json_encode('Undefined action'));
        }

        return true;
    }

    public function invalidHttpRequest($message = "Invalid request")
    {
        $data = [
            'status' => 403,
            'message' => $message
        ];
        echo json_encode($data, JSON_UNESCAPED_UNICODE);
    }

    public function checkAccess($token, $client = 'admin')
    {
        if ($token) {
            $url = '';
            $resp = $this->httpRequest($url);
            return true;
        } else {
            return false;
        }
    }

    public function broadcastAll($message, $act)
    {
        $data = [
            'status' => 1,
            'message' => $message,
            'act' => $act
        ];
        $resp = json_encode($data, JSON_UNESCAPED_UNICODE);
        foreach ($this->server->connections as $fd) {
            if ($this->server->isEstablished($fd)) {
                $this->server->push($fd, $resp);
            }
        }
    }

    public function parsePushRequest($request)
    {

    }

    public function invalidMessageResponse(\Swoole\WebSocket\Frame $request, $message = "Invalid Message")
    {
        $resp = [
            'status' => 0,
            'message' => $message,
            'act' => 'none'
        ];
        $this->server->push($request->fd, json_encode($resp));
    }

    public function httpRequest($url, $data = null)
    {
        return '';
    }

    public function start()
    {
        $this->server->start();
    }

}

(new AcePush())->start();