Tagged: cURL

PHP实现MySQL并发查询

一般的,一个看似很简单的页面,一次http请求后,到达服务端,穿过Cache层,落到后台后,实际可能会有很多很多的数据查询逻辑!而这些查询实际是不相互依赖的,也即可以同时查询。比如各种用户信息,用户的APP列表,每个APP对应的流量数据、消耗记录、服务状态,平台运行状态,消息通知,新闻资讯等等。
这篇文章主要介绍了数据查询层,如何把串行变并行,提高查询效率、提升应用性能。实现方式包括:mysqlnd异步查询,cURL并发请求,Swoole异步非阻塞!

PHP脚本是按文档流的形式来执行的,所以我们在编写PHP程序时,代码基本都是串行的,尤其是SQL,比如:
倒流’s Bolg
这种方式,是每次查询都需要等待结果返回之后再开始下一次的查询,
运行时间 = (第1次发送请求时间 + 0.01 + 第1次返回时间)+(第2次发送请求时间 + 0.05 + 第2次返回时间)+(第3次发送请求时间 + 0.03 + 第3次返回时间)

如果是并发查询,那么流程就成了:
倒流’s Bolg
运行时间 = 发送请求时间 + 0.05 + 返回时间

显然,并发查询要比串行查询快!

那么PHP可以实现并发查询吗?答案是肯定的!

一、利用MySQL的异步查询功能

目前 MySQL 的异步查询,只在 MySQLi 扩展提供,查询方法分别是:

  1. 使用 MYSQLI_ASYNC 模式执行 mysqli::query
  2. 获取异步查询结果:mysqli::reap_async_query

需要注意的是,使用异步查询,需要使用 mysqlnd 作为PHP的MySQL数据库驱动,
mysqlnd(MySQL Native Driver) 是 Zend 公司开发的 MySQL 数据库驱动,采用PHP开源协议,用于代替旧版的由 MySQL AB公司(现在的Oracle)开发的 libmysql,PHP 5.3 及以上版本开始提供,PHP5.4 之后的版本 mysqlnd 为默认配置选项,
如果 PHP 小于 5.4,编译时需要指定编译参数:

--with-mysqli=mysqlnd --with-pdo-mysql=mysqlnd

MySQL异步查询示例脚本:

<?php
/**
 * MySQL异步查询示例脚本:
 * @filename p_async.php
 * @url http://test.979137.com/ParallelSQL/p_async.php
 */

//期望结果集:获取以下用户的每个月每个APP的消费统计
$top = array('979137', '555555', '666666', '888888', '999999');
$ret = array_fill_keys($top, array());

//组织结构查询
$cmd = $resources = array();
$sql = "SELECT access_key,SUM(amount) sum_amount FROM consume_2016%s WHERE uid=%d AND product='SAE' GROUP BY access_key";
foreach($top as $uid) {
    for($i = 1; $i <= 12; $i++) {
        $ret[$uid][$i] = $tmp = array();
        $tmp['uid'] = $uid;
        $tmp['month'] = $i;
        $tmp['resource'] = $resources[] = new \mysqli('localhost', 'root', '123456', 'sae', '3306');
        $tmp['resource']->set_charset('utf8');
        $tmp['resource']->query(sprintf($sql, sprintf('%02d', $i), $uid), MYSQLI_ASYNC);
        $tag = spl_object_hash($tmp['resource']);
        $cmd[$tag] = $tmp;
    }
}

$total = $query_times = count($resources);

//获取结果
do {
    $read = $error = $reject = $resources;
    //等待查询结束
    if (!\mysqli::poll($read, $error, $reject, 1)) {
        continue;
    }
    //批量获取结果
    foreach($read as $resource) {
        $result = $resource->reap_async_query();
        if ($result) {
            $tag = spl_object_hash($resource);
            $uid = $cmd[$tag]['uid'];
            $month = $cmd[$tag]['month'];
            while(($row = $result->fetch_assoc()) != false) {
                $ret[$uid][$month][$row['access_key']] = $row['sum_amount'];
            }
            $result->free();
            $total--;

        } else die('MySQLi error: '.$resource->error);
    }
} while ($total > 0);

var_dump($ret);

二、cURL实现并发请求

先解释下 cURL 和 SQL 怎么就扯上关系了呢!

我们知道在很多系统架构里,PHP是不会直接操作DB的,而是 RESTful 架构,这时候所有操作都接口化了,
这时上述所讲的 SQL 就演变成 接口调用 了,
因为 API,所以 cURL!

以下是PHP中cURL多线程相关函数:

curl_multi_add_handle — 向curl批处理会话中添加单独的curl句柄
curl_multi_close — 关闭一组cURL句柄
curl_multi_exec — 运行当前 cURL 句柄的子连接
curl_multi_getcontent — 如果设置了CURLOPT_RETURNTRANSFER,则返回获取的输出的文本流
curl_multi_info_read — 获取当前解析的cURL的相关传输信息
curl_multi_init — 返回一个新cURL批处理句柄
curl_multi_remove_handle — 移除curl批处理句柄资源中的某个句柄资源
curl_multi_select — 等待所有cURL批处理中的活动连接
curl_multi_setopt — 为 cURL 并行处理设置一个选项
curl_multi_strerror — 返回描述错误码的字符串文本

我们可以利用这些多线程函数,实现 cURL 并发请求,从而实现并发 SQL!

cURL并发请求,服务端接口示例脚本:

<?php
/**
 * cURL并发请求,服务端接口示例脚本
 * @filename consume.php
 * @url http://test.979137.com/ParallelSQL/consume.php
 */
$resource = new \mysqli('localhost', 'root', '123456', 'sae', '3306');
$resource->set_charset('utf8');

$sql = "SELECT access_key,SUM(amount) sum_amount FROM consume_%d WHERE uid=%d AND product='%s' GROUP BY access_key";
$sql = sprintf($sql, $_GET['ym'], $_GET['uid'], $_GET['product']);
$res = $resource->query($sql);

$out['code'] = $resource->errno;
$out['message'] = $resource->error;
$data = array();
if (is_object($res)) {
    while(($row = $res->fetch_assoc()) != false) {
        $data[$row['access_key']] = $row['sum_amount'];
    }
}
$out['data'] = $data;

header('Content-Type: application/json; charset=utf-8');
echo json_encode($out);

cURL并发请求,客户端调用示例脚本:

<?php
/**
 * cURL并发请求,客户端调用示例脚本
 * @filename p_curl.php
 * @url http://test.979137.com/ParallelSQL/p_curl.php
 */

//期望结果集:获取以下用户的每个月每个APP的消费统计
$top = array('979137', '555555', '666666', '888888', '999999');
$ret = array_fill_keys($top, array());

$mch = curl_multi_init();
$opt[CURLOPT_HEADER] = 0;
$opt[CURLOPT_CONNECTTIMEOUT] = 60;
$opt[CURLOPT_RETURNTRANSFER] = true;
$opt[CURLOPT_HTTPHEADER] = array('Host: api.979137.com');

//生成句柄并加入到批处理
$cmd = array();
foreach($top as $uid) {
    for($i = 1; $i <= 12; $i++) {
        $ret[$uid][$i] = $tmp = array();
        $tmp['url'] = sprintf('http://127.0.0.1/ParallelSQL/consume.php?ym=2016%02d&uid=%d&product=SAE', $i, $uid);
        $tmp['uid'] = $uid;
        $tmp['month'] = $i;
        $tmp['resource'] = curl_init($tmp['url']);
        curl_setopt_array($tmp['resource'], $opt);
        curl_multi_add_handle($mch, $tmp['resource']);
        $cmd[] = $tmp;
    }
}

//并发执行,直到全部结束
do {
    curl_multi_exec($mch, $active); 
} while ($active > 0);

//获取全部结果
foreach($cmd as $c) {
    $res = curl_multi_getcontent($c['resource']);
    $http_code = curl_getinfo($c['resource'], CURLINFO_HTTP_CODE);
    if ($res === false || $http_code != 200) {
        die(curl_error($c['resource']));
    }
    $res = json_decode($res, true);
    $res['code'] && die($res['message']);
    $ret[$c['uid']][$c['month']] = $res['data'];
}
curl_multi_close($mch);

var_dump($ret);

查询效率对比

1、数据表机构:

CREATE TABLE `consume_201612` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `uid` int(10) unsigned NOT NULL,
  `product` enum('UID','SAE','SC2','SCE','SEM','SCS','SLS') NOT NULL DEFAULT 'SAE',
  `access_key` varchar(255) NOT NULL,
  `service_code` varchar(255) NOT NULL,
  `amount` int(10) unsigned NOT NULL,
  `remark` varchar(255) NOT NULL,
  `data` text NOT NULL,
  `times` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`),
  KEY `uid_pa` (`uid`,`product`,`access_key`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2、数据量:总共分12张表,每张表的数量在 500万~1100万 之间,全量数据在1亿以上

3、MySQL异步查询结果:

$ mysql.server restart
$ php p_async.php
Query times: 60
Run time: 2.4453s
(CPU)User time: 0.087966s
(CPU)System time: 0.117625s

4、cURL并发请求查询结果

$ mysql.server restart
$ php p_curl.php
Query times: 60
Run time: 2.173035s
(CPU)User time: 0.40652s
(CPU)System time: 0.869902s

5、普通串行查询结果

$ mysql.server restart
$ php p_sync.php
Query times: 60
Run time: 20.485623s
(CPU)User time: 0.083185s
(CPU)System time: 0.036566s
Memory usage: 304.72kb

并发执行时,我们可以在 MySQL 服务器看到所有的正在执行的SQL:

总结

1、在并发查询下,查询效率提高了近10倍
2、使用 MySQL 异步查询,因为需要给所有查询都创建一个新的连接,而 MySQL 服务端会为每个连接创建一个单独的线程进行处理,如果创建的线程数过多,会给系统造成负担,请谨慎使用
3、使用 cURL 并发请求后端接口时,CPU负载明显上升,所以并发请求后端接口,一定程度上会增加后端压力,这和前端大流量下的高并发原理是一样的
4、使用 cURL 并发请求,还需要考虑一个网络延时的问题,网络延时越小,查询效率提升越明显。如果你是想代替类方法或函数调用,在条件允许的情况,建议直接连接服务器本机即127.0.0.1
5、在并发请求下,因为需要一次性接收全部返回结果,所以会占用更多的内存资源

需要说明的是,在实际应用中 cURL 的并发请求,一般不只单用于数据查询,而是为了完成更多的后台业务逻辑,
所以,在服务器负载能力允许的情况下,推荐使用 cURL 并行转发的形式,提升前端响应速度!

最后说一下 Swoole,

Swoole 是 PHP 的异步、并行、高性能网络通信引擎,使用纯C语言编写,提供了PHP语言的异步多线程服务器,异步TCP/UDP网络客户端,异步MySQL,异步Redis,数据库连接池,AsyncTask,消息队列,毫秒定时器,异步文件读写,异步DNS查询!
其中的异步MySQL,其原理是通过 MYSQLI_ASYNC 模式查询,然后获取 mysql 连接的 socket,加入到 epoll 时间循环中,当数据库返回结果时会回调指定函数。
这个过程是完全异步非阻塞的,不浪费CPU,具体实现方式这里不再详细介绍!

PHP也支持多线程:cURL并发请求

PHP cURL所有函数列表:
https://secure.php.net/manual/zh/ref.curl.php

以下是PHP中cURL多线程相关函数:

  • curl_multi_add_handle — 向curl批处理会话中添加单独的curl句柄
  • curl_multi_close — 关闭一组cURL句柄
  • curl_multi_exec — 运行当前 cURL 句柄的子连接
  • curl_multi_getcontent — 如果设置了CURLOPT_RETURNTRANSFER,则返回获取的输出的文本流
  • curl_multi_info_read — 获取当前解析的cURL的相关传输信息
  • curl_multi_init — 返回一个新cURL批处理句柄
  • curl_multi_remove_handle — 移除curl批处理句柄资源中的某个句柄资源
  • curl_multi_select — 等待所有cURL批处理中的活动连接
  • curl_multi_setopt — 为 cURL 并行处理设置一个选项
  • curl_multi_strerror — Return string describing error code

一般来说,想到要用这些函数时,目的显然应该是要同时请求多个URL,而不是一个一个依次请求,否则不如自己循环去调curl_exec好了。

步骤总结如下:

1、调用 curl_multi_init,初始化一个批处理handle
2、循环调用 curl_multi_add_handle,往1中的批处理handle 添加curl_init来的子handle
3、持续调用 curl_multi_exec,直到所有子handle执行完毕。
4、根据需要循环调用 curl_multi_getcontent 获取结果
5、调用 curl_multi_remove_handle,并为每个字handle调用curl_close
6、调用 curl_multi_close

以下是一个通过并发请求抓取网页的demo

$urls = array(
    'https://979137.com/',
    'http://www.sina.com.cn/',
    'http://www.sohu.com/',
    'http://www.163.com/'
);

//1、初始化一个批处理handle
$mh = curl_multi_init();

//2、往批处理handle 添加curl_init来的子handle
foreach ($urls as $i => $url) {
    $conn[$i] = curl_init($url);
    curl_setopt($conn[$i], CURLOPT_HEADER, 0);   
    curl_setopt($conn[$i], CURLOPT_CONNECTTIMEOUT, 60);
    curl_setopt($conn[$i], CURLOPT_RETURNTRANSFER, true);
    curl_multi_add_handle($mh, $conn[$i]);
}

//3、并发执行,直到全部结束。
do {
    curl_multi_exec($mh, $active);
} while ($active);

//4、获取结果
foreach ($urls as $i => $url) {
    $data = curl_multi_getcontent($conn[$i]);
    echo ($data);
}

//5、移除子handle,并close子handle
foreach ($urls as $i => $url) {
    curl_multi_remove_handle($mh,$conn[$i]);
    curl_close($conn[$i]);
}

//6、关闭批处理handle
curl_multi_close($mh);

PHP,CURL和你的安全!

如果最近你在美国看电视,你会经常看到一个这样的广告:

一个和蔼友善的家伙说“我希望我的电脑被病毒感染”,
“我希望所有我家的照片都被人删除,找不回来。”

“我希望我的笔记本运转的声音听起来像打雷。”

当然,没有一个正常人希望遇到这样的痛苦,但如果你不对自己的电脑采取保护措施,结果就是让黑客得逞。
你需要理解,这就像在你家里,车或钱袋子,你不能让它们都敞着口放在外面,你不能认为陌生路人都是可信的。
大部分的陌生人并不像你想象的那样友好。

如果没有人告诉你应该怎么做,你很容易犯错误。置之不理是愚蠢的,幸好你读了这篇文章。我要首先假设你不是那么愚蠢的人。

不应该做的事情

下面是一个列表,解释了什么不该做,以及为什么。

这是外表美味可口巧克力,里面却藏着恶魔。它的意思是

去 www.webhek.com 网站,取回页面内容,运行这些内容,不论是什么内容。

如果是像下面的这些内容到无所谓:

Hello World

但是,如果你不那么幸运,这个网站被人动过手脚,它的内容被替换成:

Evil ruuLzzzzorz!!!

这句代码会删除你的电脑上的所有东西。

这样会稍微安全一些,因为这句代码的做法是读取远程页面的内容,然后打印它们。即使有人在内容里插入了恶意的PHP代码,这些代码也没有机会被执行。
但是,黑客仍然可以在内容里注入恶意的 JavaScript,你会发现你的页面上突然间被植入了无数的弹出式广告窗口页面。
这会让你的网站的浏览者非常恼怒。这里面有很多的学问,但上面这些是最大的问题。

应该如何做

PHP里面有一个非常强大的函数库,它们的目的就是让你安全的从远程网站上取回内容。
这些函数被称作CURL。现在,你不要被CURL官方页面上大量的东西吓阻,它实际上非常的简单。

下面是一个简单的替换上面read_file()命令的做法:

<?php
$curl_handle=curl_init();
curl_setopt($curl_handle,CURLOPT_URL,'http://www.webhek.com');
curl_exec($curl_handle);
curl_close($curl_handle);

就是这样,这才是你应该做的,最后一句 curl_close() 不是必要的。

小心,你仍然有被远程网站上的恶意 JavaScript 和 cookie 盗取者袭击的风险。
防范这些攻击需要牵涉到更多的内容。如果你想做这些,我建议你使用PHP正则表达式函数里的 preg_replace()。

假设我们确实要用CURL来做一些事情。假设 www.webhek.com 这个网站不是那么稳定。它有时候会没有响应,一个页面需要30秒才能拉取成功。对于这种情况,我们的办法是:

<?php
$curl_handle=curl_init();
curl_setopt($curl_handle,CURLOPT_URL,'http://www.webhek.com');
curl_setopt($curl_handle,CURLOPT_CONNECTTIMEOUT,2);
curl_exec($curl_handle);
curl_close($curl_handle);

这种写法是说,2秒钟内如果不能抓取完数据就做超时处理。
是的,也许你更愿意设定为1秒就算超时,因为它妨碍你的页面的速度。(注意,不要设置为0,这是告诉curl没有超时限制。)

但是,如果是什么东西都没有取回了,你想显示一个提示信息,这该怎么办?哈哈,简单!

<?php
$curl_handle=curl_init();
curl_setopt($curl_handle,CURLOPT_URL,'http://www.webhek.com');
curl_setopt($curl_handle,CURLOPT_CONNECTTIMEOUT,2);
curl_setopt($curl_handle,CURLOPT_RETURNTRANSFER,1);
$buffer = curl_exec($curl_handle);
curl_close($curl_handle);

if (empty($buffer)) {
    print "抱歉,webhek.com 这个网站又无响应了。<p>";
} else {
    print $buffer;
}

微信公众号:程序员到架构师

最新文章

Return Top