本文总结自慕课网的一个教程:PHP消息队列实现及应用;这个教程非常适合新手刚接触消息队列,我觉得蛮不错。
1. 关于消息队列
1.1 基础模式
一个基础的消息队列应该是这样的:
1.2 应用场景
消息队列的应用场景大概可以是以下几种:
- 冗余;
- 解耦(比如业务系统和队列处理系统一方奔溃不会影响另一方);
- 流量削峰(抢购,秒杀等场景);
- 异步通信;
- 拓展性;
- 排序保证(比如做成单进程单线程单进单出);
1.3 队列介质
实现消息队列的方式有很多,这里介绍三种模式的实现:
Mysql:可靠性高、易实现,速度慢
Redis:速度快,单条大消息包时效率低
消息系统:专业性强,可靠,学习成本高(RabbitMQ)
1.4 消息处理触发机制
死循环方式读取:易实现,故障时无法及时恢复(适合比如秒杀系统)
定时任务:压分均分,有处理上限(要控制好进程,防止上一个任务还没完成就开始了下一个)
守护进程:类似于PHP-FPM和PHP-CG,需要shell基础
2. 案例
2.1 解耦案例:处理订单系统和配送系统(Mysql实现)
2.1.1 案例架构
订单系统和配送系统是解耦的,通过MySQL队列表做队列:
2.1.2 案例流程
程序的大概流程:
2.1.3 开发流程
1)创建一个示例MySQL队列表:
CREATE TABLE `order_queue`(
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT `id号`,
`order_id` int(11) NOT NULL,
`mobile` varchar(20) NOT NULL COMMENT `用户的手机号`,
`address` varchar(100) NOT NULL COMMENT `用户的地址`,
`created_at` datetime NOT NULL DEFAULT `0000-00-00 00:00:00` COMMENT `订单创建时间`,
`updated_at` datetime NOT NULL DEFAULT `0000-00-00 00:00:00` COMMENT `处理完成时间`,
`status` tinyint(2) NOT NULL COMMENT `当前状态, 0未处理, 1已处理, 2处理中`,
PRIMARY KEY(`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
2)接受用户订单order.php:
<?php
include '../include/db.php'; //引入一个DB类用来操作数据库
if (!empty($_GET['mobile'])) {
// 订单处理流程
// ......
// 过滤从用户获取的数据
$order_id = rand (10000, 99999); // 示例生成订单
// 要插入的数据
$insert_data = array(
'order_id' => $order_id,
'mobile' => $_GET['mobile'],
'created_time' => date('Y-m-d H:i:s', time()),
'status' => 0
);
// 插入数据
$db = DB::getIntance();
$res = $db -> insert('order_queue', $insert_data);
if ($res) {
echo $insert_data['order_id']."保存成功";
} else {
echo '保存失败';
}
}
3)配送处理goods.php
<?php
// 配送系统处理队列中的订单并进行标记的一个文件
include '../include/db.php'; //引入一个DB类用来操作数据库
$db = DB::getIntance();
// 1. 先把要处理的记录更新为等待处理; 这一步是为了实现一个锁的机制,防止其它程序操作数据冲突
$waiting = array('status' => 0);
$lock = array('status' => 2);
$res_lock = $db->update('order_queue', $lock, $waiting, 2);
// 2. 选择出刚刚更新的这些数据, 然后进行配送系统的处理
if ($res_lock) {
$res = $db->selectAll('order_queue', $lock);
// 然后由配送系统进行处理
// ......
// 3. 把这些处理过的程序更新为已完成
$success = array(
'status' => 1,
'update_time' => date('Y-m-d H:i:s'),
);
$res_last = $db->update('order_queue', $success, $lock);
if ($res_last) {
echo 'success:'.$res_last;
} else {
echo 'Fail:'.$res_last;
}
} else {
echo 'All Finished!';
}
4)定时脚本good.sh
#!/bin/bash
date "+%G-%m-%d %H:$M:S"
cd /home/path/to/queue_demo/
php goods.php
5)设置corntab定时任务
设定为每一分钟执行goods.sh一次并记录到log.log中:*/1 * * * * /home/path/to/queue_demo/goods.sh >> /home/path/to/queue_demo/log.log 2>&1
创建log文件:touch /home/path/to/queue_demo/log.log
6)运行,测试
调用order.php接收用户的订单信息;查看mysql表中是否插入数据;
定时任务已执行,查看数据status是否改变;
进行测试时,实时查看log文件:tail -f log.log
2.2 流量削峰案例:通过 Redis 的 List 类型实现秒杀
2.2.1 了解 Redis 的 list 类型数据
Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。一个列表最多可以包含 4294967295 个元素 (每个列表超过40亿个元素)。
常用命令:
- LPUSH/LPUSHX:将值插入到(/存在的)列表头部
- RPUSH/RPUSHX:将值插入到(/存在的)列表尾部
- LPOP:移出并获取列表的第一个元素
- RPOP:移出并获取列表的最后一个元素
- LTRIM:保留指定区间内的元素
- LLEN:获取列表长度
- LSET:通过索引设置列表元素的值
- LINDEX:通过索引获取列表中的元素
- LRANGE:获取列表指定范围的元素
2.2.2 案例架构
2.2.3 代码设计
- 秒杀程序把请求写入Redis。(Uid,time_stamp)
- 检查Redis已存放数据的长度,超出上限直接丢弃。(比如秒杀限制为100个,超过100个的数据直接丢弃返回秒杀已结束)
- 死循环处理存入Redis的数据库并入库。
2.2.4 开发流程
1)创建一个示例MySQL秒杀表:
CREATE TABLE `redis_queue`(
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`uid` int(11) NOT NULL DEFAULT `0`,
`time_stamp` varchar(24) NOT NULL,
PRIMARY KEY(`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
2)接收用户请求的user.php
<?php
// 加载redis组件
$redis = new Redis();
$redis -> connect('127.0.0.1', 6379);
$redis_name = 'miaosha';
// 接收用户的id
$uid = $_GET['uid'];
// 获取一下redis里面已有的数量
$num = $redis->lLen($redis_name);
// 如果当天人数少于10的时候,则加入这个队列
if ($num < 10) {
$redis->rPush($redis_name, $uid.'%'.microtime());
echo $uid.'秒杀成功';
} else{
// 如果当天人数已经达到了10个人,则返回秒杀已完成
echo '秒杀已结束';
}
$redis->close();
3)处理队列的入库程序
<?php
include '../include/db.php';
// 加载redis组件
$redis = new Redis();
$redis -> connect('127.0.0.1', 6379);
$redis_name = 'miaosha';
$db= DB::getIntance();
// 死循环
while (1) {
// 从队列最左取出一个值来
$user = $redis->lPop($redis_name);
// 然后判断这个值是否存在
if (!$user || $user=='nil') {
sleep(2);
continue;
}
// 切割出时间
$user_arr = explode('%', $user);
$insert_data = array(
'uid' => $user_arr[0],
'time_stamp' => $user_arr[1],
);
// 保存到数据库中
$res = $db->insert('redis_queue', $insert_data);
// 数据库插入失败的时候的回滚机制
if (!$res) {
$redis->rPush($redis_name, $user);
}
sleep(2);
}
//释放redis
$redis -> close();
3. 其它消息系统
3.1 RabbitMQ
3.1.1 关于RabbitMQ
官网:RabbitMQ
文档:RabbitMQ Documentation
3.1.2 RabbitMQ架构和原理
RabbitMQ完整的实现了AMQP、集群简化、持久化、跨平台。
3.1.3 RabbitMQ使用
- 安装RabbitMQ(rabbitmq-server、php-amqplib)
- 生产者向消息通道发送消息
- 消费者处理消息
0 条评论
来做第一个留言的人吧!