消息队列类1

<?php

namespace redisqueue;

// 使用原生Redis来设置消息队列
class RedisQueue
{
// Redis 客户端实例
private $redis;
// Redis 服务器地址和端口
private $host;
private $port;
// Redis 密码
private $password;

// 构造方法,初始化 Redis 连接
public function __construct($host, $port = 6379, $password = '')
{
$this->redis = new \Redis();
$this->host = $host;
$this->port = $port;
$this->password = $password;
$this->redis->connect($this->host, $this->port);
$this->redis->auth($this->password);
}

/**
* 生产者(生产一个数据)
* @param string $queue_name 消息队列的名称
* @param array $message 要处理的消息,数组格式
* @return bool 生产成功返回true,否则返回false
*/
public function producerOne($queue_name, $message)
{
try {
// 使用 lPush 将消息推送到队列的头部
$this->redis->lPush($queue_name, json_encode($message, JSON_UNESCAPED_UNICODE));
return true;
} catch (\Exception $e) {
// 如果捕获到异常,我们记录错误并返回 false
// 在实际应用中,您可能希望记录更详细的错误信息
// error_log("生产消息时发生错误: " . $e->getMessage());
return false;
}
}

/**
* 生产者(生产多个数据)
* @param string $queue_name 消息队列的名称
* @param array $messages 要处理的消息数组(二维数组)
* @return bool 生产成功返回true,否则返回false
*/
public function producerMore($queue_name, $messages)
{
try {
foreach ($messages as $message) {
// 使用 lPush 将消息推送到队列的头部
$this->redis->lPush($queue_name, json_encode($message, JSON_UNESCAPED_UNICODE));
}
// 如果没有抛出异常,我们假设所有消息都被成功发送
return true;
} catch (\Exception $e) {
// 如果捕获到异常,我们记录错误并返回 false
// 在实际应用中,您可能希望记录更详细的错误信息
// error_log("生产消息时发生错误: " . $e->getMessage());
return false;
}
}

/**
* 查看队列内容
* @param string $queue_name 消息队列的名称
* @return array 消息数组
*/
public function viewQueue($queue_name)
{
$data = $this->redis->lRange($queue_name, 0, -1);
$new = array();
foreach ($data as $item) {
// 把json字符串转为数组
$new[] = json_decode($item, true);
}
return $new;
}

/**
* 消费者(在实际应用中,需要将消费者作为一个独立的守护进程或后台任务来运行)
* @param string $queue_name 消息队列的名称
* @return
*/
public function consumer($queue_name)
{
while (true) {
$message = $this->redis->blPop($queue_name, 0);
if ($message) {
// 解包消息数组(队列名称和消息内容)
list($queue, $msg) = $message;
echo "Received message: {$msg}\n";
// 要处理的消息内容,使用睡眠来模拟处理过程
sleep(2);
}
}
}
}


// 使用条件:1.阿里云安全组规则开放6379端口、2.宝塔->安全->开放6379端口、3.宝塔的
// 消费原理:先进先出,后进后出
// 使用步骤:
// 1.生产者
// 1.1 生产单个信息
// $redisQueue = new RedisQueue('xxx.xxx.xxx.xxx', 6379, '密码');
// $bool = $redisQueue->producerOne('test_queue', ['id' => 1, 'name' => 'test', 'sex' => 1]);

// 1.2 生产多个信息
// $redisQueue = new RedisQueue('xxx.xxx.xxx.xxx', 6379, '密码');
// $messages = [
// ['id' => 1, 'name' => '产业化', 'sex' => 1],
// ['id' => 2, 'name' => '产业化2', 'sex' => 0]
// ];
// $bool = $redisQueue->producerMore('test_queue', $messages);


// 2.消费者
// $redisQueue = new RedisQueue('xxx.xxx.xxx.xxx', 6379, '密码');
// 启动消费者(通常会在另一个脚本或进程中运行),由于 consumer 方法是一个无限循环,下面的代码将不会停止执行,除非您手动中断它。
// 在实际应用中,您可能会将消费者作为一个独立的守护进程或后台任务来运行。
// $redisQueue->consumer('test_queue');


// 3.查看队列内容
// $redisQueue = new RedisQueue('xxx.xxx.xxx.xxx', 6379, '密码');
// $queueContents = $redisQueue->viewQueue('test_queue');
// foreach ($queueContents as $message) {
// dump($message);
// }


相关推荐

  • 生成图片

    from PIL import Image, ImageColor, ImageDraw, ImageFont, ImageFilterdef create_image_with_text(size, color, text, font_path, font_size, text_color, shadow_color, output_path): """ Create a new image of specified size and color with centered text that has a border and shadow. :param size: A tuple con

  • 获取指定目录下的所有图片信息

    1 获取指定目录下的所有图片信息// 获取指定目录下的所有图片信息 public function getImagesInfo($directory) { $images = []; // 创建递归目录迭代器 $iterator = new \RecursiveIteratorIterator( new \RecursiveDirectoryIterator($directory, \RecursiveDirectoryIterator::SKIP_DOTS), \RecursiveIteratorIterator::LEAVES_ONLY ); // 遍历目录中的每个文件 foreach (

  • Thinkphp各版本的PHP要求

    ThinkPHP 8.0:运行环境要求PHP8.0+,兼容PHP8.3ThinkPHP 6.1:运行环境要求PHP7.2+,兼容PHP8.1ThinkPHP 6.0:运行环境要求PHP7.2+,兼容PHP8.1ThinkPHP 5.1:运行环境要求PHP5.6+,兼容PHP8.0ThinkPHP 5.0:运行环境要求PHP5.4+,兼容PHP7.3