`

rabbitmq 的三种模式fanout topic direct

阅读更多
1.需要在rabbitmq 管理界面上,定义用户和 Virtual host
登录地址:http://localhost:15672/
用户名为:gjpztb;密码:gjpztb; Virtual host

实现代码在附件中

2.建立maven 项目:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd" >

    <!-- 公共部分 -->
    <!-- 创建连接类 连接安装好的 rabbitmq -->
    <bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="localhost" />
        <!-- username,访问RabbitMQ服务器的账户,默认是guest -->
        <property name="username" value="gjpztb" />
        <!-- username,访问RabbitMQ服务器的密码,默认是guest -->
        <property name="password" value="gjpztb" />
        <!-- host,RabbitMQ服务器地址,默认值"localhost" -->
        <property name="host" value="localhost" />
        <!-- port,RabbitMQ服务端口,默认值为5672 -->
        <property name="port" value="5672" />
        <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
        <property name="channelCacheSize" value="50" />
        <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
        <property name="cacheMode" value="CHANNEL" />
        <property name="virtualHost" value="gjpztb" />
        <!--如果使用发布 confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener" publisherConfirms参数必须为true -->
        <property name="publisherConfirms" value="true" />

    </bean>
    <!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
    <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
    username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,
    需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,
    而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性
    public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
    auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
    <!--定义消息队列-->
    <rabbit:queue name="ztb.topic.queue.1" id="ztb_queue_1" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="ztb.topic.queue.2" id="ztb_queue_2" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="ztb.topic.queue.3" id="ztb_queue_3" durable="true" auto-delete="false" exclusive="false" />

    <!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,
    即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,
    就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,
    如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:
    如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,
    这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,
    则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
    <!--绑定队列-->
    <!--符号#:匹配一个或者多个词cn.#  可以匹配cn.新闻或者cn.新闻.国内新闻-->
    <!--符号*:只能匹配一个词 cn.*  可以匹配cn.新闻或者us.新闻-->
    <!--主要如果key值与多个队列匹配时,多个队列都会受到信息-->
    <rabbit:topic-exchange id="ztb.topic" name="ztb.topic" durable="true" auto-delete="false" >
        <rabbit:bindings>
            <rabbit:binding queue="ztb.topic.queue.1" pattern="ztb.topic.*" ></rabbit:binding>
            <rabbit:binding queue="ztb.topic.queue.2" pattern="ztb.topic*" ></rabbit:binding>
            <rabbit:binding queue="ztb.topic.queue.3" pattern="ztb.phone.msg" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- 生产者部分 -->
    <!-- 发送消息的producer类,也就是生产者 -->
    <bean id="producterMq" class="com.ztb.pro.topic.ProducterMq">
        <property name="rabbitTemplate" ref="rabbitTemplate" />
    </bean>

    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
    <!--<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>-->
    <!-- 或者配置jackson -->
    <!--   -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />


    <bean id="confirmCallBackListener" class="com.ztb.pro.topic.ConfirmCallBackListener" />

    <bean id="returnCallBackListener" class="com.ztb.pro.topic.ReturnCallBackListener" />
    <!--创建消息队列模板-->
    <!-- mandatory必须设置true,return callback才生效 -->
    <rabbit:template  id="rabbitTemplate" exchange="ztb.topic"
                     connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"
                     mandatory="true"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"

    />

    <!-- 消费者部分 -->
    <!-- 自定义接口类 -->
    <!-- 消息确认机制:acknowledge  :manual收到确认-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="200"  >
        <rabbit:listener ref="topicListener" method="onMessage"
                         queues="ztb.topic.queue.1"  />
        <rabbit:listener ref="topicListener" method="onMessage"
                         queues="ztb.topic.queue.2"  />
        <rabbit:listener ref="topicListener" method="onMessage"
                         queues="ztb.topic.queue.3"  />
    </rabbit:listener-container>
    <!--收到确认-->
    <bean id="topicListener" class="com.ztb.pro.topic.ChannelAwareMessageListenerImp"/>


</beans>




3.建立类:

package com.ztb.pro.topic;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/**
* Author by gjp, Date on 2019/8/26.
*/
public class ChannelAwareMessageListenerImp implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        try {
            String body = new String(message.getBody(), "UTF-8");
            System.out.println("consumer--:"+message.getMessageProperties()+": body:"+body);
            //MessageProperties [headers={spring_listener_return_correlation=bef4e64a-76fa-4d7a-a3ba-95d9c2367959, __TypeId__=com.ztb.pro.message.CommonMessage}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ztb.fanout, receivedRoutingKey=ztb.faout.queue.11, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-ltHMYrVBPU4Ss89yNaUy2w, consumerQueue=ztb.faout.queue.1]: body:{"msg":"msg","source":"alert.queue.3","body":"body01566872705708"}

            try {
                //deliveryTag是消息传送的次数
                if (message.getMessageProperties().getDeliveryTag() >= 2) {

                }
            }catch (Exception ex){
                if (message.getMessageProperties().getRedelivered())
                {
                    System.out.println("消息已重复处理失败,拒绝再次接收...");
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
                }
                else
                {
                    System.out.println("消息即将再次返回队列处理...");
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
                }

            }
        }catch (Exception ex){
            ex.printStackTrace();
        }

       // Thread.sleep(1);

        System.out.println("已经确认信息");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}


package com.ztb.pro.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;

/**
* Author by gjp, Date on 2019/8/26.
*发布信息确认
* 只确认生产者消息发送成功
*/
public class ConfirmCallBackListener implements ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
    }
}




package com.ztb.pro.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
* Author by gjp, Date on 2019/8/26.
* 生产者
*/
public class ProducterMq {

    private RabbitTemplate rabbitTemplate;

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }


    public void send(String exchange, String routingKey, Object message) {
       // rabbitTemplate.convertAndSend(exchange, routingKey, message);
        rabbitTemplate.convertAndSend(routingKey,message);
    }
}





package com.ztb.pro.topic;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
* Author by gjp, Date on 2019/8/26.
* 生产者消息发送失败返回监听器
*/
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
    }
}





package com.ztb.pro.topic;

import com.ztb.pro.message.CommonMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Calendar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Author by gjp, Date on 2019/8/23.
* 测试类
*/
public class TestMainTopic {

    public static void main(String[] args) {
        ApplicationContext app = new ClassPathXmlApplicationContext("classpath:conf/spring-rabbitmq-topic.xml");
       final ProducterMq producterMq = (ProducterMq) app.getBean("producterMq");

        int threadNum =1;
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i <threadNum ; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 1; i++) {
                        CommonMessage message = new CommonMessage();
                        message.setSource("topic.queue.3");
                        String sendMsg = "body" + i + Calendar.getInstance().getTimeInMillis();
                        System.out.println("发送信息::" + sendMsg);
                        message.setBody(sendMsg);
                        message.setMsg("msg");

                        producterMq.send("ztb.topic", "ztb.topic.test", message);

                    }

                }
            });

        }

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
}





结果输出:

发送信息::body01566892764139
consumer--:MessageProperties [headers={spring_listener_return_correlation=43347283-a2f8-4a46-90f4-072cb7500100, __TypeId__=com.ztb.pro.message.CommonMessage}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ztb.topic, receivedRoutingKey=ztb.topic.test, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-zLSo4gdFpzrY-EhzGI5kmg, consumerQueue=ztb.topic.queue.1]: body:{"msg":"msg","source":"topic.queue.3","body":"body01566892764139"}
已经确认信息
confirm--:correlationData:null,ack:true,cause:null





分享到:
评论

相关推荐

    springboot2.1.4集成rabbitmq,三种模式:fanout direct topic的实现

    springboot集成rabbitmq,三种模式:fanout direct topic的实现,入门级别

    spring集成rabbitMq(基于direct、topic和fanout模式)

    spring集成rabbitMq(基于direct、topic和fanout模式),包括main方法,5种情景,一天总结运行

    基于spring集成rabbitMq(基于direct、topic和fanout模式)源码.rar

    基于spring集成rabbitMq(基于direct、topic和fanout模式)源码.rar

    rabbitmq常用三种模式的配置

    rabbitmq包含多种模式,主要是fanout,direct,topic,本代码主要针对这三种模式进行相关的配置,配置文件单独放置在不同的文件夹里,以便学习。

    springboot+RabbitMQ三种模式demo

    springboot+RabbitMQ的direct,topic,fanout三种模式demo

    rabbitmq三种exchange

    rabbitmq三种exchange方式:direct,fanout,topic发送和接收演示程序, 这是java版本,基于rabbitmq 3.1.0

    Go语言版本rabbitmq消息队列库:simple、worker、Fanout 模型、Direct 模型、Topic模型

    Go语言版本rabbitmq消息队列库:simple、worker、Fanout 模型、Direct 模型、Topic模型。 RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的...

    基于rabbitmq的topic 交换

    基于rabbitmq的topic 消息交换模式,弥补了direct exchange和fanout exchange的不足,增加了其灵活性。

    RabbitMQ工具类及测试类(完整版)

    RabbitMQClientUtil是MQ的测试工具类,他封装了fanout、direct、topic三种exchange模式,并包括发送数据和接收数据。 Test1、Test2是测试类 使用maven管理,在pom.xml文件中引入如下代码: &lt;!-- Rabbitmq工具包...

    RabbitMQ集成springboot

    系统集成六大模块,对于RabbitMQ有四种交换机,Direct,topic,headers,Fanout,分别对Direct/topic/Fanout三种交换机进行模拟操作,分别有sender和receiver模块

    java集成rabbitmq多种消费模式

    该项目包括两个子工程,生产者(rabbitmqProvider),消费者(rabbitmqConsumer),采用了direct,fanout,topic三种方式发布及消费。以及持久化分批消费等。工程中rabbitMQ.xml,rabbitMQ1.xml,rabbitMQChanel.xml...

    rabbitmq_demo1.zip

    RabbitMQ 是 Spring Cloud ...本代码简单示例了RabbitMQ 的5中工作模式中的几种 (1)simple 简单模式 (2)Work(竞争者模式) (3)fanout 模式(发布订阅模式) (4)direct(路由键模式) (5)topic(通配符模式)

    springboot-rabbitmq.zip

    RabbitMQ与Springboot整合并实践核心四种模式,包括work模式、fanout模式、direct模式、topic模式

    用Jmeter测试RabbitMQ

    Exchange Type:分别是direct(单播),fanout(广播),topic(组播),headers,每种类型路由的策略不同,公司用direct 和 fanout 这2种路由居多 direct:如果一个队列绑定到该交换机上,并且当前要求路由键为 X ,只有...

    SpringBoot整合RabbitMQ.zip

    ①direct、②fanout、③topic **2.然后讲消息的回调** 四种情况下,确认触发哪个回调函数: ①消息推送到server,但是在server里找不到交换机 ②消息推送到server,找到交换机了,但是没找到队列 ③消息推送到sever...

    RabbitMq+springboot

    demo中介绍了rabbitmq的三种模式,分别为Direct,topic,Fanout并且集成了消息确认机制,消息重发机制集以及集群, 需要用到的同学可以下载看看,少走弯路。

    RabbitMQ入门代码

    rabbitMQ的java入门代码 包括队列持久化、消息持久化、direct、fanout、topic等的基础测试代码

    消息队列RabbitMQ示例6种模式

    简单模式:生产者与消费者 Work模式:发布者与3个订阅者 Topic交换机:生产者与3个消费者 Fanout交换机:生产者与2个消费者 Direct交换机:产生者与两个消费者 RPC远程回调:客户端与服务端

    C# RabbitMQ 实例代码

    C# 编写的RabbitMQ操作实例 功能包括: 1、循环调度 2、消息确认 3、消息持久化 4、公平分发 ... topic (模式匹配的路由规则:支持通配符) fanout (消息广播,将消息分发到exchange上绑定的所有队列上)

    RabbitMQ 学习整理.pdf

    RabbitMQ 学习整理 包括RabbitMQ 配置详解,交换机有四种类型,分别为Direct,topic,headers,Fanout,消息确认机制,多线程处理消息,消费端的限流策略,回调等。都有代码,保证代码正确性都是自己的的从工程中直接...

Global site tag (gtag.js) - Google Analytics