RabbitMQ安装及简单操作

/ Java / 0 条评论 / 490 浏览

安装及基础配置

下载安装包

wget http://file.lianglianglee.com/rabbitmq-server-3.6.13-1.el7.noarch.rpm
wget http://file.lianglianglee.com/erlang-19.0.4-1.el7.centos.x86_64.rpm

下载地址为个人搭建的静态文件服务,原地址如下:

http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

https://dl.bintray.com/rabbitmq/rabbitmq-server-rpm/rabbitmq-server-3.6.13-1.el7.noarch.rpm

安装

rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
yum install rabbitmq-server-3.6.13-1.el7.noarch.rpm

启动并验证启动情况

rabbitmq-server --detached & ps aux | grep rabbitmq
# 启用维护插件
rabbitmq-plugins enable rabbitmq_management

查看端口是否打开

netstat -tunlp

开放端口:5672,25692,15672

设置一个可登陆账户

# 新增 
rabbitmqctl add_user admin admin
# 设置角色 
rabbitmqctl set_user_tags admin administrator

自带的访客账户只能使用localhost登陆

其他命令

# 修改 
rabbitmqctl change_password admin admin123
# 用户列表 
rabbitmqctl  list_users
# 设置角色 
rabbitmqctl set_user_tags admin administrator monitoring policymaker management
# 设置用户权限 
rabbitmqctl  set_permissions  -p  VHostPath  admin  ConfP  WriteP  ReadP
# 查询所有权限 
rabbitmqctl  list_permissions  [-p  VHostPath]
# 指定用户权限 
rabbitmqctl  list_user_permissions  admin
# 清除用户权限 
rabbitmqctl  clear_permissions  [-p VHostPath]  admin

权限可以通过界面更改 http://ip:15672

JAVA 客户端

引入JAR

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5</version>
    </dependency>

编写生产者

package com.lianglianglee.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
  public final static String QUEUE_NAME = "rabbitMQ.test";

  public static void main(String[] args) throws InterruptedException, IOException, TimeoutException {
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //设置RabbitMQ相关信息
    factory.setHost("172.17.99.227");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    //创建一个新的连接
    Connection connection = factory.newConnection();
    //创建一个通道
    Channel channel = connection.createChannel();
    for (int i = 0; i < 1000000; i++) {
      //  声明一个队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      String message = "Hello RabbitMQ " + i;
      //发送消息到队列中
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
      System.out.println("Producer Send +'" + message + "'");
      //Thread.sleep(100);
    }

    //关闭通道和连接
    channel.close();
    connection.close();
  }
}

编写消费者

package com.lianglianglee.rabbit;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
  private final static String QUEUE_NAME = "rabbitMQ.test";

  public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //设置RabbitMQ地址
    //设置RabbitMQ相关信息
    factory.setHost("172.17.99.227");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    //创建一个新的连接
    Connection connection = factory.newConnection();
    //创建一个通道
    Channel channel = connection.createChannel();
    //声明要关注的队列
    channel.queueDeclare(QUEUE_NAME, false, false, true, null);
    System.out.println("Customer Waiting Received messages");
    //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
    // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body)
        throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Customer Received '" + message + "'");
      }
    };
    //自动回复队列应答 -- RabbitMQ中的消息确认机制
    channel.basicConsume(QUEUE_NAME, true, consumer);
  }
}

踩坑

客户端如出现以下异常:

ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'admin', class-id=10, method-id=40)

虚拟主机设置不正确,登陆界面->admin->Virtual Hosts->name->Set permission

Virtual Hosts 在界面右侧,name为表格中的一个单元格

搭建集群

修改host

需要先关闭服务

hostname rabbit01
vim /etc/hostname
#修改为  rabbit01
vim /etc/sysconfig/network
# 修改为 HOSTNAME=rabbit01
vim /etc/hosts
# 把所有节点的IP加入,如下,不需要#号
# 172.17.99.227 rabbit01
# 172.17.99.231 rabbit02

重启操作系统

重启后在所有节点部署RabbitMQ

准备

选出一个主节点,以rabbit01为例

推送rabbit01的erlangcookie到其他节点 ,文件在:/var/lib/rabbitmq/.erlang.cookie

scp /var/lib/rabbitmq/.erlang.cookie root@rabbit02:/var/lib/rabbitmq/.erlang.cookie

有几个节点推送几次,注意节点名称

启动所有节点

在子节点中执行:

rabbitmqctl stop_app 
rabbitmqctl reset 
rabbitmqctl join_cluster rabbit@rabbit01

如修改hostname和host后没有重启,则会导致加入失败

登陆主节点: