竹磬网-邵珠庆の日记 生命只有一次,你可以用它来做些更多伟大的事情–Make the world a little better and easier


163月/160

PHP高级编程之消息队列

发布在 邵珠庆

1. 什么是消息队列

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式

2. 为什么使用消息队列

消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读出。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

3. 什么场合使用消息队列

你首先需要弄清楚,消息队列与远程过程调用的区别,在很多读者咨询我的时候,我发现他们需要的是RPC(远程过程调用),而不是消息队列。

消息队列有同步或异步实现方式,通常我们采用异步方式使用消息队列,远程过程调用多采用同步方式。

MQ与RPC有什么不同? MQ通常传递无规则协议,这个协议由用户定义并且实现存储转发;而RPC通常是专用协议,调用过程返回结果。

4. 什么时候使用消息队列

同步需求,远程过程调用(PRC)更适合你。

异步需求,消息队列更适合你。

目前很多消息队列软件同时支持RPC功能,很多RPC系统也能异步调用。

消息队列用来实现下列需求
  1. 存储转发
  2. 分布式事务
  3. 发布订阅
  4. 基于内容的路由
  5. 点对点连接

5. 谁负责处理消息队列

通常的做法,如果小的项目团队可以有一个人实现,包括消息的推送,接收处理。如果大型团队,通常是定义好消息协议,然后各自开发各自的部分,例如一个团队负责写推送协议部分,另一个团队负责写接收与处理部分。

那么为什么我们不讲消息队列框架化呢?

框架化有几个好处:
  1. 开发者不用学习消息队列接口
  2. 开发者不需要关心消息推送与接收
  3. 开发者通过统一的API推送消息
  4. 开发者的重点是实现业务逻辑功能

6. 怎么实现消息队列框架

下面是作者开发的一个SOA框架,该框架提供了三种接口,分别是SOAP,RESTful,AMQP(RabbitMQ),理解了该框架思想,你很容易进一步扩展,例如增加XML-RPC, ZeroMQ等等支持。

https://github.com/netkiller/SOA

本文只讲消息队列框架部分。

6.1. 守护进程

消息队列框架是本地应用程序(命令行程序),我们为了让他在后台运行,需要实现守护进程。

每个实例处理一组队列,实例化需要提供三个参数,$queueName = '队列名', $exchangeName = '交换名', $routeKey = '路由'

$daemon = new \framework\RabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email');
			

守护进程需要使用root用户运行,运行后会切换到普通用户,同时创建进程ID文件,以便进程停止的时候使用。

守护进程核心代码https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php

6.2. 消息队列协议

消息协议是一个数组,将数组序列化或者转为JSON推送到消息队列服务器,这里使用json格式的协议。

$msg = array(
	'Namespace'=>'namespace',
	"Class"=>"Email",
	"Method"=>"smtp",
	"Param" => array(
		$mail, $subject, $message, null
	)
);			
			

序列化后的协议

{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}			
			

使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好。

6.3. 消息队列处理

消息队列处理核心代码

https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php

所以消息的处理在下面一段代码中进行

$this->queue->consume(function($envelope, $queue) {

	$speed = microtime(true);
	
	$msg = $envelope->getBody();
	$result = $this->loader($msg);
	$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答

	//$this->logging->info(''.$msg.' '.$result)
	$this->logging->debug('Protocol: '.$msg.' ');
	$this->logging->debug('Result: '. $result.' ');
	$this->logging->debug('Time: '. (microtime(true) - $speed) .'');
});
			

public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果。

Time 可以输出程序运行所花费的时间,对于后期优化十分有用。

提示

loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列。

6.4. 测试

测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php

73月/160

PHP的CURL方法curl_setopt()函数案例介绍(抓取网页,POST数据)

发布在 邵珠庆

     通过curl_setopt()函数可以方便快捷的抓取网页(采集很方便大笑),curl_setopt 是php的一个扩展库

     使用条件:需要在php.ini 中配置开启。(PHP 4 >= 4.0.2)
       //取消下面的注释
       extension=php_curl.dll

      在Linux下面,需要重新编译PHP了,编译时,你需要打开编译参数——在configure命令上加上“–with-curl” 参数。

1、 一个抓取网页的简单案例:

[php][/php] view plain copy
  1. // 创建一个新cURL资源  
  2. $ch = curl_init();  
  3.   
  4. // 设置URL和相应的选项  
  5. curl_setopt($ch, CURLOPT_URL, "http://www.baidu.com/");  
  6. curl_setopt($ch, CURLOPT_HEADER, false);  
  7.   
  8. // 抓取URL并把它传递给浏览器  
  9. curl_exec($ch);  
  10.   
  11. //关闭cURL资源,并且释放系统资源  
  12. curl_close($ch);  

2、POST数据案例:

[php][/php] view plain copy
  1. // 创建一个新cURL资源  
  2. $ch = curl_init();  
  3. $data = 'phone='. urlencode($phone);  
  4. // 设置URL和相应的选项  
  5. curl_setopt($ch, CURLOPT_URL, "http://www.post.com/");  
  6. curl_setopt($ch, CURLOPT_POST, 1);  
  7. curl_setopt($ch, CURLOPT_POSTFIELDS, $data);  
  8. // 抓取URL并把它传递给浏览器  
  9. curl_exec($ch);  
  10.   
  11. //关闭cURL资源,并且释放系统资源  
  12. curl_close($ch);  

3、关于SSL和Cookie

关于SSL也就是HTTPS协议,你只需要把CURLOPT_URL连接中的http://变成https://就可以了。当然,还有一个参数叫CURLOPT_SSL_VERIFYHOST可以设置为验证站点。
关于Cookie,你需要了解下面三个参数:
CURLOPT_COOKIE,在当面的会话中设置一个cookie
CURLOPT_COOKIEJAR,当会话结束的时候保存一个Cookie
CURLOPT_COOKIEFILE,Cookie的文件。

PS:新浪微博登陆API部分截取(部分我增加了点注释,全当参数翻译下。哈哈) 有兴趣的自己研究,自己挪为己用。嘿嘿

[php][/php] view plain copy
  1. /** 
  2.      * Make an HTTP request 
  3.      * 
  4.      * @return string API results 
  5.      * @ignore 
  6.      */  
  7.     function http($url, $method, $postfields = NULL, $headers = array()) {  
  8.         $this->http_info = array();  
  9.         $ci = curl_init();  
  10.         /* Curl settings */  
  11.         curl_setopt($ci, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);//让cURL自己判断使用哪个版本  
  12.         curl_setopt($ci, CURLOPT_USERAGENT, $this->useragent);//在HTTP请求中包含一个"User-Agent: "头的字符串。  
  13.         curl_setopt($ci, CURLOPT_CONNECTTIMEOUT, $this->connecttimeout);//在发起连接前等待的时间,如果设置为0,则无限等待  
  14.         curl_setopt($ci, CURLOPT_TIMEOUT, $this->timeout);//设置cURL允许执行的最长秒数  
  15.         curl_setopt($ci, CURLOPT_RETURNTRANSFER, TRUE);//返回原生的(Raw)输出  
  16.         curl_setopt($ci, CURLOPT_ENCODING, "");//HTTP请求头中"Accept-Encoding: "的值。支持的编码有"identity","deflate"和"gzip"。如果为空字符串"",请求头会发送所有支持的编码类型。  
  17.         curl_setopt($ci, CURLOPT_SSL_VERIFYPEER, $this->ssl_verifypeer);//禁用后cURL将终止从服务端进行验证  
  18.         curl_setopt($ci, CURLOPT_HEADERFUNCTION, array($this, 'getHeader'));//第一个是cURL的资源句柄,第二个是输出的header数据  
  19.         curl_setopt($ci, CURLOPT_HEADER, FALSE);//启用时会将头文件的信息作为数据流输出  
  20.   
  21.         switch ($method) {  
  22.             case 'POST':  
  23.                 curl_setopt($ci, CURLOPT_POST, TRUE);  
  24.                 if (!empty($postfields)) {  
  25.                     curl_setopt($ci, CURLOPT_POSTFIELDS, $postfields);  
  26.                     $this->postdata = $postfields;  
  27.                 }  
  28.                 break;  
  29.             case 'DELETE':  
  30.                 curl_setopt($ci, CURLOPT_CUSTOMREQUEST, 'DELETE');  
  31.                 if (!empty($postfields)) {  
  32.                     $url = "{$url}?{$postfields}";  
  33.                 }  
  34.         }  
  35.   
  36.         if ( isset($this->access_token) && $this->access_token )  
  37.             $headers[] = "Authorization: OAuth2 ".$this->access_token;  
  38.   
  39.         $headers[] = "API-RemoteIP: " . $_SERVER['REMOTE_ADDR'];  
  40.         curl_setopt($ci, CURLOPT_URL, $url );  
  41.         curl_setopt($ci, CURLOPT_HTTPHEADER, $headers );  
  42.         curl_setopt($ci, CURLINFO_HEADER_OUT, TRUE );  
  43.   
  44.         $response = curl_exec($ci);  
  45.         $this->http_code = curl_getinfo($ci, CURLINFO_HTTP_CODE);  
  46.         $this->http_info = array_merge($this->http_info, curl_getinfo($ci));  
  47.         $this->url = $url;  
  48.   
  49.         if ($this->debug) {  
  50.             echo "=====post data======\r\n";  
  51.             var_dump($postfields);  
  52.   
  53.             echo '=====info====='."\r\n";  
  54.             print_r( curl_getinfo($ci) );  
  55.   
  56.             echo '=====$response====='."\r\n";  
  57.             print_r( $response );  
  58.         }  
  59.         curl_close ($ci);  
  60.         return $response;  
  61.     }  

更详细的参数说明参考:http://cn2.php.net/curl_setopt