谷粒商城-高级-65 -商城业务-消息队列-Springboot 整合
一、RMQ相关配置
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
说明:
使用RabbitMQ
1、引入amqp场景;RabbitAutoConfiguration就会自动生效
2、给容器中自动配置了
RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
@ConfigurationProperties(prefix="spring.rabbitmq")
public class RabbitProperties
3、给配置文件中配置 spring.rabbitmq 信息
4、@EnableRabbit (开启RMQ注解)
2、启动类开启RMQ注解
gulimall-order/xxx/order/GulimallOrderApplication.java
@EnableRabbit // 开启消息队列
// 添加注册发现功能
@EnableDiscoveryClient
@MapperScan("com.atguigu.gulimall.order.dao")
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
3、 配置文件增加RMQ属性
gulimall-order/src/main/resources/application.properties
# RabbitMQ配置
spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
二、RMQ使用
1、单元测试
gulimall-order/xxx/order/GulimallOrderApplicationTests.java
package com.atguigu.gulimall.order;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.Date;
import java.util.HashMap;
import java.util.UUID;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin; // 创建交换机名,创建队列,创建绑定
@Autowired
private RabbitTemplate rabbitTemplate; // 发送消息相关
/**
* 1、如何创建Exchange、Queue、Binding
* 1)、使用AmqpAdmin进行创建
* 2、如何收发消息
*/
@Test
public void createExchange() {
Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java-exchange");
}
@Test
public void testCreateQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","hello-java-queue");
}
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:","hello-java-binding");
}
@Test
public void create() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","order.delay.queue");
}
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange","hello2.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送完成:{}",reasonEntity);
}
}
三、RMQ队列监听
监听消息
使用 @RabbitListener
:必须有 @EnableRabbit
@RabbitListener
:类+方法上(监听那些队列即可)@RabbitHandler
:标在方法上(重载区分不同的消息)
RabbitListener
`com/atguigu/gulimall/order/service/impl/OrderItemServiceImpl.java`
```
/**
* 监听队列
* 参数可以写类型
* 1、Message message:原生消息详细信息。头+体
* queues:声明需要监听的所有队列
* channel:当前传输数据的通道
*
* Queue:可以很多人来监听。只要收到消息,队列删除消息,而且只能有一个收到消息(分布式场景)
* 场景:
* 1)、订单服务启动多个:同一个消息,只能有一个客户端收到
*/
@RabbitListener(queues = {"hello-java-queue"})
public void revieveMessage(Message message,
OrderReturnReasonEntity content) {
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===内容:" + content);
}
```
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)