當前位置:首頁 » 網頁前端 » mq給前端發送消息
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

mq給前端發送消息

發布時間: 2022-08-30 18:23:01

1. Java類中在通過MQ發送消息時報「遠程通道的類型對於請求的操作不合適」的異常,請問該怎麼解決

你用代碼操作的話需要在隊列管理器上建立一個伺服器連接通道,而不是你用的接收通道,代碼是通過伺服器連接通道來獲取隊列管理器連接的,你可以將這些概念性的東西再好好的看看會對你有所幫助。

2. activemq幾種傳遞消息方法的效率怎麼樣

作為消息發送
按照JMS規范,為了保證可靠性,所有的消息都應該是發送到broker,然後交由broker來投遞的。也即是說其實JMS是不建議或不支持傳輸文件的。
對於比較小的文件,簡單的處理方式是先讀取所有的文件成byte[],然後使用ByteMessage,把文件數據發送到broker,像正常的message一樣處理。對於大文件,例如1GB以上的文件,這么搞直接把client或是broker給oom掉了。
這種方式僅僅適用於小文件的傳輸。特別是如果broker端使用資料庫作為存儲,message序列化以後存放於blob欄位,文件傳輸頻繁或是稍微有點大,寫入效率極低。
直接傳輸文件
為了解決傳輸大文件的問題,ActiveMQ在jms規范之外引入了jms streams的概念。PTP模式下,連到同一個destination的兩端,可以通過broker中轉來傳輸大文件。
發送端使用connection.createOutputStream打開一個輸出流,往流里寫文件。
OutputStream out =connection.createOutputStream(destination);
接收端則簡單的使用connection.createInputStream拿到一個輸入流,從中讀取文件數據即可。
InputStream in = connection.createInputStream(destination)

3. 使用RabbitMQ非同步調用第三方介面怎麼給前端響應問題

後端可以生成一個以手機號為key的實體狀態信息,存到緩存(設置過期時間)。等待消息返回回調或異常處理並更新狀態信息
前端(輪詢)調用後端寫的一個介面去緩存拿到狀態 根據狀態碼進行提示

4. 怎麼給rabbitmq本地伺服器發送消息 c++

下面是RabbitMQ的消息確認機制:「為了確保消息不會丟失,RabbitMQ支持消息確認機制。客戶端在接受到消息並處理完後,可以發送一個ack消息給RabbitMQ,告訴它該消息可以安全的刪除了。假如客戶端在發送ack之前意外死掉了,那麼RabbitMQ會將消息投遞到下一個consumer客戶端。如果有多個consumer客戶端,RabbitMQ在投遞消息時是輪詢的。RabbitMQ如何判斷客戶端死掉了?唯一根據是客戶端連接是否斷開。這里沒有超時機制,也就是說客戶端可以處理一個消息很長時間,只要沒斷開連接,RabbitMQ就一直等待ack消息。」我現在遇到的問題是這樣的:我這邊有幾條線程去消息隊列里取數據,但是會有異常數據導致線程掛掉,就是上邊的「客戶端在發送ack之前意外死掉了」,RabbitMQ會將消息投遞到下一個consumer客戶端,這樣一條異常數據會把我的所有線程掛掉,我現在想實現這樣的功能:如果有異常數據導致進程掛掉,那麼我不讓RabbitMQ將這條消息投遞到下一個consumer客戶端,而是放到另一個地方或者另外處理,請問該如何實現呢?

5. 如何判斷activemq發送信息是否成功

您好,我來為您解答:

啟用事務,消費者收到消息後給服務端發送一個確認,服務端收到確認後才將消息從伺服器刪除

Connection#createSession(boolean transacted,int acknowledgeMode)第一個參數選擇true

收到Message後,調用Message上的acknowledge方法進行確認

如果我的回答沒能幫助您,請繼續追問。

6. SpringMVC+ActiveMQ發送消息並接收返回值的問題

消息通訊是非同步的,receiver 需要給sender發個確認收到的消息,沒有同步返回的功能。
你可以新創建個ack queue 來給sener 消費。大體是這樣,代碼一大堆,自己搜。

7. rocketmq 怎麼實現發送順序消息

rocketmq的順序消息需要滿足2點:

1.Procer端保證發送消息有序,且發送到同一個隊列。

2.consumer端保證消費同一個隊列。

先看個例子,代碼版本跟前面的一樣。

Procer類:

import Java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.messageQueueSelector;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
* Procer,發送順序消息
*/
public class Procer {
public static void main(String[] args) throws IOException {
try {
DefaultMQProcer procer = new DefaultMQProcer("please_rename_unique_group_name");

procer.setNamesrvAddr("192.168.0.104:9876");

procer.start();

String[] tags = new String[] { "TagA", "TagC", "TagD" };

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加個時間後綴
String body = dateStr + " Hello RocketMQ " + i;
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = procer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id);
}
}, 0);//0是隊列的下標

system.out.println(sendResult + ", body:" + body);
}

procer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.in.read();
}
}

Consumer端:

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
* 順序消息消費,帶事務方式(應用可控制Offset什麼時候提交)
*/
public class Consumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.0.104:9876");
/**
* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模擬業務邏輯處理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}

}

NameServer和BrokerServer起來後,運行列印,把前面的不重要的去掉了,只看後面的幾列:

content:2015-12-06 17:03:21 Hello RocketMQ 0

content:2015-12-06 17:03:21 Hello RocketMQ 1

content:2015-12-06 17:03:21 Hello RocketMQ 2

content:2015-12-06 17:03:21 Hello RocketMQ 3

content:2015-12-06 17:03:21 Hello RocketMQ 4

content:2015-12-06 17:03:21 Hello RocketMQ 5

content:2015-12-06 17:03:21 Hello RocketMQ 6

content:2015-12-06 17:03:21 Hello RocketMQ 7

content:2015-12-06 17:03:21 Hello RocketMQ 8

content:2015-12-06 17:03:21 Hello RocketMQ 9

可以看到,消息有序的。