使用java操作RabbitMQ-定义消息消费者

2025-10-28 05:45:08

1、消费者的处理流程与生产者的形式类似,因为消费者一定也要通过特定的队列进行处理操作。

2、定义消息消费者Recv。

package org.myrabbitmq.base.consumer;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

public class Recv {

private final static String QUEUE_NAME = "myrabbit";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

   factory.setHost("192.168.1.102");

   Connection connection = factory.newConnection();

   Channel channel = connection.createChannel();

   channel.queueDeclare(QUEUE_NAME, false, false, false, null);

   System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

   

   Consumer consumer = new DefaultConsumer(channel) {

     public void handleDelivery(String consumerTag, Envelope envelope,

                                AMQP.BasicProperties properties, byte[] body)

         throws IOException {

       String message = new String(body, "UTF-8");

       System.out.println(" [x] Received '" + message + "'");

     }

    };

    channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

使用java操作RabbitMQ-定义消息消费者

3、执行消费者程序查看消费结果。

使用java操作RabbitMQ-定义消息消费者

4、查看RabbitMQ服务控制台。

使用java操作RabbitMQ-定义消息消费者

5、查看RabbitMQ连接。消息已经正确

使用java操作RabbitMQ-定义消息消费者

6、RabbitMQ工作队列。

现在的程序只定义一个生产者和一个消费者,如果说一个生产者对应有多个消费者?那么就表示工作队列,工作队列的最大特征在于:若干个消费者一起完成工作。

启动三个消费者程序,也就是说执行三次main方法。

使用java操作RabbitMQ-定义消息消费者

使用java操作RabbitMQ-定义消息消费者

使用java操作RabbitMQ-定义消息消费者

7、修改生产者程序:发送10条消息给消费者:

package org.myrabbitmq.base.provider;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Send {

private final static String QUEUE_NAME = "myrabbit";

public static void main(String[] args) throws Exception {

//连接到RabbitMQ服务器

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("192.168.1.102");

factory.setPort(5672);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for(int x = 0; x<10;x++) {

String message = "我是帅哥:" + x;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

}

//System.out.println(" [发送完毕] Sent '" + message + "'");

channel.close();

connection.close();

}

}

使用java操作RabbitMQ-定义消息消费者

8、查看各个消费者的消费情况。

使用java操作RabbitMQ-定义消息消费者

使用java操作RabbitMQ-定义消息消费者

使用java操作RabbitMQ-定义消息消费者

9、当启动了多个消费者之后,这些消费者会在一起共同完成所发出的消费处理,那么这样的处理可以保证消息处理的速度。但是千万要记住,此时使用的是普通队列消息。

声明:本网站引用、摘录或转载内容仅供网站访问者交流或参考,不代表本站立场,如存在版权或非法内容,请联系站长删除,联系邮箱:site.kefu@qq.com。
猜你喜欢