使用java操作RabbitMQ-定义消息消费者
1、消费者的处理流程与生产者的形式类似,因为消费者一定也要通过特定的队列进行处理操作。
2、定义消息消费者Recv。
package org.myrabbitmq.base.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv {
private final static String QUEUE_NAME = "myrabbit";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.102");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

3、执行消费者程序查看消费结果。

4、查看RabbitMQ服务控制台。

5、查看RabbitMQ连接。消息已经正确

6、RabbitMQ工作队列。
现在的程序只定义一个生产者和一个消费者,如果说一个生产者对应有多个消费者?那么就表示工作队列,工作队列的最大特征在于:若干个消费者一起完成工作。
启动三个消费者程序,也就是说执行三次main方法。



7、修改生产者程序:发送10条消息给消费者:
package org.myrabbitmq.base.provider;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "myrabbit";
public static void main(String[] args) throws Exception {
//连接到RabbitMQ服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.102");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int x = 0; x<10;x++) {
String message = "我是帅哥:" + x;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//System.out.println(" [发送完毕] Sent '" + message + "'");
channel.close();
connection.close();
}
}

8、查看各个消费者的消费情况。



9、当启动了多个消费者之后,这些消费者会在一起共同完成所发出的消费处理,那么这样的处理可以保证消息处理的速度。但是千万要记住,此时使用的是普通队列消息。