『壹』 java 工程怎麼設置activemq消費者並發數
馬來盤-0.9 它的輸贏是這樣算的,輸就是-0.9乘以金額,贏就是下多少得的多少,這個得的呢是連你的本金的。-0.9下1000 輸是1000*0.9=900 還有100是返回到你的賬號上面了,-0.9下1000贏1000,因為負水就是正水大於1以上的,所以你可以直接把贏的看成水位是1的,然後再去乘以你的金額就得到了結果。
『貳』 actice Mq 消費者 IP
可以,通過系統消息可以獲得消費者的ip
『叄』 activemq怎麼實現多個生產者一個消費者
消費者有兩個,1和2,如果收到A類型,就用1消費者發簡訊通知用戶,
如果收到B類型,就用2消費者發郵件通知。
我的問題是:
當然,我也可以定義兩個procer:AP和BP,AP發A消息,BP發B消息,那我的新問題是:
------解決方案--------------------------------------------------------一個統一的接收者從activemq上接收消息,接受者收到消息後,根據類型判斷,轉發給不同的消費者。------解決方案--------------------------------------------------------建議你不同的消息類型放在不同的Queue里,這樣你的業務就不會混亂了
『肆』 ActiveMQ 多個隊列一個消費者 怎麼接收信息
一、WebSphere MQ命令行命令 1、停止隊列管理器 endmqm [-z] [([-c -w -i -p] [-r] [-s]) -x] QMgrName endmqm mqm_name 使用默認選項停止隊列管理器需要等待當前的應用連接完成並斷開。 -i 立即停止隊列管理器。 -w 需要等待所有的應用停止以後才會真正關閉隊列管理器 -p 使用以上參數都無法正常停止隊列管理器的情況下可以使用該參數 2、啟動隊列管理器 strmqm [-z] [-a -c -r -x] [-d noneminimalall] [-f] [-ns] QMgrName 隊列管理器必須在完全停止時才能被啟動。 3、創建隊列管理器 crtmqm [-z] [-q] [-c Text] [-d DefXmitQ] [-h MaxHandles] [-md DataPath] [-g ApplicationGroup] [-t TrigInt] [-u DeadQ] [-x MaxUMsgs] [-lp LogPri] [-ls LogSec] [-lc -ll] [-lf LogFileSize] [-ld LogPath] QMgrName 隊列管理器名大小寫敏感且不支持空字元串,長度為48位元組,同一網路中不能有重名。 創建隊列管理器的時侯最好創建死信隊列用於存放無法發送的信息,保證通道不會因為無法發送信息而被關閉。 crtmqm -u deadq_name mqm_name 4、刪除隊列管理器 dltmqm [-z] QMgrName 刪除隊列管理器會完全刪除其所擁有的對象和相關信息,並且是不可恢復的。要刪除一個隊列管理器首先要保證他是停止的。 二、WebSphere MQ Script. (MQSC) commands WebSphere MQ Script. (MQSC) commands是常常用來管理隊列管理器對象的。這些對象包括隊列管理器本身,隊列,名稱列表,通道,客戶端通道,監聽,服務等。使用runmqsc 隊列管理名來啟動,可以運行單個的命令,也可以通過命令集的腳本來運行。 本地隊列管理器的作用是接收遠程或本地的信息流,並將本地隊列中的信息流取出以供應用程序使用。在做這些工作之前需要定義相關的隊列管理器,隊列和通道等,而這些工作是由WebSphere MQ Script. (MQSC) commands來完整的。在Windows及Linux環境下也可以通過WebSphere MQ Explorer來完成。 啟動WebSphere MQ Script. (MQSC) :runmqsc [-e] [-v] [-w WaitTime [-x] [-m LocalQMgrName]] [QMgrName] runmqsc mqm_name。通過命令runmqsc啟動隊列管理器的命令伺服器。WebSphere MQ Explorer也能完成相同的任務。runmqsc的相關命令有三種運行方式,Verify a command without running it,Run a command on a local queue manager,Run a command on a remote queue manager。runmqsc的相關命令在解釋的時候都會以大寫來解釋,比如DEFINE,ALTER,RESET等。但是這些命令並不是大小寫敏感的。每行runmqsc命令最長只能到8個字元,可以通過-或者+連接下一行,-是從下一行的第一個字元開始,+是從下一行的第一個非空字元開始。而且所有命令與平台無關。runmqsc的標准輸入是鍵盤,標准輸出時屏幕,我們可以通過<,>重定向。例如從腳本輸入命令runmqsc </path/filename.in,將結果重定向到文件runmqsc>/path/filename.out。 1、顯示隊列管理器屬性(DISPLAY QMGR): DISPLAY QMGR顯示當前隊列管理器的所有屬性,也可以使用DISPLAY QMGR 屬性名,單獨查看當前隊列管理器的某個特定屬性。 2、 更改隊列管理器屬性(ALTER QMGR ): ALTER QMGR 用於更改隊列管理器的相關屬性,例如 ALTER QMGR MAXHANDS(255),這個命令將默認隊列管理器的MAXHANDS由256更改為255。 3、創建本地隊列(DEFINE QLOCAL ): DEFINE QLOCAL Q_LOCAL_NAME,在創建隊列的時候可以定義相關屬性的值,如果沒有定義則使用默認值。也可以全部使用默認值,最後通過ALTER QLOCAL命令來修改相關屬性。在同一個隊列管理器中不能有同名的隊列,可以使用REPLACE關鍵字重建已有的隊列。 4、修改本地隊列屬性(ALTER QLOCAL): ALTER QLOCAL Q_LOCAL_NAME NEW_ATTRIBUTE。已經定義了的本地隊列可以使用ALTER QLOCAL 命令對其屬性進行修改。 5、顯示本地隊列屬性(DISPLAY QLOCAL): DISPLAY QLOCAL Q_LOCAL_NAME ATTRIBUTE。此命令用於顯示本地隊列的屬性,可以使用默認的顯示全部屬性,也可以顯示指定的屬性。 6、復制本地隊列(DEFINE QLOCAL NEW LIKE OLD): DEFINE QLOCAL NEW LIKE OLD。此命令可以創建一個屬性與OLD完全一樣的本地隊列。當然,我們也可以在語句後面指定屬性的詳細信息,沒有指定的則繼承OLD的對應屬性,指定了的則使用新的屬性。 7、清除本地隊列中的消息(CLEAR QLOCAL): CLEAR QLOCAL Q_LOCAL_QUEUE。此命令用於清除本地隊列中存儲的信息。在清除信息的時候系統不會給出任何提示,而是直接把信息刪除。在一下兩種情況下不能使用CLEAR QLOCAL,本地隊列中存儲的有在最近一次隊列同步以後未提交的信息,有應用程序打開使用本地此隊列。 8、刪除本地隊列(DELETE QLOCAL): DELETE QLOCAL Q_LOCAL_QUEUE。此命令用於刪除本地隊列,當本地隊列中存在有沒有提交的數據此隊列不能刪除。如果隊列中存在數據,且數據是提交了的,可以使用PURGE關鍵字刪除本地隊列。例如DELETE QLOCAL (Q_LOCAL_QUEUE) PURGE,在刪除的時候可以指定NOPURGE關鍵字代替PURGE以保護刪除的隊列中可能存在的已提交數據。 三、PCF commands PCF commands允許管理員通過編程的方式將MQ的日常管理任務集成在程序中。包括創建隊列,預定義隊列,更改隊列管理器等, PCF commands與MQSC鎖實現的功能是相同。
『伍』 關於ActiveMQ的配備怎麼解決
關於ActiveMQ的配置
目前常用的消息隊列組建無非就是MSMQ和ActiveMQ,至於他們的異同,這里不想做過多的比較。簡單來說,MSMQ內置於微軟操作系統之中,在部署上包含一個隱性條件:Server需要是微軟操作系統。(對於這點我並去調研過MSMQ是否可以部署在非微軟系統,比如:Linux,只是拍腦袋想了想,感覺上是不可以)。對於ActiveMQ,微軟系統和Linux都是可以部署的。從功能方面來說,一般最常用的就是:消息的收/發,感覺差異不大。從性能上來說,一般的說法是ActiveMQ略高。在穩定性上,個人感覺MSMQ更好。如果這兩種常用隊列都用過的同學,應該來說最大的差異在於:MSMQ如果要訪問遠程隊列(比如機器A上的程序訪問機器B上的隊列),會比較惡心。在數據量比較大的情況之下,一般來說隊列伺服器會專門的一台或者多台(多台的話,用程序去做熱備+負載比較方便,也不需要額外的硬體成本。簡單來說做法可以這樣:消息發送的時候隨機向著多台隊列伺服器上發送消息;接受的時候開多個線程去分別監聽;熱備方面,可以維護一個帶狀態的隊列連接池,如果消息收發失敗那麼將狀態置為不可用,然後起一個線程去定時監測壞的連接是否可用,這個過程一般情況下可以不用加鎖,為什麼,大家根據各自需要去取捨吧)。最近搞完了短彩信的網關連接服務,這兩種隊列我均使用了。大致的過程是這樣的:上層應用如果要發端彩信,那麼將消息直接發送至ActiveMQ(目前用的就是上面說的多台熱備+負載,因為實際中下行量非常大5千萬條/天以上),然後端彩信網關連接服務部署多套,每套均依賴本機的MSMQ。為什麼呢?用ActiveMQ的原因是:上層應用程序和網關連接服務彼此獨立,消息需要跨機訪問。用MSMQ的原因是:ActiveMQ中的數據是一條不分省的大隊列,網關連接服務需要按省流控,所以端彩信網關連接服務:首先把消息從ActiveMQ取出來,然後存至本機上的分省MSMQ,這樣做另外的一個好處就是:ActiveMQ不至於過多擠壓,他的數據會分攤到N台短彩信網關連接服務所部署的機器上的MSMQ之中,也就說MSMQ可以起到分攤數據和緩沖的作用。
在之前的隨筆中,已經介紹過MSMQ,現在先介紹一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介紹還比較少。以下是自己總結一些相關資料,貼出來給大家共享
一)問題分析和解決
1)KahaDb和AMQ Message Store兩種持久方式如何選擇?
官方:
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.
The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.
非官方:
kaha文件系統實際上上是一個文件索引系統,有兩部分組成,一個是數據文件系統,由一個個獨立的文件組成,預設文件大小是32M大(可配置),另外一個是索引文件系統,記錄消息在數據文件中的位置信息以及數據文件中的空閑塊信息。數據文件是存儲到硬碟上的,索引文件是緩存在內存中的。所以這個存儲系統對大消息存儲有利,象我們的memberId之類的文本消息,實際上是浪費,索引比消息還大,哈。
我方分析:
推薦: Amq持久方式
理由:雖然官方推薦使用KahaDB持久方式,但其提到的優勢:可伸縮性和恢復性較好,對於我們實際的應用意義不大。從我們自己的使用經驗來看,KahaDB持久方式,Data文件是一個大文件(感覺文件過大後,造成隊列服務癱死的可能性會增大),從官網的相關配置(附錄1)也找不到哪裡可以設置數據的文件的最大Size。)而Amq持久方式可以設置Data文件最大Size,這樣可以保證即時消息積壓很多,Data文件也不至於過大。
2)錯誤:Channel was inactive for too long
解決方法:
在建立連接的Uri中加入: wireFormat.maxInactivityDuration=0
參考資源:
http://jinguo.iteye.com/blog/243153
You can do the following to fix the issues:
1) Append max inactivity ration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
Regards,
如果不這樣設置,對應的錯誤會出現:
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
ActiveMQ的tcp url:tcp://localhost:61616後面要加入?wireFormat.maxInactivityDuration=0 這樣的參數,否則當一段時間沒有消息發送時會拋出 "Channel was inactive for too long"異常
3)錯誤:Wire format negotiation timeout: peer did not send his wire format.
解決方法:
1)關閉ActiveMqLog4j
打開:conf/log4j.properties
將:log4j.rootLogger=INFO, console, logfile
修改為:log4j.rootLogger=OFF
2)在建立連接的Uri中加入: =30000
例如北京的測試環境連接Uri:
tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&=30000&connection.AsyncSend=true
參考資源:
http://activemq.apache.org/javaxjmsjmsexception-wire-format-negociation-timeout-peer-did-not-send-his-wire-format.html
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat. property on the connection URL in your client.
For example
tcp://localhost:61616?wireFormat.=30000
will use 30 sec timeout.(貌似有問題!!!)
4)錯誤:Out of memory
解決方法:
1) 設置Java最大內存限制為合適大小:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默認是512)
2)Activemq.xml配置節:systemUsage/ systemUsage配置大小合適,並且特別注意:大於所有rable desitination設置的memoryUsage之和。
備註:
1)尖括弧:「>」代表通配符
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有rable desitination設置之和
3)SystemUsage配置設置了一些系統內存和硬碟容量,當系統消耗超過這些容量設置時,amq會「slow down procer」,還是很重要的。
參考資料:
http://m.oschina.net/blog/26216
參考-- http://activemq.apache.org/javalangoutofmemory.html
對於MQ的內容實用是可管理和可配置的。首先需要判斷的是MQ的哪部分系統因內存不足而導致泄漏,是JVM,broker還是消費者、生產者?
一、內存管理
JVM內存管理:
1. 用bin/activemq命令在獨立JVM中運行broker。用-Xmx和-Xss命令即可(activemq.bat文件中修改ACTIVEMQ_OPTS選項參數即可);
2. 默認情況下,MQ用512M的JVM;
broker內存管理:
1. broker使用的內存並不是由JVM的內存決定的。雖然受到JVM的限制,但broker確實獨立管理器內存;
2. systemUsage和destination的內存限制與broker內存息息相關;
3. MQ中內存的關系是:JVM->Broker->broker features;
4. 所有destination的內存總量不能超過broker的總內存;
消費者:
1. 由於消息大小可以配置,prefetch limit往往是導致內存溢出的主要原因;
2. 減少prefetch limit的大小,會減少消費者內存中存儲的消息數量;
生產者:
1. 除非消息數量超過了broker資源的限制,否則生產者不會導致內存溢出;
2. 當內存溢出後,生產者會收到broker的阻塞信息提示;
二、其他
將消息緩沖之硬碟:
1. 只有當消息在內存中存儲時,才允許消息的快速匹配與分發,而當消費者很慢或者離開時,內存可能會耗盡;
2. 當destination到達它的內存臨界值時,broker會用消息游標來緩存非持久化的消息到硬碟。
3. 臨界值在broker中通過memoryUsage和systemUsage兩個屬性配置,請參考activemq.xml;
4. 對於緩慢的消費者,當尚未耗盡內存或者轉變為生產者並發控制模式前,這個特性允許生產者繼續發送消息到broker;
5. 當有多個destination的時候,默認的內存臨界值可能被打破,而這種情況將消息緩存到硬碟就顯得很有意義;
6. precentUsage配置:使用百分比來控制內存使用情況;
多個線程:
1. 默認情況下,MQ每個destination都對應唯一的線程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS選項參數即可),用線程池來限制線程的數量,從而減少內存消耗;
大數據傳輸:
1. destination policies--maxPageSize:控制進入內存中的消息數量;lazyDispatch:增加控制使用當前消費者列表的預取值;
2. 使用blogMessage或者streamsMessage類型來進行大量文件的傳輸;
泄漏JMS資源:
1. 當session或者procer或者consumer大量存在而沒有關閉的時候;
2. 使用PooledConnectionFactory;
『陸』 rabbitmq服務怎樣配置調用本地
AMQP(高級消息隊列協議) 是一個非同步消息傳遞所使用的應用層協議規范,作為線路層協議,而不是API(例如JMS),AMQP 客戶端能夠無視消息的來源任意發送和接受信息。AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議,而現在的目標則是為通用消息隊列架構提供通用構建工具。因此,面向消息的中間件 (MOM)系統,例如發布/訂閱隊列,沒有作為基本元素實現。反而通過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規范的一 部分,形成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如之前提到的發布/訂閱,隊列,事務以及流數據,並且添加了額外的特性,例如更易於擴展,基於內容的路由。
AMQP當中有四個概念非常重要
virtual host,虛擬主機
exchange,交換機
queue,隊列
binding,綁定
一個虛擬主機持有一組交換機、隊列和綁定。
為什麼需要多個虛擬主機呢?因為RabbitMQ當中,用戶只能在虛擬主機的粒度進行許可權控制。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ伺服器都有一個默認的虛擬主機/。
何謂虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)
隊列(Queues)是你的消息(messages)的終點,可以理解成裝消息的容器。消息就一直在裡面,直到有客戶端(也就是消費者,Consumer)連接到這個隊列並且將其取走為止。不過,也可以將一個隊列配置成這樣的:一旦消息進入這個隊列,此消息就被刪除。
隊列是由消費者(Consumer)通過程序建立的,不是通過配置文件或者命令行工具。這沒什麼問題,如果一個消費者試圖創建一個已經存在的隊列,RabbitMQ會直接忽略這個請求。因此我們可以將消息隊列的配置寫在應用程序的代碼裡面。
而要把一個消息放進隊列前,需要有一個交換機(Exchange)。
交換機(Exchange)可以理解成具有路由表的路由程序。每個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字元串。交換機當中有一系列的綁定(binding),即路由規則(routes)。(例如,指明具有路由鍵 「X」 的消息要到名為timbuku的隊列當中去。)
消費者程序(Consumer)要負責創建你的交換機。交換機可以存在多個,每個交換機在自己獨立的進程當中執行,因此增加多個交換機就是增加多個進程,可以充分利用伺服器上的CPU核以便達到更高的效率。例如,在一個8核的伺服器上,可以創建5個交換機來用5個核,另外3個核留下來做消息處理。類似的,在RabbitMQ的集群當中,你可以用類似的思路來擴展交換機一邊獲取更高的吞吐量。
交換機如何判斷要把消息送到哪個隊列?你需要路由規則,即綁定(binding)。一個綁定就是一個類似這樣的規則:將交換機「desert(沙漠)」當中具有路由鍵「阿里巴巴」的消息送到隊列「hideout(山洞)」裡面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列連接起來的路由規則。例如,具有路由鍵「audit」的消息需要被送到兩個隊列,「log-forever」和「alert-the-big-de」。要做到這個,就需要創建兩個綁定,每個都連接一個交換機和一個隊列,兩者都是由「audit」路由鍵觸發。在這種情況下,交換機會復制一份消息並且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。
交換機有多種類型。他們都是做路由的,但是它們接受不同類型的綁定。為什麼不創建一種交換機來處理所有類型的路由規則呢?因為每種規則用來做匹配分子的CPU開銷是不同的。例如,一個「topic」類型的交換機試圖將消息的路由鍵與類似「dogs.*」的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與「dogs」比較(「direct」類型的交換機)要消耗更多的CPU。如果你不需要「topic」類型的交換機帶來的靈活性,你可以通過使用「direct」類型的交換機獲取更高的處理效率。
『柒』 activeMQ 消費者問題
ActiveMQ伺服器消費者:從消息服務接收消息。
1、 ActiveMQ伺服器工作模式:通過ActiveMQ消息服務交換消息。消息生產者將消息發送至消息服務,消息消費者則從消息服務接收這些消息。這些消息傳送操作是使用一組實現 ActiveMQ應用編程介面 (API) 的對象來執行的。
2、 ActiveMQ客戶端使用 ConnectionFactory 對象創建一個連接,向消息服務發送消息以及從消息服務接收消息均是通過此連接來進行。Connection 是客戶端與消息服務的活動連接。創建連接時,將分配通信資源以及驗證客戶端。這是一個相當重要的對象,大多數客戶端均使用一個連接來進行所有的消息傳送。 連接用於創建會話。Session 是一個用於生成和使用消息的單線程上下文。它用於創建發送的生產者和接收消息的消費者,並為所發送的消息定義發送順序。會話通過大量確認選項或通過事務來支持可靠傳送。 客戶端使用 MessageProcer 向指定的物理目標(在 API 中表示為目標身份對象)發送消息。生產者可指定一個默認傳送模式(持久性消息與非持久性消息)、優先順序和有效期值,以控制生產者向物理目標發送的所有消息。
3、客戶端使用 MessageConsumer 對象從指定的物理目標(在 API 中表示為目標對象)接收消息。消費者可使用消息選擇器,藉助它,消息服務可以只向消費者發送與選擇標准匹配的那些消息。 消費者可以支持同步或非同步消息接收。非同步使用可通過向消費者注冊 MessageListener 來實現。當會話線程調用 MessageListener 對象的 onMessage 方法時,客戶端將使用消息。
『捌』 activemq與spring整合,tomcat啟動,控制台報如下錯誤,activemq管理界面看不到消費者
日誌信息, 表示在載入 Spring . 不表示 Spring 中就沒有任何錯誤。
『玖』 rocketMQ如何設置消費線程數
能選擇的有三種:
1. ActiveMQ/ApolloMQ
優點:老牌的消息隊列,使用Java語言編寫。對JMS支持最好,採用多線程並發,資源消耗比較大。如果你的主語言是Java,可以重點考慮。
缺點:由於歷史悠久,歷史包袱較多,版本更新很緩慢。集群模式需要依賴Zookeeper實現。最新架構的產品被命名為Apollo,號稱下一代ActiveMQ,目前案例較少。
2. RocketMQ/Kafka
優點:專為海量消息傳遞打造,主張使用拉模式,天然的集群、HA、負載均衡支持。話說還是那句話,適合不適合看你有沒有那麼大的量。
缺點:所謂魚和熊掌不可兼得,放棄了一些消息中間件的靈活性,使用的場景較窄,需關注你的業務模式是否契合,否則山寨變相使用很別扭。除此之外,RocketMQ沒有.NET下的客戶端可用。RocketMQ身出名門,但使用者不多,生態較小,畢竟消息量能達到這種體量的公司不多,你也可以直接去購買阿里雲的消息服務。Kafka生態完善,其代碼是用Scala語言寫成,可靠性比RocketMQ低一些。
3. RabbitMQ
『拾』 RabbitMQ怎樣能實現多個隊列由一個消費者來接收消息
1、笨拙點方法,就是輪循,consume的阻塞監聽可以設置timeout,通過設置一個較小的timeout,可以輪流監聽幾個channel,變相實現監聽多個queue,對性能要求不是很高,可以使用這種方法
2、還有個辦法就是先取出一個隊列的消息數,然後循環的都讀出後,轉去讀另一個隊列,所有隊列如果都沒有消息了,就這樣循環等待著
3、還有一個專業人士的回答,但是我還沒有完全理解:
消費者(consumer)這是個業務層的概念,而消費或者說訂閱(也就是 consume)是 AMQP 協議層的東西,所以,你問一個消費者能否訂閱多個queue,答案是當然可以。方案也就一種,按照協議的流程分別向不同的 queue 進行 consume。至於是使用多線程方式來處理,還是使用事件驅動的方式(單線程)來處理這就取決於實現了。
如果對多線程式控制制能力不是很強,建議不要用這種方式,太專業了
4、這個問題後來我仔細想過,也許可以在應用層進行一下重新設計,可以用線程池作為多個consumer只讀取出消息,不進行處理,然後publish進另一個隊列,然後用由一個consumer來處理消息