PHP使用workerman/redis-queue实现轻量级的消息队列

前言

在做PHP开发过程中,时常会遇到需要处理一些耗时任务的情况:例如发送大量邮件,大量数据库查询,导出数据量大的Excel等。在处理耗时、资源密集型任务时,不少PHP开发人员倾向于设置max_execution_time来延长php执行时间。除了该方法外,也可以通过消息队列来处理。

workerman/redis-queue是一个基于Redis的消息队列,支持消息延迟处理。

项目地址:https://github.com/walkor/redis-queue

安装redis

安装redis-queue

composer require workerman/redis-queue

worker.php

新建worker.php,内容如下,用于接收和处理消息:

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // 订阅
    $client->subscribe('send-email', function($data){
        echo "send email\n";
        var_export($data);
    });
    // 订阅
    $client->subscribe('send-sms', function($data){
        echo "send sms\n";
        var_export($data);
    });
    // 消费失败触发的回调(可选)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "队列 " . $package['queue'] . " 消费失败\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
};

Worker::runAll();

send.php

新建send.php,内容如下,用于发送消息:

<?php
function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'send-email';
$data= ['123456@qq.com', 'hello,world!'];
redis_queue_send($redis, $queue, $data);

测试

启动worker.php监听消息队列

在命令行中执行以下命令:

php worker.php start

php worker.php start -d

访问send.php发送消息

在命令行中执行以下命令:

php send.php

或直接在浏览器里访问http://[你的地址]/send.php

效果

当执行send.php发送消息后,worker.php将会立即接收并处理该消息,截图:

发表评论

邮箱地址不会被公开。 必填项已用*标注