使用PHP7.3 Event扩展实现php版本IO多路复用

    • # 服务端代码(Worker.php):
    • #
    • # <?php
    • # /**
    • # * Created by PhpStorm.
    • # * User: fanghouguo.com
    • # * Date: 2019/1/4
    • # * Time: 11:47
    • # * Desc:改进版本,把event read write 分开实现。
    • # */
    • #
    • #
    • # /**
    • #  * Single-process TCP server built on the PECL Event extension.
    • #  *
    • #  * The constructor opens the listening socket. run() forks, and the child
    • #  * registers a persistent READ event on that socket and enters the event loop.
    • #  * Each accepted connection gets its own EventBufferEvent; once a full line
    • #  * (terminated by "\n") has been read, a reply is written and the connection
    • #  * is closed.
    • #  */
    • # class Worker
    • # {
    • #     /**
    • #      * Data received so far from each client, keyed by connection id.
    • #      * @var array
    • #      */
    • #     public static $receive = [];
    • #
    • #     /**
    • #      * Stream resources keyed by their integer resource id: the listening
    • #      * socket plus every accepted client connection.
    • #      * @var array
    • #      */
    • #     public static $master = [];
    • #
    • #     /**
    • #      * EventBufferEvent objects keyed by connection id. Storing them here
    • #      * keeps a reference alive after acceptConnect() returns.
    • #      * @var array
    • #      */
    • #     public static $buffers = [];
    • #
    • #     /**
    • #      * Listening socket created by the constructor.
    • #      * @var resource|null
    • #      */
    • #     public $mainSocket = null;
    • #
    • #     /** Not read anywhere in this class. */
    • #     public $count = 1;
    • #
    • #     /** Only referenced by the commented-out unit_run() variant below. */
    • #     public $globalEventBase = null;
    • #
    • #     /**
    • #      * Worker constructor: opens the listening socket.
    • #      *
    • #      * Prints a message and exits the process when the config is empty or
    • #      * the socket cannot be created.
    • #      *
    • #      * @param array $config must contain 'protocol', 'ip' and 'port'
    • #      */
    • #     public function __construct($config = array())
    • #     {
    • #         if (empty($config)) {
    • #             echo "配置错误\n";
    • #             exit;
    • #         }
    • #         $scheme = $config['protocol'] . "://" . $config['ip'] . ":" . $config['port'];
    • #         $socket = stream_socket_server($scheme, $errno, $errstr);
    • #         if (false === $socket) {
    • #             echo "$errstr($errno)\n";
    • #             exit();
    • #         }
    • #         $id = (int)$socket;
    • #         static::$master[$id] = $socket;
    • #         $this->mainSocket = $socket;
    • #         //$base = new EventBase();
    • #         //$this->globalEventBase = $base;
    • #     }
    • #
    • #     /**
    • #      * Accept-event callback: accepts one client and wires up its buffer event.
    • #      *
    • #      * @param resource  $socket listening socket that became readable
    • #      * @param int       $flag   event flags passed by the Event extension
    • #      * @param EventBase $base   event base the new buffer event is attached to
    • #      */
    • #     public function acceptConnect($socket, $flag, $base)
    • #     {
    • #         $connection = stream_socket_accept($socket);
    • #         if (false === $connection) {
    • #             // Nothing was accepted; (int)false would otherwise register id 0.
    • #             return;
    • #         }
    • #         $id = (int)$connection;
    • #         // Argument order is (base, socket, options, readcb, writecb, eventcb, arg):
    • #         // no write callback is installed, eventStatus is the status callback
    • #         // and $id is the user argument handed to every callback.
    • #         $event = new EventBufferEvent(
    • #             $base,
    • #             $connection,
    • #             0,
    • #             array($this, 'eventRead'),
    • #             null,
    • #             array($this, 'eventStatus'),
    • #             $id
    • #         );
    • #         $event->setTimeouts(30, 30);
    • #         $event->setWatermark(Event::READ, 0, 0xffffff);
    • #         // NOTE(review): run() never calls EventBase::priorityInit(), so this
    • #         // priority is presumably rejected - confirm.
    • #         $event->setPriority(10);
    • #         $event->enable(Event::READ | Event::PERSIST);
    • #         static::$master[$id] = $connection;
    • #         static::$receive[$id] = '';
    • #         static::$buffers[$id] = $event;
    • #     }
    • #
    • #     /**
    • #      * Read-event callback: accumulates client data until a newline arrives,
    • #      * then replies and closes the connection.
    • #      *
    • #      * @param EventBufferEvent $buffer buffer event of the connection
    • #      * @param int              $id     connection id
    • #      */
    • #     public function eventRead($buffer, $id)
    • #     {
    • #         // Drain everything currently held in the input buffer.
    • #         while (true) {
    • #             $read = $buffer->read(65535);
    • #             if ($read === '' || $read === false || $read === null) {
    • #                 break;
    • #             }
    • #             $pos = strpos($read, "\n");
    • #             if ($pos === false) {
    • #                 // Incomplete line: keep it and wait for more data.
    • #                 static::$receive[$id] .= $read;
    • #                 continue;
    • #             }
    • #             static::$receive[$id] .= trim(substr($read, 0, $pos + 1));
    • #             $receive_data = static::$receive[$id];
    • #             fwrite(static::$master[$id], "receive {$receive_data} success\n");
    • #             $this->closeConnection($buffer, $id);
    • #             // The buffer event has been freed; it must not be read again.
    • #             return;
    • #         }
    • #     }
    • #
    • #     /**
    • #      * Writes the reply for a connection and closes it.
    • #      *
    • #      * Not registered as a callback by acceptConnect().
    • #      *
    • #      * @param EventBufferEvent $buffer buffer event of the connection
    • #      * @param int              $id     connection id
    • #      */
    • #     public function eventWrite($buffer, $id)
    • #     {
    • #         print_r($buffer);
    • #         $receive_data = static::$receive[$id] ?? '';
    • #         if (isset(static::$master[$id])) {
    • #             fwrite(static::$master[$id], "receive {$receive_data} success\n");
    • #         }
    • #         $this->closeConnection($buffer, $id);
    • #     }
    • #
    • #     /**
    • #      * Status callback: logs the event flags and tears the connection down
    • #      * on EOF, error or timeout.
    • #      *
    • #      * @param EventBufferEvent $buffer buffer event of the connection
    • #      * @param int              $events bit mask of EventBufferEvent::* flags
    • #      * @param int              $id     connection id
    • #      */
    • #     public function eventStatus($buffer, $events, $id)
    • #     {
    • #         print_r($buffer);
    • #         print_r($events);
    • #         echo "ev_status - " . $events . "\n";
    • #         $fatal = EventBufferEvent::EOF | EventBufferEvent::ERROR | EventBufferEvent::TIMEOUT;
    • #         if ($events & $fatal) {
    • #             $this->closeConnection($buffer, $id);
    • #         }
    • #     }
    • #
    • #     /**
    • #      * Frees a connection's buffer event, closes its stream and drops all
    • #      * per-connection state.
    • #      *
    • #      * @param EventBufferEvent $buffer buffer event of the connection
    • #      * @param int              $id     connection id
    • #      */
    • #     private function closeConnection($buffer, $id)
    • #     {
    • #         // Free the buffer event before closing the stream it wraps.
    • #         $buffer->free();
    • #         unset(static::$buffers[$id], static::$receive[$id]);
    • #         if (isset(static::$master[$id])) {
    • #             fclose(static::$master[$id]);
    • #             unset(static::$master[$id]);
    • #         }
    • #     }
    • #
    • #     /**
    • #      * Process entry point.
    • #      *
    • #      * Forks; the parent exits, the child starts a new session and then runs
    • #      * the event loop. Incoming connections are handed to acceptConnect(),
    • #      * which installs eventRead() and eventStatus() for each client.
    • #      */
    • #     public function run()
    • #     {
    • #         $pid = pcntl_fork();
    • #         if ($pid < 0) {
    • #             echo 'fork pid error';
    • #             exit;
    • #         }
    • #         if ($pid > 0) {
    • #             // Parent: leave the child running on its own.
    • #             exit;
    • #         }
    • #         // Child: setsid must run before loop(), which blocks.
    • #         $sid = posix_setsid();
    • #         if ($sid < 0) {
    • #             echo 'setsid error';
    • #             exit;
    • #         }
    • #         $base = new EventBase();
    • #         $event = new Event(
    • #             $base,
    • #             $this->mainSocket,
    • #             Event::READ | Event::PERSIST,
    • #             array($this, 'acceptConnect'),
    • #             $base
    • #         );
    • #         $event->add();
    • #         $base->loop();
    • #     }
    • #
    • #     /**
    • #      * Test variant of run(): same accept loop, without forking, on the
    • #      * event base stored in $globalEventBase.
    • #      */
    • #     // function unit_run()
    • #     // {
    • #     //     $event = new Event($this->globalEventBase, $this->mainSocket, Event::READ | Event::WRITE | Event::PERSIST, array($this, 'acceptConnect'), $this->globalEventBase);
    • #     //     $event->add();
    • #     //     $this->globalEventBase->loop();
    • #     // }
    • # }
    • #
    • # 启动脚本代码(加载 Worker.php 并启动服务):
    • #
    • # <?php
    • # /**
    • # * Created by PhpStorm.
    • # * User: fanghouguo.com
    • # * Date: 2019/1/2
    • # * Time: 11:34
    • # */
    • #
    • # // Startup script: builds a Worker from the listen address below and runs it.
    • # require_once "Worker.php";
    • #
    • # $config = array(
    • #     'protocol' => 'tcp',
    • #     'ip'       => '127.0.0.1',
    • #     'port'     => 9501,
    • #     // NOTE(review): positional value that the Worker constructor does not
    • #     // read; presumably a leftover "daemon" flag - confirm before removing.
    • #     "demaoin",
    • # );
    • # $worker = new Worker($config);
    • # $worker->run();
    • #
    • #
    • # 下期把测试结果奉上。
    • # 以上参考了飞鸿影博客。
    • #
    • #
    • #

点击量:44

发表评论

电子邮件地址不会被公开。 必填项已用*标注