步骤
创建消费者线程 → 创建连接工厂 → 为工厂对象配置服务器地址、端口、用户名、密码、虚拟主机名 → 创建连接 → 创建通道 → 记录交换机名并声明队列 → 通过通道将队列按路由键(routing key)绑定到交换机 → 创建消费者对象并开始消费
public void create(final Context context, final int deviceType, final MQConnParam connParam, final MQCallback callback) {new Thread(new Runnable() {@Overridepublic void run() {mCallback = callback;boolean retFlag = true;int retriedCnt = 0;if (mMqExecutor == null) {mMqExecutor = new ThreadPoolExecutor(1, 1, 1,TimeUnit.MILLISECONDS, workingQueue, rejectedExecutionHandler);}do {try {// 连接工厂ConnectionFactory factory = new ConnectionFactory();// ip 端口 用户名 密码factory.setHost(connParam.serverIp);factory.setPort(connParam.port);factory.setUsername(connParam.username); // 新的账户和密码factory.setPassword(connParam.passwd);factory.setVirtualHost(connParam.virtualHost);// 异常重连配置factory.setAutomaticRecoveryEnabled(true); // 设置网络异常重连factory.setNetworkRecoveryInterval(10000);// 设置每个10s ,重试一次factory.setTopologyRecoveryEnabled(true);// 设置重新声明交换器,队列等信息factory.setRequestedHeartbeat(20); // 心跳包设置// 连接和通道mConnection = factory.newConnection();mChannel = mConnection.createChannel();// 声明交换机、队列、并绑定在一起mExchangeName = connParam.exchangeName;mQueueName = \"deviceCd.\" + Constants.DEVICE_CODE;mChannel.queueDeclare(mQueueName, false, false, false, null);mRoutingKey = mExchangeName.substring(6) + \".\" + deviceType + \".\" + Constants.DEVICE_CODE;mChannel.queueBind(mQueueName, mExchangeName, mRoutingKey);// 声明消息的消费者并消费Consumer consumer = new DefaultConsumer(mChannel) {@Overridepublic void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {logger.error(\"handleShutdownSignal - consumerTag:\" + consumerTag + \", reason:\" + sig.getLocalizedMessage());MQ_STATE = MsgCode.MSG_MQ_OFFLINE;sendMessage(context, MsgCode.ACTION_BROADCAST_RABBITMQ, MsgCode.MQ_STATUS, MsgCode.MSG_MQ_OFFLINE);}@Overridepublic void handleRecoverOk(String consumerTag) {logger.info(\"handleRecoverOk - consumerTag:\" + consumerTag);MQ_STATE = MsgCode.MSG_MQ_ONLINE_REPLY_HEALTH;sendMessage(context, MsgCode.ACTION_BROADCAST_RABBITMQ, MsgCode.MQ_STATUS, MsgCode.MSG_MQ_ONLINE_REPLY_HEALTH);}@Overridepublic void 
handleConsumeOk(String consumerTag) {logger.info(\"handleConsumeOk - consumerTag:\" + consumerTag);MQ_STATE = MsgCode.MSG_MQ_ONLINE_REPLY_HEALTH;sendMessage(context, MsgCode.ACTION_BROADCAST_RABBITMQ, MsgCode.MQ_STATUS, MsgCode.MSG_MQ_ONLINE_REPLY_HEALTH);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, \"UTF-8\");executeMqCmd(context, msg);}};mChannel.basicConsume(mQueueName, true, consumer); // autoAck = true, no need to ack handly// 推送建立logger.info(\"MQ: Establish push success.\");MQ_STATE = MsgCode.MSG_MQ_ONLINE_REPLY_HEALTH;sendMessage(context, MsgCode.ACTION_BROADCAST_RABBITMQ, MsgCode.MQ_STATUS, MsgCode.MSG_MQ_ONLINE_REPLY_HEALTH);retFlag = true;retriedCnt = 0;} catch (Exception e) { // 连接异常retFlag = false;mChannel = null;if (mConnection != null){try {mConnection.close();mConnection.abort();mConnection = null;} catch (IOException e1) {e1.printStackTrace();}}if (retriedCnt < 3) { // 只输出前三次出错的logretriedCnt++; // 重试次数多了就不计数了logger.error(e.getLocalizedMessage(), e);}}if (!retFlag) { // 连接异常时,通知MQ离线状态MQ_STATE = MsgCode.MSG_MQ_OFFLINE;sendMessage(context, MsgCode.ACTION_BROADCAST_RABBITMQ, MsgCode.MQ_STATUS, MsgCode.MSG_MQ_OFFLINE);try {Thread.sleep(500); // 500ms重连一次} catch (InterruptedException e) {e.printStackTrace();}}} while (!retFlag); // MQ初始连接不上时,重连!}}).start();}