Skip to content

components mq

tolizhan edited this page May 1, 2025 · 15 revisions

消息队列

使用说明

  1. 支持消息的增删改重试机制及键区分
  2. 每条消息可同步复制多个队列
  3. 访问 框架路径/?c=of_base_com_mq 进入控制台监控或热重启操作
  4. 异常的消息会转储到磁盘并在自动恢复, 异常恢复前控制台可查看
  5. 通过配置 _of.com.mq 来设置连接池
  6. 通过配置 _of.com.mq.连接池.queues 来设置队列
  7. 通过配置 _of.com.mq.连接池.adapter 来使用不同消息队列
  8. 通过配置 _of.com.mq.连接池.moveMq 无缝切换消息队列
  9. 连接池配置结构(_of.com.mq) : {
        消息队列池名 : {
            "adapter" : 适配器,
            "params"  : 调度参数 {
            },
            "moveMq"  :o迁移队列时设置, 原队列池P改名oP, 配置新队列池P.moveMq = oP, 直到oP全部消费后移除
            "bindDb"  : 事务数据库连接池名, 跟其提交或回滚,
            "queues"  : 生产消息时会同时发给队列, 队列的磁盘配置文件路径
        }, ...
    }
    

    连接池配置结构(_of.com.mq.连接池.queues) : {     队列名 : {         "mode"   : 队列模式, null=生产及消费,false=仅生产,true=仅消费,         "check"  : 自动重载消息队列触发函数,             true=(默认)校验"消费回调"加载的文件变动,             false=仅校验队列配置文件变动,             字符串=以"@"开头的正则忽略路径(软链接使用真实路径), 如: "@/ctrl/@i"         "memory" : 单个并发分配内存积累过高后自动重置, 单位M, 默认50, 0=不限制         "keys"   : 消费消息时回调结构 {             消息键 : 不存在的键将被抛弃 {                 "lots" : 批量消费, 1=单条消费, >1=一次消费最多数量(消息变成一维数组)                 "cNum" : 并发数量, 数字=每台分布系统相同, 数组=指定不同分布系统并发数 {                     数字键(分布系统自动排序从0开始小于等于键时生效) : 生效的并发数, 例如:                     3(首先0-3四台分布系统生效) : 1(这四台分布系统启动一个并发),                     6(接着4-6三台分布系统生效) : 0(这三台分布系统不启动),                     7(最后仅7这台分布系统生效) : 2(这台分布系统启动两并发, 之后都不启动)                 }                 "call" : 回调结构, 返回值决定消息的处理方式                     返回 true=成功删除消息                     返回 false=失败稍后重试                     返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试                     返回 其它=抛出错误稍后重试                     返回 接口响应结构="code" < 400为true, 反之为false             }, ...         }     }, ... } 或当多个队列池指定到同个文件时, 可按如下方式配置 {     队列池 : {         队列名 : {             "mode"   : 队列模式, null=生产及消费,false=仅生产,true=仅消费,             "check"  : 自动重载消息队列触发函数,                 true=(默认)校验"消费回调"加载的文件变动,                 false=仅校验队列配置文件变动,                 字符串=以"@"开头的正则忽略路径(软链接使用真实路径), 如: "@/ctrl/@i"             "memory" : 单个并发分配内存积累过高后自动重置, 单位M, 默认50, 0=不限制             "keys"   : 消费消息时回调结构 {                 消息键 : 不存在的键将被抛弃 {                     "lots" : 批量消费, 1=单条消费, >1=一次消费最多数量(消息变成一维数组)                     "cNum" : 并发数量,                     "call" : 回调结构                         返回 true=成功删除消息                         返回 false=失败稍后重试                         返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试                         返回 其它=抛出错误稍后重试                         返回 接口响应结构="code" < 400为true, 反之为false                 }, ...             }         }, ...     } }

  10. 消费回调统一的回调参数与返回值
  11. 回调参数(_of.com.mq.连接池.queues.队列名.keys.消息键.call) : {
        "pool"  : 指定消息队列池,
        "queue" : 队列名称,
        "key"   : 消息键,
        "lots"  : 批量消费数量,
        "this"  : 当前并发信息 {
            "cMd5" : 回调唯一值
            "cCid" : 当前并发值
        }
        "count" : 调用计数, 首次为 1
            lots=1时 : 调用计数
            lots>1时 : [调用计数, ...]
        "msgId" : 消息ID列表
            lots=1时 : 消息ID
            lots>1时 : [消息ID, ...]
        "data"  :x消息数据, _fire 函数实现
            lots=1时 : 消息数据
            lots>1时 : {
                消息ID : 消息数据,
                ...
            }
        "msgs"  : 消息列表 {
            消息ID : 单条消息 {
                "msgId" : 消息ID
                "count" : 调用计数, 首次为 1
                "data"  : 消息数据,
                "uTime" : 更新时间戳
            }, ...
        }
    }
    返回值(_of.com.mq.连接池.queues.队列名.keys.消息键.call) :
        返回 true=成功删除消息
        返回 false=失败稍后重试
        返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试
        返回 其它=抛出错误稍后重试
        返回 接口响应结构="code" < 400为true, 反之为false
        注意 回调不能使用类似exit的方式结束回调, 会导致消费进程重启, 降低消费速度

队列方式

  1. redis 方式的消息队列
  2. 切换redis步骤
    
    1. 搭建redis分布式节点(不建议集群)
    2. 通过slotNum方法计算节点分槽(使消息可以分配均匀)
    3. 修改mq配置中mqSlot为计算的分槽结果
    4. 迁移消息队列需要配置 moveMq

    redis 配置结构(_of.com.mq) : {     消息队列池名 : {         //配置Redis时建议配置 maxmemory, 内存满时只会拒绝服务不会导致服务崩溃         "adapter" : "redis",         "params"  : 调度参数 {             "vHost"  : 虚拟主机, 不同主机下的同名队列互不冲突, 默认=""             "kvPool" : _of.com.kv 的redis连接池, 建议使用混合型持久化             "mqSlot" : 包含两项>0的一维数组, 如[[0, 1], [0, 1, 2]]                        其作用是将消息均衡分布到各节点, 每项对应一个分槽                        选择合适的数字或字符串让每个节点有一个分槽就是最优的方式                        第0项数组代表生产分槽, 总个数代表总分槽数;                        第1项数组代表消费分槽, 要覆盖生产分槽所有项, 当生产分槽调整时, 要包含调整前所有项, 待分槽自动调整完后, 改为生产分槽                        注意: 真实节点变动时(含域名)不能使用此项, 必须须使用"moveMq"迁移队列, 用vHost或db防重, 否则会丢消息     }, ... }

    /**   描述 : 计算分槽数字   参数 :        host : 测试地址, 格式 [host:port, ...]        pwd  : 测试密码   打印 :        输出num个数的分槽数字   作者 : Edgar.lee  / function slotNum($host, $pwd) {     of_base_com_kv::pool($pool = uniqid(), array(         'adapter' => 'redis',         'params'  => array(             'type' => 'distributed',             'host' => $host,             'auth' => $pwd,             'db'   => 0,         )     ));

        $i = 0;     $num = count(array_unique($host));     $redis = of_base_com_kv::link($pool);

        do {         $h = $redis->_target($i);         isset($list[$h]) || $list[$h] = $i;         ++$i;     } while (count($list) < $num);

        echo join(', ', $list); } slotNum(array('xxx:6379', ...), '****');

    /**   描述 : 获取redis消息队列运行信息   参数 :        params : {            "match"   :o正则匹配队列标识, 以@开头, 默认false不过滤            "mqSlot"  :o过滤队列分槽列表, 默认false=全部消息(消费分槽), true=升级中的(生产与消费分槽差集)            "total"   :o是否统计消息总数, 默认false不统计, true=统计            "overdue" :o是否统计可消费数, 默认false不统计, true=统计            "failNo"  :o是否统计失败总数, 默认false不统计, true=统计            "failed"  :o读取最大失败消息, 默认0不查询, >0为最大长度            "recent"  :o读取即将消费消息, 默认0不查询, >0为最大长度        }   返回 :        {            队列标识, 队列池.队列名.消息键 : {                "mqName"  : 消息名称, 虚拟机.队列名.消息键                "mqSlot"  : 队列槽列表 {                    虚拟机.队列名.消息键.槽编码 : {                        "total"   : 消息总数                        "overdue" : 可消费数                        "failNo"  : 失败总数                    }, ...                }                "total"   : 消息总数                "overdue" : 可消费数                "failNo"  : 失败总数                    消息ID : {}, ...                }                "recent"  : 消费列表 {                    消息ID : {}, ...                }                "msgList" : 消息属性汇总列表 {                    消息ID : 消息属性 {                        "msgId"      : 消息ID,                        "data"       : 消息数据,                        "syncCount"  :o失败次数,                        "syncLevel"  : 同步等级(更新消息时重置),                        "planTime"   :o计划时间戳(不存在时为删除),                        "updateTime" : 更新时间戳,                        "lockTime"   :o锁定时间戳(在此范围内不执行),                        "lockMark"   :o锁定标记(防止执行超时被其它消费)                    },                    ...                }            }        }   作者 : Edgar.lee  */ of_accy_com_mq_redis::getMqInfo($params = array());

  3. mysql 方式的消息队列
  4. mysql 配置结构(_of.com.mq) : {
        消息队列池名 : {
            "adapter" : "mysql",
            "params"  : 调度参数 {
                "vHost"  : 虚拟主机, 不同主机下的同名队列互不冲突, 默认=""
                "dbPool" : _of.db 的连接池
            }
        }, ...
    }
    

    CREATE TABLE `_of_com_mq` (     `mark` char(35) NOT NULL COMMENT '消息唯一ID(虚拟主机+队列名称+消息类型+消息ID)',     `vHost` char(50) NOT NULL COMMENT '虚拟主机',     `queue` char(50) NOT NULL COMMENT '队列名称',     `type` char(50) NOT NULL COMMENT '消息类型',     `msgId` char(100) NOT NULL COMMENT '消息ID',     `data` mediumtext NOT NULL COMMENT '队列数据',     `syncCount` int(11) UNSIGNED NOT NULL COMMENT '已同步次数',     `createTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '生成时间',     `updateTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '消费时间, 2001为删除',     `syncLevel` int(11) UNSIGNED NOT NULL COMMENT '同步等级, 数值越大优先级越低',     `lockTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '锁定时间, 每 syncLevel * 5 分钟重试',     `lockMark` char(32) NOT NULL COMMENT '锁定时生成的唯一ID',     PRIMARY KEY (`type`,`mark`) USING BTREE,     KEY `idx_consumer` (`lockTime`,`type`,`queue`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息队列表' PARTITION BY KEY (`type`) PARTITIONS 251;

    下载 of < 200228 mysql模式消息队列升级包: 两种升级方式选其一     1. 停机升级: 停止WEB服务 -> 执行升级前SQL -> 执行升级后SQL -> 升级框架 -> 开启WEB服务     2. 无缝升级: 执行升级前SQL -> 升级框架 -> 重载队列 -> 执行升级后SQL

of_base_com_mq::set($keys, $data = null, $pool = 'default', $bind = null) 设置消息队列

  • 事务操作 :
        keys : null=开启事务, true=提交事务, false=回滚事务
        data : 指定消息队列池
        pool : 指定数据库连接池
    生产消息, 单条模式 :
        keys : 字符串=指定消息类型, 数组=[消息类型, 消息ID, 延迟秒数]
        data : null=删除 [消息类型, 消息ID] 指定的信息, 其它=消息数据
        pool : 指定消息队列池
        bind : ""=绑定到内部事务, 字符串=绑定数据池同步事务
    生产消息, 批量模式 :
        keys : 批量消息, [{"keys" : 单条模式keys结构, "data" : 单条模式data结构}, ...]
        pool : 指定数据库连接池
#_of.com.mq 配置结构
array(
    //消息队列池
    'exchange' => array(
        //适配器
        'adapter' => 'mysql',
        //调度参数
        'params'  => array(
            'dbPool' => 'default'
        ),
        //绑定事务数据库
        'bindDb'  => 'default',
        //队列列表
        'queues'  => '/demo/queue/queue.php'
    )
)

#/demo/queue/queue.php 配置结构 return array(     //生产消息时会同时发给多队列     'queue1' => array(         //队列模式, null=生产及消费, false=仅生产, true=仅消费         'mode' => null,         //消费消息时回调结构         'keys' => array(             //不存在的键将被抛弃             'key' => array(                 'cNum' => 3,                 'call' => 'demo_index::mqTest'             ),             'key1' => array(                 'cNum' => 1,                 'call' => 'demo_index::mqTest'             )         )     ),     'queue2' => array(         //队列模式, null=生产及消费, false=仅生产, true=仅消费         'mode' => null,         //消费消息时回调结构         'keys' => array(             //不存在的键将被抛弃             'key' => array(                 'cNum' => 4,                 'call' => 'demo_index::mqTest'             )         )     ) );

#demo_index::mqTest 代码 /**   描述 : 演示消息队列  / public function mqTest($params = null) {     //触发消息队列     if ($params) {         file_put_contents(             ROOT_DIR . OF_DATA . '/mqTest.txt',              time() . print_r($params, true),             FILE_APPEND | LOCK_EX         );         return true;     //生产消息队列     } else {         if (of::config('_of.com.mq.exchange')) {             echo '异步并发消息回调将在此文件中写入数据: ',                 ROOT_DIR . OF_DATA . '/mqTest.txt';             L::sql(null);             //批量创建消息队列, queue1 与 queue2 将同时收到信息             of_base_com_mq::set(array(                 array('keys' => 'key', 'data' => array(1, 2, 3)),                 array('keys' => array('key'), 'data' => array(4, 5, 6)),                 array('keys' => array('key1', '延迟ID', 600), 'data' => '消息信息(可传数组)'),             ), 'exchange');             //因 queue2 没有 key1 键, 所以仅 queue1 会收到信息             of_base_com_mq::set(array('key1', '消息ID'), '消息信息(可传数组)', 'exchange');             L::sql(true);         } else {             echo '先取消/demo/config.php下_of.com.mq的注释';         }     } }

#web 访问 demo_index::mqTest 将会在 OF_DATA/mqTest.txt 输出 1518646689Array ( [pool] => exchange [queue] => queue1 [key] => key [data] => 消息信息 [this] => 当前并发信息 ( [cMd5] => 回调唯一值 [cCid] => 当前并发值 ) [msgId] => 消息ID [count] => 调用计数, 首次为 1 ) 1518646689Array ( [pool] => exchange [queue] => queue2 [key] => key [data] => 消息信息 [this] => 当前并发信息 ( [cMd5] => 回调唯一值 [cCid] => 当前并发值 ) [msgId] => 消息ID [count] => 调用计数, 首次为 1 )

二次开发

  1. 文件夹 /of/accy/com/mq 下存储着不同方式的对接文件
  2. 目前有已封装 mysql redis
  3. 可以通过配置文件 _of.com.mq 来使用不同方式, 支持多连接池
  4. 开发更多的存储方式, 要继承 of_base_com_mq 类并实现以下方法

abstract protected function _init($fire) 初始队列

  • firearray
    队列定位
    {
        "pool" : 消息的队列池
        "bind" : 绑定的数据库池
    }

abstract protected function _sets(&$msgs) 设置消息, 成功返回true, 失败返回false

  • msgsarray
    需要设置的消息集合
    [{
        "keys"  : 消息定位 [消息类型, 消息主键],
        "data"  : 消息数据, null=删除 keys 指定的信息, 其它=消息数据(包括数组)
        "pool"  : 指定消息队列池,
        "bind"  : ""=绑定到手动事务, 字符串=绑定数据池同步事务
        "queue" : 队列名称
    }, ...]

abstract protected function _fire(&$calll, $data) 触发消息队列, 根据回调响应值执行对应动作

  • callstring array
    符合回调结构
  • dataarray
    需要设置的消息集合, call的回调参数
    {
        "pool"  : 指定消息队列池,
        "queue" : 队列名称,
        "key"   : 消息键,
        "lots"  : 批量消费数量,
        "this"  : 当前并发信息 {
            "cMd5" : 回调唯一值
            "cCid" : 并发ID, 从1-n
        },
        "msgs"  : 消息列表 {
            消息ID : 单条消息 {
                "msgId" : 消息ID
                "count" : 调用计数, 首次为 1
                "data"  : 消息数据,
                "uTime" : 更新时间戳
            }, ...
        },
        ...
    }
    

    在_fire实现的方法中调用of_base_com_mq::callback来执行消息队列 $result = of_base_com_mq::callback(_fire的$call参数, _fire的$data参数 + array(     "msgs"  : 消息列表 {         消息ID : 单条消息 {             "msgId" : 消息ID             "count" : 调用计数, 首次为 1             "data"  : 消息数据,             "uTime" : 更新时间戳         }, ...     },     ... )); 同时 $result 的三个返回值, 要去实现     返回 true=成功删除消息     返回 false=失败稍后重试     返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试

返回结构: true=已匹配到消息, false=未匹配到消息

abstract protected function _quit(&$data) 触发消息队列意外退出时回调

  • dataarray
    回调参数结构
    {
        "pool"  : 指定消息队列池,
        "queue" : 队列名称,
        "key"   : 消息键,
        "this"  : 当前并发信息 {
            "cMd5" : 回调唯一值
            "cCid" : 当前并发值
        }
        "msgs"  : 消息数据列表
    }

abstract protected function _begin() 开启事务, 成功返回 true, 失败返回 false

abstract protected function _commit($type) 提交事务, 成功返回 true, 失败返回 false

  • typestring
    "before"=提交开始回调, "after"=提交结束回调

abstract protected function _rollBack($type) 事务回滚, 成功返回 true, 失败返回 false

  • typestring
    "before"=回滚开始回调, "after"=回滚结束回调
Clone this wiki locally