您当前的位置:首页 > 新闻 > 科技

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

时间:2020-04-03 17:50:33  来源:  作者:

 

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

一、简单的发送与接收消息 HelloWorld

1. 发送消息

发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性不同

示例:

package com.zf.rabbitmq01;
import JAVA.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 发送消息
 * @author zhanghuan
 *
 */
public class Sender01 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建一个连接
        Connection conn = connFac.newConnection() ;
        //创建一个渠道
        Channel channel = conn.createChannel() ;
        //定义Queue名称
        String queueName = "queue01" ;
        //为channel定义queue的属性,queueName为Queue名称
        channel.queueDeclare( queueName , false, false, false, null) ;
        String msg = "Hello World!";
        //发送消息
        channel.basicPublish("", queueName , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ queueName +" success!");
        channel.close();
        conn.close();
    }
}

package com.zf.rabbitmq01;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String queueName = "queue01";
        channel.queueDeclare(queueName, false, false, false, null) ;
        //上面的部分,与Sender01是一样的
        //配置好获取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + queueName);
        }
    }
}

 

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

二、消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者

1. 消息确认

为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样

QueueingConsumer consumer = new QueueingConsumer(channel);
Boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到消息后,就会自动反馈一个消息给服务器。

下面这个例子来测试消息确认的功能。

package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 发送消息
 * @author zhanghuan
 *
 */
public class Sender03 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建一个连接
        Connection conn = connFac.newConnection() ;
        //创建一个渠道
        Channel channel = conn.createChannel() ;
        //定义Queue名称
        String queueName = "queue01" ;
        //为channel定义queue的属性,queueName为Queue名称
        channel.queueDeclare( queueName , false, false, false, null) ;
        String msg = "Hello World!";
        //发送消息
        channel.basicPublish("", queueName , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ queueName +" success!");
        channel.close();
        conn.close();
    }
}

与Sender01.java一样,没有什么区别。

package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv03 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String channelName = "channel01";
        channel.queueDeclare(channelName, false, false, false, null) ;
        //配置好获取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        //取消 autoAck
        Boolean autoAck = false ;
        channel.basicConsume(channelName, autoAck, consumer) ;
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            //确认消息,已经收到
            channel.basicAck(delivery.getEnvelope().getDeliveryTag()
                                , false);
            System.out.println("received message[" + msg + "] from " + channelName);
        }
    }
}

注意:一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息如果将上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);注释掉, Sender03.java只需要运行一次 , Recv03.java每次运行将都会收到HelloWorld消息

注意:但是这样还是不够的,如果rabbitMQ-Server突然挂掉了,那么还没有被读取的消息还是会丢失 ,所以我们可以让消息持久化。 只需要在定义Queue时,设置持久化消息就可以了,方法如下:

boolean durable = true;channel.queueDeclare(channelName, durable, false, false, null);

这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起的数据丢失了。 但是服务器如果刚收到消息,还没来得及写入到硬盘,就挂掉了,这样还是无法避免消息的丢失。

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

2. 公平调度

上一个例子能够实现发送一个Message与接收一个Message

从上一个Recv01中可以看出,必须处理完一个消息,才会去接收下一个消息。如果生产者众多,那么一个消费者肯定是忙不过来的。此时就可以用多个消费者来对同一个Channel的消息进行处理,并且要公平的分配任务给多个消费者。不能部分很忙,部分总是空闲

实现公平调度的方式就是让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(1);可以设置

列如:

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要做其它的事情,且会消耗很长的时间,在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可以让其它的消费者再次执行刚才宕掉的消费者没有完成的事情。另外,在默认情况下,我们创建的消息队列以及存放在队列里面的消息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,创建的消息队列以及消息都不会保存,为了解决这种情况,保证消息传输的可靠性,我们可以使用RabbitMQ提供的消息队列的持久化机制。

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

生产者:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class ClientSend1 {
   public static final String queue_name="my_queue";
   public static final Boolean durable=true;
   //消息队列持久化 
   public static void main(String[] args) 
   throws java.io.IOException{
       ConnectionFactory factory=new ConnectionFactory();
       //创建连接工厂
       factory.setHost("localhost");
       factory.setVirtualHost("my_mq");
       factory.setUsername("zhxia");
       factory.setPassword("123456");
       Connection connection=factory.newConnection();
       //创建连接
       Channel channel=connection.createChannel();
       //创建信道
       channel.queueDeclare(queue_name, durable, false, false, null);
       //声明消息队列,且为可持久化的
       String message="Hello world"+Math.random();
       //将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
       channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
       System.out.println("Send message:"+message);
       channel.close();
       connection.close();
   }
}

说明:行17 和行20 需要同时设置,也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存在

消费者:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ClientReceive1 {
    public static final String queue_name="my_queue";
    public static final Boolean autoAck=false;
    public static final Boolean durable=true;
    public static void main(String[] args)
    throws java.io.IOException,java.lang.InterruptedException{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("my_mq");
        factory.setUsername("zhxia");
        factory.setPassword("123456");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        channel.queueDeclare(queue_name, durable, false, false, null);
        System.out.println("Wait for message");
        channel.basicQos(1);
        //消息分发处理
        QueueingConsumer consumer=new QueueingConsumer(channel);
        channel.basicConsume(queue_name, autoAck, consumer);
        while(true){
            Thread.sleep(500);
            QueueingConsumer.Delivery deliver=consumer.nextDelivery();
            String message=new String(deliver.getBody());
            System.out.println("Message received:"+message);
            channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
        }
    }
}

说明:行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

三、发布/订阅消息

前面都是一条消息只会被一个消费者处理。

如果要每个消费者都处理同一个消息,rabbitMq也提供了相应的方法。

在以前的程序中,不管是生产者端还是消费者端都必须知道一个指定的QueueName才能发送、获取消息。 而rabbitMQ消息模型的核心思想是生产者不会将消息直接发送给队列。

因为,生产者通常不会知道消息将会被哪些消费者接收。

生产者的消息虽然不是直接发送给Queue,但是消息会交给Exchange,所以需要定义Exchange的消息分发模式 ,之前的程序中,有如下一行代码:

channel.basicPublish("", queueName , null , msg.getBytes());

第一个参数为空字符串,其实第一个参数就是ExchangeName,这里用空字符串,就表示消息会交给默认的Exchange。

下面我们将自己定义Exchange的属性。

package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 发送消息
 * @author zhanghuan
 *
 */
public class Sender04 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建一个连接
        Connection conn = connFac.newConnection() ;
        //创建一个渠道
        Channel channel = conn.createChannel() ;
        //定义ExchangeName,第二个参数是Exchange的类型,fanout表示消息将会分列发送给多账户
        String exchangeName = "news" ;
        channel.exchangeDeclare(exchangeName, "fanout") ;
        String msg = "Hello World!";
        //发送消息,这里与前面的不同,这里第一个参数不再是字符串,而是ExchangeName ,第二个参数也不再是queueName,而是空字符串
        channel.basicPublish( exchangeName , "" , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to exchange "+ exchangeName +" success!");
        channel.close();
        conn.close();
    }
}

Send04.java 发送消息时没有指定的queueName 用的空字符串代替的。 Exchange的类型有direct, topic, headers 、 fanout四种,上面用的是fanout类型

package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv04_01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String exchangeName = "news" ;
        channel.exchangeDeclare(exchangeName, "fanout") ;
        //这里使用没有参数的queueDeclare方法创建Queue并获取QueueName
        String queueName = channel.queueDeclare().getQueue() ;
        //将queue绑定到Exchange中
        channel.queueBind( queueName, exchangeName, "") ;
        //配置好获取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + queueName);
        }
    }
}

Recv04_01.java 使用channel.queueDeclare()方法创建了一个Queue,该Queue有系统创建,并分配了一个随机的名称。 然后将该Queue与与Exchange绑定在一起。 该Queue就能从Exchange中后去消息了。

测试

将Recv04_01.java 文件复制几份 Recv04_02.java Recv04_03.java,然后执行Recv04_01 与 Recv04_02,接下来执行Sender04发送消息,可以看到Recv04_01 与Recv04_02都接收到了消息。然后执行Recv04_03,没有获取到任何消息。接下来再执行Sender04发送消息,可以看到Recv04_01 、Recv04_02与Recv04_03都接收到了消息。

说明Exchange在收到生产者的消息后,会将消息发送给当前已经与它绑定了的所有Queue 。 然后被移除。

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

四、消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉。 这时候就可以对消息进行过滤了。 在消费者端设置好需要接收的消息类型。

如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型。

channel.basicPublish( exchangeName , messageType , null , msg.getBytes());

示例:

package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 发送消息
 * @author zhanghuan
 *
 */
public class Sender05 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建一个连接
        Connection conn = connFac.newConnection() ;
        //创建一个渠道
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange02";
        String messageType = "type01";
        channel.exchangeDeclare(exchangeName, "direct") ;
        //定义Queue名
        String msg = "Hello World!";
        //发送消息
        channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
        channel.close();
        conn.close();
    }
}
package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv05_01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange02";
        channel.exchangeDeclare(exchangeName, "direct") ;
        String queueName = channel.queueDeclare().getQueue() ;
        //第三个参数就是type,这里表示只接收type01类型的消息。
        channel.queueBind(queueName, exchangeName, "type01") ;
        //也可以选择接收多种类型的消息。只需要再下面再绑定一次就可以了
        channel.queueBind(queueName, exchangeName, "type02") ;
        //配置好获取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + exchangeName);
        }
    }
}

这时,启动Recv05_01.java 然后启动Sender05.java ,消费者端就会收到消息。然后将Sender05.java 中的messageType分别改为type02 type03 然后发送消息 , 可以看到消费者端能接收到type02的消息,但是不能接收到type03的消息。

阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者

 

五、Topic类型消息

上一节中使用了消息路由,消费者可以选择性的接收消息。 但是这样还是不够灵活。

比如某个消费者要订阅娱乐新闻消息 。 包括新浪、网易、腾讯的娱乐新闻。那么消费者就需要绑定三次,分别绑定这三个网站的消息类型。 如果新闻门户更多了,那么消费者将要绑定个更多的消息类型, 其实消费者只是需要订阅娱乐新闻,不管是哪个网站的新闻,都需要。 那么在rabbitMQ中可以使用topic类型。 模糊匹配消息类型。

模糊匹配中的 *代表一个 #代表零个或1个

示例:

package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv06_01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange03";
        channel.exchangeDeclare(exchangeName, "topic") ;
        String queueName = channel.queueDeclare().getQueue() ;
        //第三个参数就是type,这里表示只接收type01类型的消息。
        channel.queueBind(queueName, exchangeName, "#.type01") ;
        //配置好获取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + exchangeName);
        }
    }
}
package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 发送消息
 * @author zhanghuan *
 */
public class Sender06 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建一个连接
        Connection conn = connFac.newConnection() ;
        //创建一个渠道
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange03";
        String messageType = "fs.type01";
        channel.exchangeDeclare(exchangeName, "topic") ;
        //定义Queue名
        String msg = "Hello World!";
        //发送消息
        channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
        channel.close();
        conn.close();
    }
}

使用topic之后 。不管Sender端发送的消息类型是fs.type01 还是 xx.type01 还是 type01 ,消费者都会收到消息

六、RPC 远程过程调用

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务。

其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

示例:

package com.zf.rabbitmq07;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCServer {
    public static final String RPC_QUEUE_NAME = "rpc_queue";
    public static String sayHello(String name){
        return "hello " + name ;
    }
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("localhost");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
        while(true){
            System.out.println("服务端等待接收消息..");
            Delivery deliver = consumer.nextDelivery() ;
            System.out.println("服务端成功收到消息..");
            BasicProperties props =  deliver.getProperties() ;
            String message = new String(deliver.getBody() , "UTF-8") ;
            String responseMessage = sayHello(message) ;
            BasicProperties responseProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId())  
                        .build() ;
            //将结果返回到客户端Queue
            channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
            //向客户端确认消息
            channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
            System.out.println("服务端返回消息完成..");
        }
    }
}
package com.zf.rabbitmq07;
import java.io.IOException;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCClient {
    public static final String RPC_QUEUE_NAME = "rpc_queue";
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("localhost");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        //响应QueueName ,服务端将会把要返回的信息发送到该Queue
        String responseQueue = channel.queueDeclare().getQueue() ;
        String correlationId = UUID.randomUUID().toString() ;
        BasicProperties props = new BasicProperties.Builder()
                .replyTo(responseQueue)
                .correlationId(correlationId)
                .build();
        String message = "is_zhoufeng";
        channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
        QueueingConsumer consumer = new QueueingConsumer(channel)   ;
        channel.basicConsume( responseQueue , consumer) ;
        while(true){
            Delivery delivery = consumer.nextDelivery() ;
            if(delivery.getProperties().getCorrelationId().equals(correlationId)){
                String result = new String(delivery.getBody()) ;
                System.out.println(result);
            }
        }
    }
}

写在最后:

  • 针对于Java程序员,笔者最近整理了一些面试真题,思维导图,程序人生等PDF学习资料;
  • 关注私信我"86",即可获取!
  • 希望读到这的您能点个小赞和关注下我,以后还会更新技术干货,谢谢您的支持!
阿里P7架构师的RabbitMQ学习笔记告诉你什么叫做消息队列王者


Tags:阿里   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除。
▌相关推荐
随着数据湖的发展和日渐增长的需求,对数据湖进行统一元数据和存储管理也显得日趋重要。本文将分享阿里云在数据湖统一元数据与存储管理方面的实践。一、云上数据湖架构首先介...【详细内容】
2023-05-26  Tags: 阿里  点击:(3)  评论:(0)  加入收藏
阿里巴巴是中国最著名的互联网企业之一,其成功的背后离不开高质量的代码。然而,随着近年来大量公司源代码泄露事件的发生,安全问题也逐渐成为了程序员们关注的焦点。本文将从阿...【详细内容】
2023-05-19  Tags: 阿里  点击:(18)  评论:(0)  加入收藏
云服务市场迎来新的变局。5月16日,腾讯云宣布对多款核心云产品降价,部分产品线最高降幅达40%,降价政策将在6月1日正式生效。腾讯集团副总裁、云与智慧产业事业群COO、腾讯云总...【详细内容】
2023-05-17  Tags: 阿里  点击:(25)  评论:(0)  加入收藏
“随着去年ChatGPT的推出,生成人工智能将成为改变科技行业的重要因素。”蔡崇信表示,“尤其是在我所从事的在线零售、电子商务领域,生成式AI将会变得很重要。”5月10日,在第三届...【详细内容】
2023-05-11  Tags: 阿里  点击:(28)  评论:(0)  加入收藏
头部AI大公司均在尝试以大模型为机器人注入灵魂。微软团队正探索如何将 OpenAI研发的ChatGPT扩展到机器人领域,旨在让人类用自然语言控制如机械臂、无人机、家庭辅助机器人等...【详细内容】
2023-05-06  Tags: 阿里  点击:(15)  评论:(0)  加入收藏
4月27日,在第六届数字中国建设峰会上,阿里巴巴董事会主席兼CEO、阿里云智能集团CEO张勇透露:阿里云工程师正在实验将千问大模型接入工业机器人,在钉钉对话框输入一句人类语言,即...【详细内容】
2023-04-28  Tags: 阿里  点击:(19)  评论:(0)  加入收藏
4月26日,今天上午,阿里云宣布史上最大规模降价,核心产品价格全线下调15%至50%,存储产品最高降幅达50%。阿里云表示,作为中国第一、全球前三的云计算厂商,此次降价,将进一步扩大公共...【详细内容】
2023-04-26  Tags: 阿里  点击:(53)  评论:(0)  加入收藏
Hello,大家好,我是人月聊IT。前不久阿里的逍遥子在内部的一次会议上面,提出了阿里最新的”1+6+N“的组织机构改革,具体新构建哪些组织机构不是重点,但是里面提到了一个关键点,就是...【详细内容】
2023-04-21  Tags: 阿里  点击:(35)  评论:(0)  加入收藏
出品|虎嗅科技组作者|齐健编辑|陈伊凡头图|阿里云AI大转型的浪潮,似乎正将所有互联网厂商拉到同一起跑线上。“面对AI时代,所有产品都值得用大模型重做一次。”在4月11日的2023阿...【详细内容】
2023-04-12  Tags: 阿里  点击:(32)  评论:(0)  加入收藏
是大厂的游戏,但不能只是大厂的游戏文|《中国企业家》记者 赵东山 邓双琳编辑|李薇头图摄影|邓攀一场AI大模型追逐赛,激战正酣。4月11日,阿里云旗下大模型产品通义千问面世。阿...【详细内容】
2023-04-12  Tags: 阿里  点击:(26)  评论:(0)  加入收藏
▌简易百科推荐
编译丨千山作为机圈巨头,苹果的一举一动都备受瞩目。而6月,则是苹果的主场!今年以来,在ChatGPT引爆的AI大战中,相较微软、谷歌、亚马逊等大厂纷纷上阵布局、唯恐落于人后的姿态,苹...【详细内容】
2023-05-26    51CTO  Tags:苹果   点击:(3)  评论:(0)  加入收藏
站长之家(ChinaZ.com) 5月26日消息:在 Google I/O 之后注册 Search Labs 的美国 Google 用户现在可以开始使用一些早期实验,包括 SGE(搜索生成体验)、代码提示和添加到表格。如...【详细内容】
2023-05-26    站长之家  Tags:谷歌   点击:(6)  评论:(0)  加入收藏
路透社5月25日消息,以色列人工智能公司Watchful Technologies称,TikTok正测试一款AI聊天机器人,该工具可与用户交流短视频相关问题,并帮助他们发现内容。Watchful Technologies...【详细内容】
2023-05-26    界面新闻  Tags:TikTok   点击:(4)  评论:(0)  加入收藏
2023年5月21日,北京地铁大兴机场线刷掌乘车发布会举行,标志着北京轨道交通开启“刷掌”乘车时代。北京市交通委在接受媒体采访时称,此次刷掌乘车服务在大兴机场线的试点是掌纹...【详细内容】
2023-05-26  界面新闻    Tags:刷掌   点击:(5)  评论:(0)  加入收藏
新浪科技讯 北京时间5月25日早间消息,亚马逊(116.75, 1.76, 1.53%)AWS云计算业务的客户正在关注该公司6周前推出的类似ChatGPT的生成式人工智能技术。然而到目前为止,许多客户...【详细内容】
2023-05-25    新浪科技  Tags:AI工具   点击:(5)  评论:(0)  加入收藏
新智元报道作者:王嘉宁编辑:LRS【新智元导读】一站式NLP工具箱,你想要的全都有!近日,华师大HugAILab团队研发了HugNLP框架,这是一个面向研究者和开发者的全面统一的NLP训练框架,可...【详细内容】
2023-05-24    新智元  Tags:ChatGPT   点击:(10)  评论:(0)  加入收藏
新浪科技讯 北京时间5月24日早间消息,据报道,微软(315.26, -5.92, -1.84%)周二开始向用户提供大批人工智能升级,包括对ChatGPT、搜索引擎必应以及云计算服务的升级,希望借此缩小...【详细内容】
2023-05-24    新浪科技  Tags:ChatGPT   点击:(8)  评论:(0)  加入收藏
智东西作者 | ZeR0编辑 | 漠影智东西5月24日报道,作为加速计算的“守门员”,自ChatGPT引爆生成式AI热潮后,NVIDIA(英伟达)凭借其旗舰计算芯片A100和H100 GPU在AI训练领域无出其右...【详细内容】
2023-05-24    智东西  Tags:英伟达   点击:(4)  评论:(0)  加入收藏
(与OpenAI全面合作是今年Build大会的主题)  新浪科技 郑峻发自美国硅谷  回来了,都回来了。疫情一页已经翻过,科技大厂的年度大会今年全面回归线下。继两周之前谷歌总部举...【详细内容】
2023-05-24    新浪科技  Tags:微软   点击:(7)  评论:(0)  加入收藏
  机器之心报道  机器之心编辑部今年的微软 Build 大会,高度聚焦生成式 AI,联手OpenAI打造一个大宇宙。  最近几个月,微软一直忙于在自身的许多产品和服务中构建生成式 A...【详细内容】
2023-05-24    机器之心  Tags:Windows Copilot   点击:(6)  评论:(0)  加入收藏
站内最新
栏目相关
  • · 多款绝杀!苹果年度大招来袭!
  • · 谷歌宣布对更多用户开放搜索中对新的生成式 AI 功能的访问
  • · TikTok据悉正测试AI聊天机器人“Tako”,可帮助用户发现短视频内容
  • · “刷掌”支付上线,安全吗?
  • · 亚马逊AWS匆忙推生成式AI工具 被指尚不成熟
  • · 可直训ChatGPT类模型!华师大、NUS开源HugNLP框架:一键刷榜,全面统一NLP训练
  • · 微软全面更新AI产品线:ChatGPT可抓取必应搜索结果
  • · 让企业更易上手生成式AI!英伟达连宣多项重磅合作,从Azure云到本地化部署
  • · 微软吹响AI集结号:全面打通ChatGPT携手冲击谷歌
  • · Windows Copilot登场,ChatGPT默认用必应搜索,微软联手OpenAI的大宇宙来
  • · ChatGPT重塑Windows!微软王炸更新:操作系统全面接入,Bing也能用插件了
  • · AIGC 时代:“技术焦虑”无用,产品升级才是王道
  • · 7 月 17 日起,亚马逊中国将不再提供应用商店服务
  • · 美国《大西洋月刊》:ChatGPT才6个月大,就已经变得有些过时了
  • · TikTok正式起诉美国蒙大拿州:封杀令违宪且毫无根据
  • · ChatGPT 4通过注册会计师和审计师考试
  • · 北京移动宣布推出2000M宽带 可8秒下载1部2GB电影
  • · OpenAI正探索AI集体决策,提出类似维基百科条目模式
  • · 比尔·盖茨:AI助理将带来颠覆性变化 谷歌、亚马逊都会玩完儿
  • · Meta收到创纪录“天价”罚单 美欧数据传输博弈战再升级
  • 站内热门
    相关头条
  • · TikTok据悉正测试AI聊天机器人“Tako”,可帮助用户发现短视频内容
  • · 可直训ChatGPT类模型!华师大、NUS开源HugNLP框架:一键刷榜,全面统一NLP训练
  • · 微软全面更新AI产品线:ChatGPT可抓取必应搜索结果
  • · 让企业更易上手生成式AI!英伟达连宣多项重磅合作,从Azure云到本地化部署
  • · 微软吹响AI集结号:全面打通ChatGPT携手冲击谷歌
  • · ChatGPT重塑Windows!微软王炸更新:操作系统全面接入,Bing也能用插件了
  • · OpenAI正探索AI集体决策,提出类似维基百科条目模式
  • · 比尔·盖茨:AI助理将带来颠覆性变化 谷歌、亚马逊都会玩完儿
  • · 学流氓软件?Windows 10将强制升级最新版
  • · 苹果发布 tvOS 16.5 更新,引入“多视图体育”功能:同时播放 4 个直播源
  • · 微软称GPT-4或具备人类逻辑,人工智能终将成“人类智能”?
  • · 投资者看好AI搜索 谷歌两位创始人财富猛增183亿美元
  • · 谷歌员工评论I/O大会:赞美工程师但嘲讽公司高管
  • · 谷歌更新AI聊天机器人Bard,取消等待名单
  • · 谷歌推出 AI 视频对口型技术:根据翻译语言改变人物说话方式,但不开放使用
  • · 量子处理器上首次造出任意子 有望促进容错量子计算机开发
  • · 谷歌AI大动作来了!最新大语言模型、升级版Bard,还有……
  • · 谷歌开发者大会将发布AI新语言模型:编程数学题做作文全拿下 支持100多种语言
  • · 苹果iPad Pro 2024款屏幕升级:首次使用OLED面板
  • · 助力中小企业基础网络升级 CNNIC再降IPv6资费
  • 站内头条