tp5.1基于Database的think-queue的基本用法

1 步骤1:创建测试表

首先,在数据库中创建如下的两张表,tp51_test表用于保存要处理的数据,tp51_jobs表用于保存信息队列信息,tp51_jobs表各个字段名不能更改,也不能删除,tp51_test表则可以修改字段名或删除字段名。

// 测试表
CREATE TABLE `tp51_test` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID序号',
`order_no` varchar(50) NOT NULL COMMENT '订单号',
`msg` varchar(255) NOT NULL COMMENT '消息内容',
`status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '状态 1未执行,2执行',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='测试表';

// 消息队列表
CREATE TABLE `tp51_jobs` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID序号',
`queue` varchar(255) NOT NULL COMMENT '队列名称',
`payload` longtext NOT NULL COMMENT '存储任务数据,可以是任何格式,如JSON或序列化的字符串',
`attempts` tinyint(3) unsigned NOT NULL COMMENT '尝试次数,如果任务失败,则此值会增加',
`reserved` tinyint(3) unsigned NOT NULL COMMENT '是否已被消费(即正在处理),0 表示未被消费,非0表示已被消费',
`reserved_at` int(10) unsigned DEFAULT NULL COMMENT '记录任务被取出的时间戳',
`available_at` int(10) unsigned NOT NULL COMMENT '记录任务可被执行的时间戳',
`created_at` int(10) unsigned NOT NULL COMMENT '记录任务创建的时间戳',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


2 步骤2:安装消息队列

在tp5.1项目的根目录下,执行如下的命令,用于安装think-queue信息队列插件,注意版本

tp50框架: composer require topthink/think-queue:2.*
tp51框架: composer require topthink/think-queue:2.*
tp6*框架:composer require topthink/think-queue:3.*


3 步骤3:配置信息队列

安装队列插件之后,在tp5.1的【根目录/config】目录下,会自动生成queue.php队列文件,接着,需要对该文件,进行如下的参数配置:

return [
// 'connector' => 'Sync'
'connector' => 'Database', // 数据库驱动
'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default', // 默认的队列名称
'table' => 'jobs', // 存储消息的表名,不带前缀
'dsn' => [],

// 'connector' => 'Redis', // Redis 驱动
// 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
// 'default' => 'default', // 默认的队列名称
// 'host' => '127.0.0.1', // redis 主机ip
// 'port' => 6379, // redis 端口
// 'password' => '', // redis 密码
// 'select' => 0, // 使用哪一个 db,默认为 db0
// 'timeout' => 0, // redis连接的超时时间
// 'persistent' => false, // 是否是长连接
];


4 步骤4:创建且推送消息

接着,在控制器方法中,进行消息的创建与推送

<?php
namespace app\index\controller;
use think\Controller;
use think\Queue;
use think\Db;

class Index extends Controller
{
// 消息的创建与推送
public function queueTest()
{
// 消息的创建
$data = [
'order_no' => rand(100000,999999)
];
$this->add($data['order_no']);

// 消息的推送
$cname = 'app\job\Job1';
$data = json_encode($data);
$qname = 'firstQueue';
// 将该任务推送到消息队列,等待对应的消费者去执行
// 参数1:表示当前任务将由哪个类来负责处理,当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
// 参数2:当前任务所需的业务数据 . 不能为resource类型,其他类型最终将转化为json形式的字符串
// 参数3:当前任务归属的队列名称,如果为新队列,会自动创建
$res = Queue::push($cname, $data, $qname);

// 结果处理:database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false
if ($res !== false) {
echo '已成功将任务推送到信息队列中';
} else {
echo '推送失败';
}
}

// 把业务数据添加到数据库
public function add($orderNo)
{
Db::name('test')->insert([
'order_no' => $orderNo,
'msg' => $orderNo,
'status' => 1,
'create_time' => date('Y-m-d H:i:s'),
'update_time' => date('Y-m-d H:i:s')
]);
}
}


5 步骤5:创建队列处理类

接着,定义消息队列处理类

<?php

namespace app\job;
use think\Controller;
use think\Db;
use think\queue\Job;

class Job1 extends Controller
{
// fire方法是消息队列默认调用的方法
// 参数1:当前的任务对象
// 参数2:发布任务时自定义的数据
public function fire(Job $job, $data)
{
// 步骤1:先把数据解析为数组
$data = json_decode($data, true);

// 步骤2:判断当前任务是否已经被执行,因为有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone = $this->checkDoJobStatus($data);
if (!$isJobStillNeedToBeDone) {
// 如果该任务已经被执行, 则删除任务
$job->delete();
}

// 步骤3:如果该任务没有被执行,则执行任务,即进行实际的业务处理
$isJobDone = $this->doJob($data);
if ($isJobDone) {
// 如果任务执行成功, 记得删除任务
$job->delete();
} else {
// 通过这个方法可以检查这个任务已经重试了几次了,例如任务轮询4次后删除
if ($job->attempts() > 3) {
// 第1种处理方式:重新发布这个任务,例如如下表示该任务延迟10秒后再执行
// $job->release(10);
// 第2种处理方式:删除任务
$job->delete();
}
}
}

// 有些消息在到达消费者时,可能已经不再需要执行了
// 参数:发布任务时自定义的数据
// 返回:任务执行的结果,已成功执行返回true, 否则返回false
public function checkDoJobStatus($data)
{
$status = Db::name('test')->where('order_no', $data['order_no']) -> value('status');
if ($status == 2) {
return true;
} else {
return false;
}
}


// 根据消息中的数据进行实际的业务处理
// 参数:发布任务时自定义的数据
// 返回:任务执行的结果
public function doJob($data)
{
$res = Db::name('test')->where('order_no', $data['order_no'])->update(['status' => 2]);
if (!empty($res)) {
return true;
} else {
return false;
}
}
}


6 步骤6:发布任务

在浏览器中访问:http://域名/index/index/queueTest,用于创建消息,并将信息推送到队列中。


此时,查看数据库的那两个表,看看数据:




7 步骤7:处理任务

处理任务(即消息的消费与删除),需要在项目的根目录下,执行如下的命令:

# firstQueue为自定义的队列名称
php think queue:work --queue firstQueue


8 步骤8:查看数据表




9 步骤9:结束

到此,消息队列演示完成,另外消息队列的常用命令如下:

(1) 默认队列

php think queue:listen                 // 监听 开发环境用(常用)
php think queue:work // 只执行一次
nohup php think queue:work --daemon & // 守护进程,多次执行(常用)


(2) 指定队列

php think queue:listen --queue 队列名称                  // 监听,开发环境用,一次性执行指定队列的所有任务(常用)
php think queue:work --queue 队列名称 // 只执行一次,执行指定队列的第一个任务
nohup php think queue:work --queue 队列名称 --daemon & // 守护进程,多次执行, 一次性执行指定队列的所有任务(常用)


消息队列的参考资料1

消息队列的参考资料2

消息队列的参考资料3


步骤10:常驻进程即supervisor安装(可选)

安装Supervisor作用:可以使think-queue队列,在后台以守护进程的方式一直运行

步骤1:安装Supervisor
yum install -y supervisor

步骤2:查看Supervisor安装位置,supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序
whereis supervisord
whereis echo_supervisord_conf
whereis supervisorctl

步骤3:修改配置文件,【vi /etc/supervisord.conf】,修改内容如下:
[include]
files = supervisord.d/*.ini

步骤4:自定义待守护进程配置文件,在 /etc/supervisord.d 下创建以.ini 后缀的文件,编辑如下配置:
[program:pdfQueue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /www/wwwroot/templet.tik-im.com/think queue:work --daemon --queue pdfQueue
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwroot/templet.tik-im.com/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwroot/templet.tik-im.com/worker.log ;输出日志文件

具体配置如下:
[program:pdfQueue]
command=php /www/wwwroot/web_management/think queue:work --daemon --queue pushQueue
autostart=true
autorestart=true
user=root
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=20
stderr_logfile=/www/wwwroot/web_management/runtime/worker_err.log
stdout_logfile=/www/wwwroot/web_management/runtime/worker.log


步骤5:Surpervisor的启动
# supervisord二进制启动
supervisord -c /etc/supervisord.conf
# 检查进程
ps aux | grep supervisord
# 更新Supervisor
supervisorctl update
# 关闭Supervisor
supervisorctl shutdown


相关推荐

  • 获取指定目录下的所有图片信息

    1 获取指定目录下的所有图片信息// 获取指定目录下的所有图片信息 public function getImagesInfo($directory) { $images = []; // 创建递归目录迭代器 $iterator = new \RecursiveIteratorIterator( new \RecursiveDirectoryIterator($directory, \RecursiveDirectoryIterator::SKIP_DOTS), \RecursiveIteratorIterator::LEAVES_ONLY ); // 遍历目录中的每个文件 foreach (

  • Thinkphp各版本的PHP要求

    ThinkPHP 8.0:运行环境要求PHP8.0+,兼容PHP8.3ThinkPHP 6.1:运行环境要求PHP7.2+,兼容PHP8.1ThinkPHP 6.0:运行环境要求PHP7.2+,兼容PHP8.1ThinkPHP 5.1:运行环境要求PHP5.6+,兼容PHP8.0ThinkPHP 5.0:运行环境要求PHP5.4+,兼容PHP7.3

  • Thinkphp5.1路径常量

    1 配置文件位置根目录/config/template.php2 配置文件内容&lt;?php// +----------------------------------------------------------------------// | ThinkPHP [ WE CAN DO IT JUST THINK ]// +----------------------------------------------------------------------// | Copyright (c) 2006~2018 http://thinkphp.cn All rights reser

  • Thinkphp5.0路径常量

    1 配置文件位置根目录/application/模块名/config.php2 配置文件内容&lt;?php//配置文件return [ // 后台视图输出字符串内容替换 'view_replace_str' =&gt; [ '__PUBLIC__' =&gt; '/', '__STATIC__' =&gt; '/static', '__CONSOLE__' =&gt; '/static/console', '__CONSOLE_CSS__' =&gt; '/static/console/css', '__CONSOLE_IMAGES__' =&gt; '/static/console/ima

  • wp站点防止别人进行DDOS攻击

    1 简介wp站点防止别人进行DDOS攻击。2 配置位置位置:根目录/wp-config.php3 配置内容在【根目录/wp-config.php】文件的开头添加如下代码:if(strpos($_SERVER['REQUEST_URI'], 'xmlrpc.php') !== false){ $protocol = $_SERVER['SERVER_PROTOCOL'] ?? ''; if(!in_array($protocol, ['HTTP/1.1', 'HTTP/2', 'HTTP/2.0', 'HTTP/3'], true)){ $protocol = 'HTTP/1.0'; } hea

  • 只读属性

    1 只读属性简介只读属性的声明方式类似于普通属性,但需要使用 readonly 关键字。2 只读属性例子class Point { public readonly float $x; public readonly float $y; public function __construct(float $x, float $y) { $this-&gt;x = $x; $this-&gt;y = $y; }}$point = new Point(3.5, 2.8);echo $point-&gt;x; // 输出: 3.5echo $point-&gt;y; // 输出: 2.8// 下面的尝