RabbitMq - Java客户端基础【简单案例 +Work模型】

news/2024/7/18 17:02:56 标签: java, java-rabbitmq, rabbitmq

目录

 1、前置知识

1.1、AMQP怎么理解

1.2、Spring AMQP是什么

1.3、为什么要了解Spring-AMQP?

2、使用Spring-AMQP实现一个发消息案例

3、Work模型

问题:

优化:

小结:Work模型的使用:


 1、前置知识

1.1、AMQP怎么理解

  • 全称:Advance Message Queuing Protocol
  • 用途:用于在应用程序之间传递业务消息的开放标准;
  • 该协议与语言、平台无关,更符合微服务中独立性的要求

1.2、Spring AMQP是什么

  • Spring AMQP是基于AMQP协议定义的一套API规范,提供了模版来发送和接收消息;
  • 包含两部分,其中spring-amqp是基础抽象(接口),spring-rabbit是底层的默认实现(实现)

        也就是说,你在使用中,只需要调用Spring AMQP提供的接口就可以了,而Spring AMQP的底层是使用AMQP的(可以理解为AMQP是一种思想,Spring AMQP是它的实现);

1.3、为什么要了解Spring-AMQP?

        RabbitMq给java提供的原生的一些使用方法,过于的复杂不便于日常开发的使用,而Spring-AMQP对RabbitMQ进行了一层封装,让我们在使用中更加的简洁了~


2、使用Spring-AMQP实现一个发消息案例

案例 - 黑马课程中的一个简单的微服务~

需求如下:

  • 利用控制台创建队列demo1.queue
  • 在publisher服务中,利用SpringAMQP直接向demo1.queue发送消息
  • 在consumer服务中,利用SpringAMQP编写消费者,监听demo1.queue队列
  • 这个案例先不考虑交换机~

准备一个项目,我的项目目录如下:

步骤一:在控制台中新建一个demo1.queue队列

注:不会创建的可以看我的上一篇文章~

步骤二:父工程中引入AMQP的依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

注:父工程中引入了,他的两个子工程都可以使用~ 

步骤三:配置RabbitMQ服务端信息

需要在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ,配置如下:

spring:
  rabbitmq:
    host: env-base
    port: 5672
    virtual-host: /
    username: root
    password: 1111

具体信息,需要根据你自己电脑的信息修改哦~

步骤四:发送消息

        正常是在业务中发送消息,我们这里为了便于快速看到结果,就在publisher下的单元测试中模拟发送(效果一样)~

@SpringBootTest
public class AMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testdemo1(){
        //队列名
        String queueName = "demo1.queue";
        //消息
        String message = "are you ok ?";
        //发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

步骤五:接收消息

        接收消息是需要长期监控着队列,因此我们写在consumer服务下业务代码中即可~

@Component
public class ListenAMQP {
    
    @RabbitListener(queues = "demo1.queue")
    public void listenDemo1(String msg){
        System.out.println("接收到消息:" +msg);
    }
}

步骤六:启动项目(consumer) - 启动后再执行publisher下的单元测试

观察到的结果如下:


3、Work模型

我们使用案例来理解work模型~

模拟WorkQueue,实现一个队列绑定多个消费者

需求如下:

  • 在RabbitMQ的控制台创建一个队列 - work.queue
  • 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  • 在consumer服务中定义两个消息监听者,都监听work.queue队列
  • 消费者1秒处理50条消息,消费者2每秒处理5条消息

创建队列就不说了,我们来看代码:

publisher中的代码(同上,使用单元测试来模拟):

    @Test
    public void testwork() throws InterruptedException {
        //队列名
        String queueName = "work.queue";

        for (int i = 1; i <= 50 ;i++){
            rabbitTemplate.convertAndSend(queueName, "work消息 --- " + i);
            Thread.sleep(20);
        }
    }

consumer代码:

    @RabbitListener(queues = "work.queue")
    public void listenwork1(String msg){
        System.out.println("接收到消息:" + msg);
    }
    @RabbitListener(queues = "work.queue")
    public void listenwork2(String msg){
        System.err.println("接收到消息:" + msg);
    }

结果:

上面打印时,我将两个消费者使用不同的颜色打印:

我们会看到消息是一人一个,很均匀有序的划分~

我们把每个消费者处理的速度控制一下:

再观察结果:

虽然顺序不太一样了,还好像依然是一人一个的划分,即使其中一个消费者比另一个消费的更快~

问题:

        消费者的消息推送有一定限制,在默认情况下,RabbitMQ会将消息依次投递给绑定在队列上的每一个消费者,但是这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积

优化:

        修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息,一条处理完了,才会收到下一条~

运行结果,能者多劳:

小结:Work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度(解决消息堆积问题)
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一题,实现能者多劳

http://www.niftyadmin.cn/n/5544381.html

相关文章

【leetcode周赛记录——405】

405周赛记录 #1.leetcode100339_找出加密后的字符串2.leetcode100328_生成不含相邻零的二进制字符串3.leetcode100359_统计X和Y频数相等的子矩阵数量4.leetcode100350_最小代价构造字符串 刷了一段时间算法了&#xff0c;打打周赛看看什么水平了 #1.leetcode100339_找出加密后的…

拿客户电脑,用豆包IDE逆天改命完成需求紧急开发!被公司奖励500!

故事背景 原文链接&#xff1a;拿客户电脑&#xff0c;用豆包IDE逆天改命完成需求紧急开发&#xff01;被公司奖励500&#xff01; 前几天&#xff0c;业务拉了一个大客户&#xff0c;客户需要先看我们做的样本项目&#xff08;类似于官网首页&#xff09;&#xff0c;然后才…

手把手教学!金融行业Google UAC 3.0 广告到底该怎么投?

浅谈 Google UAC 通用应用广告系列&#xff08;即Universal App Campaigns&#xff0c;简称UAC&#xff09;是Google于2015年推出的一种旨在为移动应用&#xff08;Mobile App&#xff09;寻找更多新用户的广告形式。UAC可覆盖Google用户量最多的各款产品和服务&#xff0c;包…

3.js - 模板渲染 - side: THREE.FrontSide、THREE.BackSide - 狗都不学

// ts-nocheck// 引入three.js import * as THREE from three// 导入轨道控制器 import { OrbitControls } from three/examples/jsm/controls/OrbitControls// 导入lil.gui import { GUI } from three/examples/jsm/libs/lil-gui.module.min.js// 导入tween import * as TWEEN…

百度amis vue3引入减少包提体积

不用amis的包 , 在index.html引入代码如下,要在main上面 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><link rel"icon" href"/favicon.ico" media"print"/><meta http…

vue3中antd表格展示后端返回的嵌套集合数据

后端返回数据格式&#xff1a; 实现方法&#xff1a; 走了很多坑&#xff0c;记录下

软考高级第四版备考--第12天(管理质量)Management Quanlity

定义&#xff1a;把组织的质量政策用于项目&#xff0c;并将质量管理计划转化为可执行的质量活动的过程 作用&#xff1a; 提高实现质量目标的可能性&#xff1b;识别无效过程和导致质量低劣的原因&#xff1b;使用控制质量过程的数据和结果向干系人展示项目的总体质量状态 …

vulhub-activemq(CVE-2016-3088)

在 Apache ActiveMQ 5.12.x~5.13.x 版本中&#xff0c;默认关闭了 fileserver 这个应用&#xff08;不过&#xff0c;可以在conf/jetty.xml 中开启&#xff09;&#xff1b;在 5.14.0 版本后&#xff0c;彻底删除了 fileserver 应用。【所以在渗透测试过程中要确定好 ActiveMQ …