当前位置:首页 » 网页前端 » mq给前端发送消息
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

mq给前端发送消息

发布时间: 2022-08-30 18:23:01

1. Java类中在通过MQ发送消息时报“远程通道的类型对于请求的操作不合适”的异常,请问该怎么解决

你用代码操作的话需要在队列管理器上建立一个服务器连接通道,而不是你用的接收通道,代码是通过服务器连接通道来获取队列管理器连接的,你可以将这些概念性的东西再好好的看看会对你有所帮助。

2. activemq几种传递消息方法的效率怎么样

作为消息发送
按照JMS规范,为了保证可靠性,所有的消息都应该是发送到broker,然后交由broker来投递的。也即是说其实JMS是不建议或不支持传输文件的。
对于比较小的文件,简单的处理方式是先读取所有的文件成byte[],然后使用ByteMessage,把文件数据发送到broker,像正常的message一样处理。对于大文件,例如1GB以上的文件,这么搞直接把client或是broker给oom掉了。
这种方式仅仅适用于小文件的传输。特别是如果broker端使用数据库作为存储,message序列化以后存放于blob字段,文件传输频繁或是稍微有点大,写入效率极低。
直接传输文件
为了解决传输大文件的问题,ActiveMQ在jms规范之外引入了jms streams的概念。PTP模式下,连到同一个destination的两端,可以通过broker中转来传输大文件。
发送端使用connection.createOutputStream打开一个输出流,往流里写文件。
OutputStream out =connection.createOutputStream(destination);
接收端则简单的使用connection.createInputStream拿到一个输入流,从中读取文件数据即可。
InputStream in = connection.createInputStream(destination)

3. 使用RabbitMQ异步调用第三方接口怎么给前端响应问题

后端可以生成一个以手机号为key的实体状态信息,存到缓存(设置过期时间)。等待消息返回回调或异常处理并更新状态信息
前端(轮询)调用后端写的一个接口去缓存拿到状态 根据状态码进行提示

4. 怎么给rabbitmq本地服务器发送消息 c++

下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?

5. 如何判断activemq发送信息是否成功

您好,我来为您解答:

启用事务,消费者收到消息后给服务端发送一个确认,服务端收到确认后才将消息从服务器删除

Connection#createSession(boolean transacted,int acknowledgeMode)第一个参数选择true

收到Message后,调用Message上的acknowledge方法进行确认

如果我的回答没能帮助您,请继续追问。

6. SpringMVC+ActiveMQ发送消息并接收返回值的问题

消息通讯是异步的,receiver 需要给sender发个确认收到的消息,没有同步返回的功能。
你可以新创建个ack queue 来给sener 消费。大体是这样,代码一大堆,自己搜。

7. rocketmq 怎么实现发送顺序消息

rocketmq的顺序消息需要满足2点:

1.Procer端保证发送消息有序,且发送到同一个队列。

2.consumer端保证消费同一个队列。

先看个例子,代码版本跟前面的一样。

Procer类:

import Java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.messageQueueSelector;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
* Procer,发送顺序消息
*/
public class Procer {
public static void main(String[] args) throws IOException {
try {
DefaultMQProcer procer = new DefaultMQProcer("please_rename_unique_group_name");

procer.setNamesrvAddr("192.168.0.104:9876");

procer.start();

String[] tags = new String[] { "TagA", "TagC", "TagD" };

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间后缀
String body = dateStr + " Hello RocketMQ " + i;
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = procer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id);
}
}, 0);//0是队列的下标

system.out.println(sendResult + ", body:" + body);
}

procer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.in.read();
}
}

Consumer端:

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class Consumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.0.104:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}

}

NameServer和BrokerServer起来后,运行打印,把前面的不重要的去掉了,只看后面的几列:

content:2015-12-06 17:03:21 Hello RocketMQ 0

content:2015-12-06 17:03:21 Hello RocketMQ 1

content:2015-12-06 17:03:21 Hello RocketMQ 2

content:2015-12-06 17:03:21 Hello RocketMQ 3

content:2015-12-06 17:03:21 Hello RocketMQ 4

content:2015-12-06 17:03:21 Hello RocketMQ 5

content:2015-12-06 17:03:21 Hello RocketMQ 6

content:2015-12-06 17:03:21 Hello RocketMQ 7

content:2015-12-06 17:03:21 Hello RocketMQ 8

content:2015-12-06 17:03:21 Hello RocketMQ 9

可以看到,消息有序的。