1. Java類中在通過MQ發送消息時報「遠程通道的類型對於請求的操作不合適」的異常,請問該怎麼解決
你用代碼操作的話需要在隊列管理器上建立一個伺服器連接通道,而不是你用的接收通道,代碼是通過伺服器連接通道來獲取隊列管理器連接的,你可以將這些概念性的東西再好好的看看會對你有所幫助。
2. activemq幾種傳遞消息方法的效率怎麼樣
作為消息發送
按照JMS規范,為了保證可靠性,所有的消息都應該是發送到broker,然後交由broker來投遞的。也即是說其實JMS是不建議或不支持傳輸文件的。
對於比較小的文件,簡單的處理方式是先讀取所有的文件成byte[],然後使用ByteMessage,把文件數據發送到broker,像正常的message一樣處理。對於大文件,例如1GB以上的文件,這么搞直接把client或是broker給oom掉了。
這種方式僅僅適用於小文件的傳輸。特別是如果broker端使用資料庫作為存儲,message序列化以後存放於blob欄位,文件傳輸頻繁或是稍微有點大,寫入效率極低。
直接傳輸文件
為了解決傳輸大文件的問題,ActiveMQ在jms規范之外引入了jms streams的概念。PTP模式下,連到同一個destination的兩端,可以通過broker中轉來傳輸大文件。
發送端使用connection.createOutputStream打開一個輸出流,往流里寫文件。
OutputStream out =connection.createOutputStream(destination);
接收端則簡單的使用connection.createInputStream拿到一個輸入流,從中讀取文件數據即可。
InputStream in = connection.createInputStream(destination)
3. 使用RabbitMQ非同步調用第三方介面怎麼給前端響應問題
後端可以生成一個以手機號為key的實體狀態信息,存到緩存(設置過期時間)。等待消息返回回調或異常處理並更新狀態信息
前端(輪詢)調用後端寫的一個介面去緩存拿到狀態 根據狀態碼進行提示
4. 怎麼給rabbitmq本地伺服器發送消息 c++
下面是RabbitMQ的消息確認機制:「為了確保消息不會丟失,RabbitMQ支持消息確認機制。客戶端在接受到消息並處理完後,可以發送一個ack消息給RabbitMQ,告訴它該消息可以安全的刪除了。假如客戶端在發送ack之前意外死掉了,那麼RabbitMQ會將消息投遞到下一個consumer客戶端。如果有多個consumer客戶端,RabbitMQ在投遞消息時是輪詢的。RabbitMQ如何判斷客戶端死掉了?唯一根據是客戶端連接是否斷開。這里沒有超時機制,也就是說客戶端可以處理一個消息很長時間,只要沒斷開連接,RabbitMQ就一直等待ack消息。」我現在遇到的問題是這樣的:我這邊有幾條線程去消息隊列里取數據,但是會有異常數據導致線程掛掉,就是上邊的「客戶端在發送ack之前意外死掉了」,RabbitMQ會將消息投遞到下一個consumer客戶端,這樣一條異常數據會把我的所有線程掛掉,我現在想實現這樣的功能:如果有異常數據導致進程掛掉,那麼我不讓RabbitMQ將這條消息投遞到下一個consumer客戶端,而是放到另一個地方或者另外處理,請問該如何實現呢?
5. 如何判斷activemq發送信息是否成功
您好,我來為您解答:
啟用事務,消費者收到消息後給服務端發送一個確認,服務端收到確認後才將消息從伺服器刪除
Connection#createSession(boolean transacted,int acknowledgeMode)第一個參數選擇true
收到Message後,調用Message上的acknowledge方法進行確認
如果我的回答沒能幫助您,請繼續追問。
6. SpringMVC+ActiveMQ發送消息並接收返回值的問題
消息通訊是非同步的,receiver 需要給sender發個確認收到的消息,沒有同步返回的功能。
你可以新創建個ack queue 來給sener 消費。大體是這樣,代碼一大堆,自己搜。
7. rocketmq 怎麼實現發送順序消息
rocketmq的順序消息需要滿足2點:
1.Procer端保證發送消息有序,且發送到同一個隊列。
2.consumer端保證消費同一個隊列。
先看個例子,代碼版本跟前面的一樣。
Procer類:
import Java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.messageQueueSelector;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
* Procer,發送順序消息
*/
public class Procer {
public static void main(String[] args) throws IOException {
try {
DefaultMQProcer procer = new DefaultMQProcer("please_rename_unique_group_name");
procer.setNamesrvAddr("192.168.0.104:9876");
procer.start();
String[] tags = new String[] { "TagA", "TagC", "TagD" };
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加個時間後綴
String body = dateStr + " Hello RocketMQ " + i;
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = procer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id);
}
}, 0);//0是隊列的下標
system.out.println(sendResult + ", body:" + body);
}
procer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.in.read();
}
}
Consumer端:
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* 順序消息消費,帶事務方式(應用可控制Offset什麼時候提交)
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.0.104:9876");
/**
* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模擬業務邏輯處理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
NameServer和BrokerServer起來後,運行列印,把前面的不重要的去掉了,只看後面的幾列:
content:2015-12-06 17:03:21 Hello RocketMQ 0
content:2015-12-06 17:03:21 Hello RocketMQ 1
content:2015-12-06 17:03:21 Hello RocketMQ 2
content:2015-12-06 17:03:21 Hello RocketMQ 3
content:2015-12-06 17:03:21 Hello RocketMQ 4
content:2015-12-06 17:03:21 Hello RocketMQ 5
content:2015-12-06 17:03:21 Hello RocketMQ 6
content:2015-12-06 17:03:21 Hello RocketMQ 7
content:2015-12-06 17:03:21 Hello RocketMQ 8
content:2015-12-06 17:03:21 Hello RocketMQ 9
可以看到,消息有序的。