php rabbitmq的开发体验(三)【PHP教程】

!
也想出现在这里? 联系我们
信息

php rabbitmq的开发体验(三),第1张

概述一、前言在上一篇rabbitmq开发体验(二),我们正式的用我们php来 *** 作消息队列的生产和消费,并利用的rabbitmq的高级特性来进行ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。已经可以正常的使用,但消息消费异常问题罗列以下。1、自动ack机制会导致消息丢失的 一、前言

在上一篇rabbitmq开发体验(二),我们正式的用我们PHP来 *** 作消息队列的生产和消费,并利用的rabbitmq的高级特性来进行ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。已经可以正常的使用,但消息消费异常问题罗列以下。

1、自动ack机制会导致消息丢失的问题;

简要代码如下,设置消息自动ack,这种情况下,MQ只要确认消息发送成功,无须等待应答就会丢弃消息,
这会导致客户端还未处理完时,出异常或断电了,导致消息丢失的后果。解决方法就是把代码里的true,改成false,并在消息处理完后发ack响应。
注:自动ack还有个弊端,只要队列不空,RabbitMQ会源源不断的把消息推送给客户端,而不管客户端能否消费的完。

$this->channel->basic_consume(    $this->query_name,    \'\',     //customer_tag    false,  //no_local    true,  //no_ack 消息自动ack    false,   //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接    false,  //Nowait

2、自动ack机制会导致消息丢失的问题;

为了解决问题1,做了改进,简要代码如下:


$this->channel->basic_consume(    $this->query_name,    \'\',     //customer_tag    false,  //no_local    false,  //no_ack 关闭自动ack,手工发送ack    false,   //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接    false,  //Nowait

 

$msg->delivery_info[\'channel\']->basic_ack($msg->delivery_info[\'delivery_tag\']); //手动在成功消费后发送ack

先处理消息,完成后,再做ack响应,失败就不做ack响应,这样消息会储存在MQ的Unacked消息里,不会丢失,看起来没啥问题,
但是有一次,callback触发了一个BUG,导致所有消息都抛出异常,然后队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃的问题。
原因是如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会抛弃,直至客户端断开重连时,才变回ready;
如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态,Unacked消息多了,占用内存越来越大,就会异常了。
解决办法就是及时去ack消息了。

3、启用nack机制后,导致的死循环;

为了解决问题2,再调整一下代码,简要代码如下:

catch (Exception $e){        $this->writeLog(\'runtime/vm_exception.log\',$e->getMessage());        //发送nack信息应答当前消息处理异常 第三个参数是否重回队列 默认false不重回队列        $msg->delivery_info[\'channel\']->basic_nack($msg->delivery_info[\'delivery_tag\'],false,true);}

嗯,改成这模样总没问题了吧,正常就ack,不正常就nack,并等下一次重新消费。
果然,又出问题了,这回又是callback出异常了,但是故障现象是Ready的消息猛增,一直不见减少。
原因是出异常后,把消息塞回队列头部,下一步又消费这条会出异常的消息,又出错,塞回队列……
进入了死循环了,当然新的消息不会消费,导致堆积了……

我的解决方案:

 

$retry = $this->getRetryCount($msg);try {        $routingKey = $this->getorigRoutingKey($msg);        $subMessage = new SubMessage($msg, $routingKey , [              \'retry_count\' => $retry, // 重试次数        ]);        $this->subscribe($subMessage);} catch (\\Exception $ex) {                        $this->writeLog(\'runtime/vm_consume_Failed.log\', \'消费失败!\' . $ex->getMessage() . $msg->getbody());        if ($retry > 3) {               // 超过最大重试次数,消息无法处理               $publishFailed($msg);               return;        }        // 消息处理失败,稍后重试        $publishRetry($msg);}
    /**     * 获取消息重试次数     * @param AMQPMessage $msg     * @return int     */    protected function getRetryCount($msg)    {        $retry = 0;        if ($msg->has(\'application_headers\')) {            $headers = $msg->get(\'application_headers\')->getNativeData();            if (isset($headers[\'x-death\'][0][\'count\'])) {                $retry = $headers[\'x-death\'][0][\'count\'];            }        }        return (int)$retry;    }
        // 发起延时重试        $publishRetry = function ($msg) use ($queuename,$exchangeRetryname) {            /** @var AMQPtable $headers */            if ($msg->has(\'application_headers\')) {                $headers = $msg->get(\'application_headers\');            } else {                $headers = new AMQPtable();            }            $headers->set(\'x-orig-routing-key\', $this->getorigRoutingKey($msg));            $propertIEs = $msg->get_propertIEs();            $propertIEs[\'application_headers\'] = $headers;            $newMsg = new AMQPMessage($msg->getbody(), $propertIEs);            $this->channel->basic_publish(                $newMsg,                $exchangeRetryname,                $queuename            );            //发送ack信息应答当前消息处理完成            $msg->delivery_info[\'channel\']->basic_ack($msg->delivery_info[\'delivery_tag\']);        };
    /**     * 声明重试队列     */    private function declareRetryQueue()    {        $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPtable(array(            \'x-dead-letter-exchange\' => $this->exchange_name,            \'x-dead-letter-routing-key\' => $this->query_name,            \'x-message-ttl\'          => 3 * 1000,        )));        $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name);    }

 

总结

以上是内存溢出为你收集整理的php rabbitmq的开发体验(三)全部内容,希望文章能够帮你解决php rabbitmq的开发体验(三)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

© 版权声明
THE END
喜欢就支持一下吧
点赞90 分享
评论 抢沙发

请登录后发表评论

    请登录后查看评论内容