rabbitmq延时重试队列

   日期:2020-06-01     浏览:235    评论:0    
核心提示:如果只是网络抖动 出现异常那么直接进入死信队列 那么是不合理的这就可以使用延时重试队列原理:1.发送到业务队里 如果正常收到 正常运行2.如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列3.如果重试次数大于3 那么进入死信队列1.业务队列@Configurationpublic class BusinessConfig { public s.大数据

如果只是网络抖动 出现异常那么直接进入死信队列 那么是不合理的

这就可以使用延时重试队列

原理:

1.发送到业务队里 如果正常收到 正常运行

2.如果处理失败 重试  并投入延时队列 如果超过延时时间 重新投入业务队列

3.如果重试次数大于3 那么进入死信队列

1.业务队列

@Configuration
public class BusinessConfig {

    
    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";

    
    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";

    
    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";

    
    @Bean
    public DirectExchange yewu1Exchange() {
        DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
        return directExchange;
    }

    
    @Bean
    public Queue yewu1DemoQueue() {
        return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
    }

    
    @Bean
    public Binding yewu1DemoBinding() {
        return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
            .with(YEWU1_DEMO_ROUTINGKEY);
    }
}

2.延时队列

@Configuration
public class RetryConfig {

    
    public static final String RETRY_LETTER_QUEUE_KEY = "x-dead-letter-exchange";

    
    public static final String RETRY_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    
    public static final String RETRY_MESSAGE_TTL = "x-message-ttl";

    
    public final static String YEWU1_RETRY_EXCHANGE_NAME = "yewu1_retry_exchange";

    
    public final static String YEWU1_DEMO_RETRY_QUEUE_NAME = "yewu1_demo_retry_queue";

    
    public final static String YEWU1_DEMO_RETRY_ROUTING_KEY = "yewu1_demo_retry_key";

    
    @Bean
    public DirectExchange yewu1RetryExchange() {
        DirectExchange directExchange = new DirectExchange(YEWU1_RETRY_EXCHANGE_NAME, true, false);
        return directExchange;
    }

    
    @Bean
    public Queue yewu1DemoRetryQueue() {
        Map<String, Object> args = new ConcurrentHashMap<>(3);
        // 将消息重新投递到业务交换机Exchange中
        args.put(RETRY_LETTER_QUEUE_KEY, BusinessConfig.YEWU1_EXCHANGE);
        args.put(RETRY_LETTER_ROUTING_KEY, BusinessConfig.YEWU1_DEMO_ROUTINGKEY);
        // 消息在队列中延迟3s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
        args.put(RETRY_MESSAGE_TTL, 3 * 1000);
        return new Queue(YEWU1_DEMO_RETRY_QUEUE_NAME, true, false, false, args);
    }

    
    @Bean
    public Binding retryDirectBinding() {
        return BindingBuilder.bind(yewu1DemoRetryQueue()).to(yewu1RetryExchange())
            .with(YEWU1_DEMO_RETRY_ROUTING_KEY);
    }

}

3.死信队列

@Configuration
public class DeadConfig {

    
    public final static String FAIL_QUEUE_NAME = "fail_queue";

    
    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";

    
    public final static String FAIL_ROUTING_KEY = "fail_routing";

    
    @Bean
    public Queue deadQueue() {
        return new Queue(FAIL_QUEUE_NAME, true, false, false);
    }

    
    @Bean
    public DirectExchange deadExchange() {
        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
        return directExchange;
    }

    
    @Bean
    public Binding failBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
    }

}

4.生产者

@RestController
@RequestMapping("/TestRabbit")
public class ProducerDemo {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //@RequestMapping("/sendDirect")
    String sendDirect(@RequestBody String message) throws Exception {
        System.out.println("开始生产");
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
            message, data);
        System.out.println("结束生产");
        System.out.println("发送id:" + data);
        return "OK,sendDirect:" + message;
    }
}

5.消费者

public enum RabbitEnum {
 
    
    ACCEPT,
 
    
    RETRY,
 
    
    REJECT
@Component
public class ConsumerDemo {

    private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

    @Resource
    private RabbitTemplate rabbitTemplate;

    // @RabbitListener(queues = "yewu1_demo_queue")
    protected void consumer(Message message, Channel channel) throws Exception {
        RabbitEnum ackSign = RabbitEnum.RETRY;
        System.out.println(message.getMessageProperties().getCorrelationId());
        try {
            // 可以加入重复消费判断
            int i = 1 / 0;

        } catch (Exception e) {
            ackSign = RabbitEnum.RETRY;
            throw e;
        } finally {
            // 通过finally块来保证Ack/Nack会且只会执行一次
            if (ackSign == RabbitEnum.ACCEPT) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (ackSign == RabbitEnum.RETRY) {
                String correlationData =
                    (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
                System.out.println(message.getMessageProperties().getCorrelationId());
                long retryCount = getRetryCount(message.getMessageProperties());
                if (retryCount >= 3) {
                    // 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
                    try {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
                            message, new CorrelationData(correlationData));
                        logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody()));
                    } catch (Exception e1) {
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                        logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
                    }
                } else {
                    try {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        // 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
                        rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
                            RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
                            new CorrelationData(correlationData));
                        logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第"
                            + (retryCount + 1) + "次重试");
                    } catch (Exception e1) {
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                        logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
                    }
                }
            }
        }
    }

  

    
    public long getRetryCount(MessageProperties messageProperties) {
        Long retryCount = 0L;
        if (null != messageProperties) {
            List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
            if (deaths != null && deaths.size() > 0) {
                Map<String, Object> death = (Map<String, Object>)deaths.get(0);
                retryCount = (Long)death.get("count");
            }
        }
        return retryCount;
    }
}

 

 

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服