Press "Enter" to skip to content

使用PHP Socket开发Yar TCP服务

Yar支持HTTP和TCP俩种Transporter, HTTP的是基于CURL,PHP中的Yar默认就是走的HTTP Transporter, 这个大家应该都不陌生, 但是基于TCP的, 可能大家会用的少一些。

事实上,我6年前也写过一个C的Yar server框架,叫做Yar-c, 代码地址在Yar-C at Github, 它提供了服务启动,worker进程管理,Yar打包协议等。当时我们用这个框架,实现了高性能的微博白名单等服务,以供PHP端使用Yar Client来调用。

只不过,Yar C需要用C来写Handle, 可能对于不少PHPer来说,会稍微有点陌生,那今天我们尝试用PHP来写一个TCP的Server,来介绍下如何实现对Yar RPC协议的处理, 这个例子可以方便的结合Swoole等异步PHP框架,实现一个高性能的Yar TCP Server。 这个过程中, 会让大家了解Yar的RPC通信协议,以及捎带了解下Socket编程。

我们今天还是用“白名单”服务作为例子,我们提供一个接口,接受RPC客户端的请求,参数是一个用户ID,返回bool,表示是否在白名单:

function query(int $id) : bool;

首先,我们建立一个文件yar_server, 为了方便的直接执行,我们在文件写下:

#!/bin/env php7
<?php
class WhiteList {
}

然后,通过chmod a+x 给这个文件增加可执行的权限。

第一步我们需要处理服务的启动参数处理, 接受一个参数S表示要监听的IP和端口,值的格式是host:port, 我们使用PHP的getopt函数来处理命令行参数:

class WhiteList {
    protected $host;

    public function __construct() {
        $options = getOpt("S:");
        if (!isset($options["S"])) {
            $this->usage();
        }
    }

    protected function usage() {
        exit("Usage: yar_server -S hostname:port\n");
    }
}

这样,当用户启动yar_server的时候,没有指定S参数,我们就退出,并提示Usage。 我们还需要另外一个配置,就是指向一个词表文件,词表文件中每一行是一个在白名单中的用户ID, 我们用F表示:

class WhiteList {
    protected $host;
    protected $dicts;

    public function __construct() {
        $options = getOpt("S:F:");
        if (!isset($options["S"]) || !isset($options["F"])) {
            $this->usage();
        }
        $this->host = $options["S"];
        $this->dicts = $options["F"];
    }

    protected function usage() {
        exit("Usage: yar_server -F path_to_dict -S hostname:port\n");
    }
}

好了, 现在启动参数处理完成, 当然为了简单,我省去了对输入参数的有效性检查。

接下来, 我们需要完成俩个函数, 第一个是读取-F指定的词表文件,把所有的用户ID读入到一个数组中,因为我们的这个服务会是常驻进行, 所以不用担心性能, 它只会在启动阶段处理这个词表文件:

protected function loadDict() {
	$this->ids = array();

	$fp = fopen($this->dicts, "r");
	while (!feof($fp)) {
		$line = trim(fgets($fp));
		if ($line) {
			$this->ids[$line] = true;
		}
	}
	fclose($fp);
	echo "Loading dict successfully, ", count($this->ids), " loaded\n";

	return $this;
}

因为用户ID是整型,所以我们把它当作Hashtable的key,这样在将来查找的时候,使用isset会非常高效。 需要注意的是因为文件处理不是我们今天要讲的重点,也就省去了对文件存在行,可读性,合法性的检查。

好了, 接下来是重点了, 我们要启动一个IPV4 TCP Socket服务,监听在$host指定的地方, 为了方便大家了解Socket API,我们不采用PHP的Stream系列函数,而是采用PHP直接包装的Socket系列API, 首先我们用socket_create创建一个Socket套接字:

protected function listen() {
	$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
	if ($socket == false) {
		throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error()));
	}
}

然后,我们需要使用socket_bind绑定这个Socket到我们需要监听的地址, 并且使用socket_listen来监听请求:

protected function listen() {
	$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
	if ($socket == false) {
		throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error()));
	}
	list($hostname, $port) = explode(":", $this->host);
	if (socket_bind($socket, $hostname, $port) == false) {
		throw new Exception("socket_bind() failed: reason: " . socket_strerror(socket_last_error()));
	}
	if (socket_listen($socket, 64) === false) {
		throw new Exception("socket_listen() failed: reason: " . socket_strerror(socket_last_error()));
	}
	echo "Starting Yar_Server at {$this->host}\nPresss Ctrl + C to quit\n";

	$this->socket = $socket;
	return $this;
}

好了, 如果一切没问题,接下来我们就可以socket_accept来监听请求了, 默认的socket是阻塞模式,如果没有请求,进程会一直阻塞等待, 对于高性能的服务来说, 最好采用非阻塞+select或者epoll的模式来同时处理多个请求, 但是我们的这个例子主要是为了介绍Yar的协议, 所以还是采用简单的阻塞模式。

接下来,我们来编写真正的RPC处理部分,首先我们通过accept接受一个请求, 然后读取请求的的内容,分析请求头中的Yar RPC Header信息, Yar RPC的协议头定义如下:

typedef struct _yar_header {
    uint32_t       id;            // transaction id
    uint16_t       version;       // protocl version
    uint32_t       magic_num;     // default is: 0x80DFEC60
    uint32_t       reserved;
    unsigned char  provider[32];  // reqeust from who
    unsigned char  token[32];     // request token, used for authentication
    uint32_t       body_len;      // request body len
}

其中, magic_num是用来验证请求有效性的一个特殊值, 合法的Yar RPC请求都会设置这个值为0x80DFEC60(我很想告诉你为啥是这个值,但我真不记得当时我为啥用这个数字了),这个头部是82个字节,可能有同学会问,不对啊一看这个Struct不应该是82啊,那是因为头部申明的时候采用pack模式,也就是不对齐, 所以确实是82个字节.

这里有一个需要注意的是, 0x80DFEC60如果你是自在32位的系统上的话,这个值超过了PHP的最大有符号整数的表示范围,类似我之前的这篇文章介绍的PHP_INT_MIN, PHP会自动转换成浮点数,所以如果是在32位系统上,你不能直接定义0x80DFEC60, 而是需要这么写来定义这个值:

pack("H*", "80DFEC60");

provider是一个字符串,标明了客户端的名字, 比如对于Yar扩展的Yar_Client就是"Yar PHP Cient-x.x.x"

token在设计的最初是为了做API key验证的,但是后来没用上,因为大部分都是内网应用,可以有多种办法来保证请求来源的合法性。

id是一个唯一请求id,这个是为了排查请求问题的, version默认为0,或者1,目前我没有升级过协议头,所以这个暂时我们也不用关心,reserved可以用来传递一些请求参数, 比如客户端可以说明是否保持连接。

body_len是我们需要关心的, 这个字段表明了这次请求,请求体一共多大(不包括Yar协议头部)。

所有的这些数字, 都是以网络字节序传递的, 我们采用PHP处理二进制流的unpack函数来解析读取进来的二进制流:

protected function parseHeader($header) {
   return 
     unpack("Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len", $header);
}

这个函数会返回一个上面说到的头部结构体的数组。

对应的我们也需要使用pack来实现生成Yar Header的方法:

const YAR_MAGIC_NUM = 0x80DFEC60;
protected function genHeader($id, $len) {
	$bin = pack("NnNNA32A32N", $id, 0, self::YAR_MAGIC_NUM, 0, "Yar PHP TCP Server", "", $len);
	return $bin;
}

如刚才说的,我们需要在接受一个请求以前, 验证请求的合法性:

protected function validRequest($header) {
	if ($header["magic_num"] != self::YAR_MAGIC_NUM) {
		return false;
	}
	return true;
}

所以大概请求的处理整个逻辑框架是:

protected function accept() {
	while (($conn = socket_accept($this->socket))) {
		$buf = socket_read($conn, self::HEADER_SIZE, PHP_BINARY_READ);
		if ($buf === false) {
			socket_shutdown($conn);
			continue;
		}

		if (!$this->validHeader($header = $this->parseHeader($buf))) {
			$output = $this->response(1, "illegal Yar RPC request");
			goto response;
		}

		$buf = socket_read($conn, $header["body_len"], PHP_BINARY_READ);
		if ($buf === false) {
			$output = $this->response(1, "insufficient request body");
			goto response;
		}

		if (!$this->validPackager($buf)) {
			$output = $this->response(1, "unsupported packager");
			goto response;
		}

		$buf = substr($buf, 8); /* 跳过打包信息的8个字节 */
		$request = $this->parseRequest($buf);
		if ($request == false) {
			$this->response(1, "malformed request body");
			goto response;
		}

		$status = $this->handle($request, $ret);

		$output = $this->response($status, $ret);
response:
		socket_write($conn, $output, strlen($output));

		socket_shutdown($conn); /* 关闭写 */
	}
}

现在整体的框架就算完成了,我们需要完成handle,response方法就可以了,handle是要根据用户的请求中的m, 来调用指定的方法

protected function handle($request, &$ret) {
	if ($request["m"] == "query") {
		$ret = $this->query(...$request["p"]);
	} else {
		$ret = "unsupported method '" . $request["m"]. "'";
		return 1;
	}
	return 0;
}

现在来实现query方法本身, 这个会很简单,就检查下id是不是在白名单数组:

protected function query($id) {
	return isset($this->ids[$id]);
}

好了,接下来我们要完成response方法,这个方法是打包一个符合Yar协议的返回体,包括82个字节的头部,8个字节的打包信息,以及序列化后的响应体, 我们需要根据status不同,来选择设置响应体中的r还是e字段:

protected function response($status, $ret) {
	$body = array();

	$body["i"] = 0;
	$body["s"] = $status;
	if ($status == 0) {
		$body["r"] = $ret;
	} else {
		$body["e"] = $ret;
	}

	$packed = serialize($body);
	$header = $this->genHeader(0, strlen($packed) + 8);

	return $header . str_pad("PHP", 8, "\0") . $packed;
}

好了, 马上就要大功告成了,我们最后完成启动方法和析构函数(关闭socket):

public function run() {
	$this->loadDict()->listen()->accept();
}
public function __destruct() {
	if ($this->socket) {
		socket_close($this->socket);
	}
}

现在一切就绪, 我们最后在文件末尾加入:

(new Whitelist)->run();

在测试之前,我们先准备一个测试词表,比如1到1000的id:

seq 1 1 10000 > user_id.dict

然后启动服务, 监听在本机的9000端口:

$ ./yar_server -F user_id.dict -S127.0.0.1:9000
Loading dict successfully, 1000 loaded
Starting Yar_Server at 127.0.0.1:9000
Presss Ctrl + C to quit

不错,服务启动成功,然后我们使用Yar扩展来编写客户端(你需要首先安装好Yar扩展), 测试下用户id 999和99999的调用效果:

<?php
$yar = new Yar_Client("tcp://127.0.0.1:9000");
var_dump($yar->query("999"));
var_dump($yar->query("99999"));
?>

和调用HTTP的Yar服务不同,此处我们应该使用tcp://做地址头,表示这是一个TCP的服务。

来,运行一下看看:

php7 client.php
bool(true)
bool(false)

看起来不错, 符合预期!

你也可以尝试故意构造一些错误的可能,比如调用不存在的方法之类的,来看看服务器的反应, 这个例子的代码你可以在这里找到.

到这里我就算介绍完了如何采用PHP来编写Yar的TCP服务, 大家应该可以很方便的把这个例子修改完善成自己希望的格式,或者嵌入Swoole(可以参考Swoole作者写的:这里)。

还是要再次说明,因为本文的主要目的是为了介绍Yar RPC通信协议,所以在服务管理这块并没有做的很完善,比如socket_accept, socket_read/write等都默认采用了阻塞模式,也没有加入超时设计,服务进程也只有一个,这个如果真的想用做实际服务的话,还是需要一些功课的,不过我相信你有兴趣的话,都是可以搞定的。:)

当然,最简单的是,你可以直接使用Yar-C服务框架来编写C Yar TCP服务。

在这里也有一个Yar-C Server的例子yar_server in C.

enjoy!

18 Comments

  1. ffeng
    ffeng May 12, 2020

    按照GitHub 的两个 demo 试了一下,服务器端,body 解析不出来数据。

    yar 版本:
    yar support => enabled
    Version => 2.1.2

    $header = unpack(“Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len”, substr($data, 0,HEADER_SIZE));
    var_dump($header); //header 数据正常
    //var_dump(substr($data, HEADER_SIZE,8)); //output: MSGPACK

    $buf = substr($data, HEADER_SIZE + 8);
    var_dump($buf);
    $request = unserialize($buf); // notice: unserialize(): Error at offset 0 of 23 bytes
    var_dump($request); //false

    • ffeng
      ffeng May 12, 2020

      可以了,需要修改配置文件,yar.packager=php

    • ffeng
      ffeng May 12, 2020

      MSGPACK, 使用 msgpack_unpack 来解析body.

  2. lex
    lex May 11, 2020

    鸟哥,self::HEADER_SIZE 这个常量未找到,其值是否是 82?
    还有几个方法的实现没有找到具体的示例:
    accept() 中的 validHeader() 方法是 validRequest() 吗?
    validPackager()、parseRequest() 均未找到实现,鸟哥能指导一下咩

    • PHPer
      PHPer May 11, 2020

      你直接下载github上的那个示例代码就行吧?

      • lex
        lex May 11, 2020

        哦哦哦,谢谢,没仔细看链接,谢谢

    • ffeng
      ffeng May 12, 2020

      你的可以吗? 两个例子我都试了下,都解析不出body的数据。

  3. yisangwu
    yisangwu April 18, 2020

    这里:
    seq 1, 1, 10000 > user_id.dict

    似乎不需要逗号分隔:
    seq 1 1 10000 > user_id.dict

    • laruence
      laruence April 18, 2020

      对,不需要逗号,写顺手了,我修正下

  4. joke
    joke April 11, 2020

    能不能整点有技术含量的东西, 能不能来点有创新的东西,难怪php越来越不行…

    • tangq
      tangq April 14, 2020

      你是不是有毒?
      你行的话超越鸟哥,做出自己的贡献,别在这阴阳怪气的。

      • shudun
        shudun May 5, 2020

        这种人就是现实失意 来网上找存在感的 越搭理他越来劲
        所以忽略就好了

  5. mXu
    mXu April 10, 2020

    作者是谁啊,就这?

  6. DesolateVAGH
    DesolateVAGH April 8, 2020

    相当高深 得细品

  7. DusNoob
    DusNoob April 8, 2020

    还是 HTTP Transporter 适合我-_-!

  8. 月染指上
    月染指上 April 1, 2020

    看起来不错

Leave a Reply

Your email address will not be published. Required fields are marked *