package com.dobbinsoft.fw.support.mq; import com.alibaba.fastjson.JSONObject; import com.dobbinsoft.fw.support.properties.FwReliableMQProperties; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; import java.io.Serializable; /** * ClassName: AbstractReliableMessageHandler * Description: 一个抽象的RabbitMQ,用于自动创建Consumer * * @author: e-weichaozheng * @date: 2021-05-26 */ public abstract class AbstractReliableMessageHandler implements ReliableMessageHandler, InitializingBean { @Autowired private ConnectionFactory connectionFactory; @Autowired private FwReliableMQProperties fwReliableMQProperties; private Connection connection; private static final Logger logger = LoggerFactory.getLogger(AbstractReliableMessageHandler.class); @Override public void afterPropertiesSet() throws Exception { if ("rabbit".equals(fwReliableMQProperties.getEnable())) { this.connection = connectionFactory.newConnection(); Channel channel = this.connection.createChannel(); channel.exchangeDeclare(fwReliableMQProperties.getRabbitExchangeName(), "topic", true, false, null); channel.queueDeclare(getTopic(), true, false, false, null); // 绑定队列和交换机 channel.queueBind(getTopic(), fwReliableMQProperties.getRabbitExchangeName(), "topic"); // 创建消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 反序列化为 T T t = JSONObject.parseObject(body, AbstractReliableMessageHandler.this.getMessageClass()); AbstractReliableMessageHandler.this.handle(t, body); } }; // 开始获取消息 channel.basicConsume(getTopic(), fwReliableMQProperties.getRabbitAutoAck(), consumer); logger.info("[Rabbit MQ] 消费者开始监听消息 Topic=" + getTopic()); } else { throw new RuntimeException("不支持队列类型:" + fwReliableMQProperties.getEnable()); } } }