鍒嗗竷寮忎簨鍔′箣濡備綍鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭壒鎬у疄鐜板垎甯冨紡绯荤粺鐨勬渶缁堜竴鑷存€э紵

Posted 鏃犳晫鐮佸啘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了鍒嗗竷寮忎簨鍔′箣濡備綍鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭壒鎬у疄鐜板垎甯冨紡绯荤粺鐨勬渶缁堜竴鑷存€э紵相关的知识,希望对你有一定的参考价值。


1

瀵艰


鍦ㄤ箣鍓嶇殑鏂囩珷涓垜浠粙缁嶄簡濡備綍鍩轰簬RocketMQ鎼缓鐢熶骇绾ф秷鎭泦缇わ紝浠ュ強2PC銆?PC鍜孴CC绛変笌鍒嗗竷寮忎簨鍔$浉鍏崇殑鍩烘湰姒傚康锛堟病鏈夎杩囩殑璇昏€呰瑙侌煈?span>鎺ㄨ崘闃呰锛夈€傚湪杩欑瘒鏂囩珷涓垜浠皢浠嬬粛RocketMQ鐨勪簨鍔℃秷鎭浉鍏崇殑鍐呭锛屽苟閫氳繃涓€浜涘疄璺靛拰澶у涓€璧锋潵鎺㈢储涓?span>浜嬪姟娑堟伅濡備綍瑙e喅鍒嗗竷寮忕郴缁熶腑鐨勫垎甯冨紡浜嬪姟闂銆?/span>


2

浜嬪姟娑堟伅鍘熺悊


浜嬪姟娑堟伅鐗规€у彲浠ョ湅浣滄槸涓ら樁娈靛崗璁殑娑堟伅瀹炵幇鏂瑰紡锛岀敤浠ョ‘淇濆湪浠?span>娑堟伅涓棿浠惰В鑰?/strong>鐨勫垎甯冨紡绯荤粺涓?span>鏈湴浜嬪姟鐨勬墽琛屽拰娑堟伅鐨勫彂閫侊紝鍙互浠ュ師瀛愮殑鏂瑰紡杩涜銆?/span>


涓句釜渚嬪瓙锛屼互鏌愪簰鑱旂綉鍏徃鐨勭敤鎴蜂綑棰濆厖鍊间负渚嬶紝鍥犱负鏈夊厖杩旀椿鍔紙鍏呭€?00鍏冭禒閫?0鍏冿級锛屼紭鎯犳瘮杈冨ぇ锛岀敤鎴?span>Joe绂佷笉浣忚鎯戠敤鏀粯瀹濆悜鑷繁鐨勪綑棰濊处鎴峰厖鍊间簡100鍏冿紝鏀粯鎴愬姛鍚嶫oe鐨勪綑棰濊处鎴锋湁浜?20鍏冮挶銆?/span>


鑰岃鍏徃鐨勫叧浜庣敤鎴蜂綑棰濆厖鍊肩殑绯荤粺璁捐鏄繖鏍风殑锛?/span>

鍒嗗竷寮忎簨鍔′箣濡備綍鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭壒鎬у疄鐜板垎甯冨紡绯荤粺鐨勬渶缁堜竴鑷存€э紵


鍦ㄨ繖涓璁℃祦绋嬩腑锛岃鍏徃閫氳繃鑷缓鏀粯绯荤粺瀹屾垚鐢ㄦ埛Joe鐨勬敮浠樺疂鎵f鎿嶄綔锛屾垚鍔熷悗闇€瑕佹洿鏂版敮浠樻祦姘寸殑鐘舵€侊紝鍥犱负鐢ㄦ埛鐨?span>浣欓璐︽埛绯荤粺涓庢敮浠樼郴缁熶箣闂撮€氳繃MQ瑙h€?/strong>浜嗭紝鎵€浠ユ敮浠樼郴缁熷湪瀹屾垚鏀粯娴佹按鐘舵€佹洿鏂板悗闇€瑕侀€氳繃鍙戦€丮Q娑堟伅鍒版秷鎭腑闂翠欢鏈嶅姟锛岀劧鍚庣敤鎴蜂綑棰濈郴缁熶綔涓烘秷璐硅€呴€氳繃娑堟伅娑堣垂鐨勬柟寮忓畬鎴愮敤鎴蜂綑棰濈殑澧炲姞鎿嶄綔銆?br>


杩欓噷鏈変釜闂锛氣€?span>鏀粯绯荤粺濡備綍纭繚杩欑瑪浣欓鍏呭€兼秷鎭竴瀹氫細鎴愬姛鍙戦€佸埌MQ锛屽苟涓旂敤鎴蜂綑棰濈郴缁熶竴瀹氳兘澶勭悊鎴愬姛鍛?/strong>鈥濓紵濡傛灉鏀粯绯荤粺鍦ㄥ畬鎴愭敮浠樿鍗曠姸鎬佹洿鏂板悗锛孧Q娑堟伅鍙戦€佸け璐?/strong>鎴栬€呯敤鎴?span>浣欓绯荤粺娑堟伅澶勭悊澶辫触鐨勮瘽锛岄兘浼氬鑷碕oe鏀粯鎵f鎴愬姛锛岃€岃嚜宸辩殑浣欓璐︽埛鍗存病鍒拌处鐨勬儏鍐靛彂鐢熴€?/span>


涓轰簡瑙e喅杩欎釜闂锛屾寜鐓х洰鍓嶇殑绯荤粺璁捐鏄渶瑕?span>鈥滄敮浠樼郴缁?MQ鏈嶅姟-鐢ㄦ埛浣欓绯荤粺鈥?/strong>涓夎€呯殑澶勭悊婊¤冻鏁版嵁鐨勪竴鑷存€ц姹傘€備緥濡傦紝濡傛灉鏀粯绯荤粺鎰熺煡鍒版秷鎭彂閫佸け璐ュ悗杩樺彲浠ヨ繘琛岄噸鏂版姇閫掞紝浠庤€岀‘淇濇敮浠樼郴缁熶笌鐢ㄦ埛浣欓鏁版嵁鐨勬渶缁堜竴鑷存€с€?/span>


鑰屼笂杩伴棶棰樺氨鏄簨鍔℃秷鎭瑙e喅鐨勯棶棰橈紝鍦ㄥ叿浣撲簡瑙ocketMQ鎻愪緵鐨勪簨鍔℃秷鎭満鍒朵箣鍓嶏紝鎴戜滑鍏堟潵鐪嬩笅鍦≧ocketMQ鐨勬棭鏈熺増鏈笉鏀寔浜嬪姟娑堟伅锛屾垨鑰呭洜涓哄巻鍙插師鍥犻€夋嫨鐨勬秷鎭腑闂翠欢鏈韩灏变笉鏀寔浜嬪姟娑堟伅鐨勬儏鍐典笅锛屼竴浜涘ぇ鍏徃鏄€庝箞瑙e喅杩欎釜闂鐨勶紵


鏃╂湡涓轰簡瀹炵幇鍩轰簬MQ寮傛璋冪敤鐨勫涓湇鍔¢棿锛屼笟鍔¢€昏緫鎵ц瑕佷箞涓€璧锋垚鍔熴€佽涔堜竴璧峰け璐ワ紝鍏峰浜嬪姟鐗圭偣锛岄€氬父浼氶噰鐢?span>鍙潬娑堟伅鏈€缁堜竴鑷存€ф柟妗?/strong>锛屾潵瀹炵幇鍒嗗竷寮忎簨鍔°€傝繕鏄互Joe鍏呭€艰繖浠朵簨鏉ヤ妇渚嬶紝鍙潬娑堟伅鏂规瀹炵幇杩囩▼濡備笅锛?/span>


鍒嗗竷寮忎簨鍔′箣濡備綍鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭壒鎬у疄鐜板垎甯冨紡绯荤粺鐨勬渶缁堜竴鑷存€э紵


鍦ㄥ彲闈犳秷鎭渶缁堜竴鑷存€ф柟妗堜腑锛屼负浜嗗疄鐜板垎甯冨紡浜嬪姟锛岄渶瑕佺‘淇濅笂娓告湇鍔℃湰鍦颁簨鍔$殑澶勭悊涓嶮Q娑堟伅鐨勬姇閫掑叿鏈夊師瀛愭€э紝涔熷氨鏄涓婃父鏈嶅姟鏈湴浜嬪姟澶勭悊鎴愬姛鍚庤纭繚娑堟伅涓€瀹氳鎴愬姛鎶曢€掑埌MQ鏈嶅姟锛屽惁鍒欐秷鎭氨涓嶅簲璇ヨ鎶曢€掑埌MQ鏈嶅姟锛涘悓鏍凤紝琚垚鍔熸姇閫掑埌MQ鏈嶅姟鐨勬秷鎭紝涔熶竴瀹氳琚笅娓告湇鍔℃垚鍔熷鐞嗭紝鍚﹀垯灏遍渶瑕侀噸鏂版姇閫扢Q娑堟伅銆?/span>


涓轰簡瀹炵幇鍙屽悜鐨勫師瀛愭€э紝鍙潬娑堟伅鏈嶅姟闇€瑕佸娑堟伅杩涜鐘舵€佹爣璁?/strong>锛屼笌姝ゅ悓鏃惰繕闇€瑕佸娑堟伅杩涜鐘舵€佹鏌ワ紝浠庤€屽疄鐜伴噸鏂版姇閫掑強娑堟伅鐘舵€佺殑鏈€缁堜竴鑷存€с€?span>鏍稿績娴佺▼璇存槑濡備笅锛?/span>


1銆佷笂娓告湇鍔★紙鏀粯绯荤粺锛夊浣曠‘淇濆畬鎴愯嚜韬敮浠樻垚鍔熺姸鎬佹洿鏂板悗娑堟伅100%鐨勮兘澶熸姇閫掑埌涓嬫父鏈嶅姟锛堢敤鎴蜂綑棰濈郴缁燂級鎸囧畾鐨凾opic涓紵


鍦ㄨ繖涓祦绋嬩腑涓婃父鏈嶅姟鍦ㄨ繘琛屾湰鍦版暟鎹簱浜嬪姟鎿嶄綔鍓嶏紝浼氬厛鍙戦€佷竴涓姸鎬佷负鈥滃緟纭鈥?/strong>鐨勬秷鎭嚦鍙潬娑堟伅鏈嶅姟锛岃€屼笉鏄洿鎺ュ皢娑堟伅鎶曢€掑埌MQ鏈嶅姟鐨勬寚瀹歍opic銆傚彲闈犳秷鎭湇鍔℃鏃朵細灏嗚娑堟伅璁板綍鍒拌嚜韬湇鍔$殑娑堟伅鏁版嵁搴撲腑锛?span>娑堟伅鐘舵€佷负->寰呯‘璁?/strong>锛夛紝瀹屾垚鍚庡彲闈犳秷鎭湇鍔′細鍥炶皟涓婃父鏈嶅姟琛ㄧず鏀跺埌浜嗘秷鎭紝浣犱滑鍙互杩涜鏈湴浜嬪姟鐨勬搷浣滀簡銆?/span>


涔嬪悗涓婃父鏈嶅姟灏变細寮€鍚湰鍦版暟鎹簱浜嬪姟鎵ц涓氬姟閫昏緫鎿嶄綔锛岃繖閲屾敮浠樼郴缁熷氨浼氬皢璇ョ瑪鏀粯璁㈠崟鐘舵€佹洿鏂颁负鈥滃凡鎴愬姛鈥濄€傦紙娉ㄦ剰锛岃繖閲屽彧鏄妇涓ず渚嬪満鏅紝鍦ㄧ湡姝g殑瀹炶返涓竴鑸槸涓嶄細鎶婃敮浠樿鍗曟湰韬殑鐘舵€佷笌涓氬姟绔洖璋冩斁鍦ㄤ竴涓簨鍔℃祦绋嬩腑鐨勶紝鍏充簬杩欓儴鍒嗙殑璇︾粏璇存槑鎴戜滑鍦ㄤ笅闈㈢殑鍦烘櫙璇存槑涓啀璁ㄨ锛夈€?/span>


濡傛灉涓婃父鏈嶅姟鏈湴鏁版嵁搴撲簨鍔℃墽琛屾垚鍔燂紝鍒欑户缁悜鍙潬娑堟伅鏈嶅姟鍙戦€?span>娑堟伅纭娑堟伅锛屾鏃跺彲闈犳秷鎭湇鍔″氨浼氭寮忓皢娑堟伅鎶曢€掑埌MQ鏈嶅姟锛屽苟涓斿悓鏃?span>鏇存柊娑堟伅鏁版嵁搴撲腑鐨勬秷鎭姸鎬佷负鈥滃凡鍙戦€佲€?/strong>銆傦紙娉ㄦ剰锛岃繖閲屽彲闈犳秷鎭湇鍔℃洿鏂版秷鎭姸鎬佷笌鎶曢€掓秷鎭嚦MQ涔熷繀椤绘槸鍦ㄤ竴涓師瀛愭搷浣滀腑锛屽嵆娑堟伅鎶曢€掓垚鍔熷垯涓€瀹氳灏嗘秷鎭姸鎬佹洿鏂颁负鈥滃凡鍙戦€佲€?/strong>锛屾墍浠ュ湪缂栫▼鐨勭粏鑺備腑锛屽彲闈犳秷鎭湇鍔′竴鑸細鍏堟洿鏂版秷鎭姸鎬侊紝鐒跺悗鍐嶈繘琛屾秷鎭姇閫掞紝杩欐牱鍗充娇娑堟伅鎶曢€掑け璐ワ紝涔熷彲浠ュ娑堟伅鐘舵€佽繘琛屽洖婊?>鈥滃緟纭鈥?/strong>锛岀浉鍙嶅鏋滃厛杩涜娑堟伅鎶曢€掑啀鏇存柊娑堟伅鐘舵€侊紝鍙兘灏变笉濂芥帶鍒朵簡锛夈€?br>


鐩稿弽锛屽鏋滀笂娓告湰鍦版暟鎹簱浜嬪姟鎵ц澶辫触锛屽垯闇€瑕佸悜鍙潬娑堟伅鏈嶅姟鍙戦€?span>娑堟伅鍒犻櫎娑堟伅锛屽彲闈犳秷鎭湇鍔℃鏃跺氨浼氬皢娑堟伅鍒犻櫎锛岃繖鏍峰氨鎰忓懗鐫€浜嬪姟鍦ㄤ笂娓告秷鎭姇閫掕繃绋嬩腑灏辫鍥炴粴浜嗭紝鑰屾祦绋嬩篃灏辨缁撴潫浜嗭紝姝ゆ椂涓婃父鏈嶅姟鍙互闇€瑕侀€氳繃涓氬姟閫昏緫鐨勮璁¤繘琛岄噸鍙戯紝杩欎釜灏变笉鍐嶅垎甯冨紡浜嬪姟鐨勮璁鸿寖鐣翠簡銆?/span>


璇村埌杩欓噷锛屽ぇ瀹跺彲鑳戒細鏈夌枒闂簡锛佸洜涓哄湪涓婅堪鎻忚堪涓紝鍗充娇涓婃父鏈嶅姟鏈湴鏁版嵁搴撲簨鍔℃墽琛屾垚鍔熶簡锛屼絾鏄湪鍙戦€佺‘璁ゆ秷鎭嚦鍙潬娑堟伅鏈嶅姟鐨勮繃绋嬩腑锛屼互鍙婂彲闈犳秷鎭湇鍔″湪鎶曢€掓秷鎭嚦MQ鏈嶅姟鐨勮繃绋嬩腑锛岃繕鏄細瀛樺湪澶辫触鐨勯闄╋紝杩欐牱鐨勮瘽杩樻槸浼氬鑷存敮浠樻湇鍔℃洿鏂颁簡鐘舵€侊紝浣嗘槸鐢ㄦ埛浣欓绯荤粺杩炴秷鎭兘娌℃湁鏀跺埌鐨勬儏鍐靛彂鐢燂紵


瀹為檯涓婏紝瀹炵幇鏁版嵁涓€鑷存€ф槸涓€涓鏉傜殑娲汇€傚湪杩欎釜鏂规涓彲闈犳秷鎭湇鍔′綔涓哄熀纭€鎬х殑鏈嶅姟闄や簡鎵ц姝e父鐨勯€昏緫澶栵紝杩樺緱澶勭悊澶嶆潅鐨勫紓甯稿満鏅€傚湪瀹炵幇杩囩▼涓彲闈犳秷鎭湇鍔¢渶瑕佸惎鍔ㄧ浉搴旂殑鍚庡彴绾跨▼锛屼笉鏂疆璁秷鎭殑鐘舵€侊紝杩欓噷浼氳疆璁秷鎭姸鎬佷负鈥滃緟纭鈥?/strong>鐨勬秷鎭紝骞跺垽鏂娑堟伅鐨勭姸鎬佺殑鎸佺画鏃堕棿鏄惁瓒呰繃浜嗚瀹氱殑鏃堕棿锛屽鏋滆秴杩囪瀹氭椂闂寸殑娑堟伅杩樺浜庘€滃緟纭鈥濈殑鐘舵€侊紝灏变細瑙﹀彂涓婃父鏈嶅姟鐘舵€佽闂満鍒?/strong>銆?/span>


鍙潬娑堟伅鏈嶅姟灏变細璋冪敤涓婃父鏈嶅姟鎻愪緵鐨勭浉鍏冲€熷彛锛岃闂繖绗旀秷鎭殑澶勭悊鎯呭喌锛屽鏋滆繖绗旀秷鎭湪涓婃父鏈嶅姟澶勭悊鎴愬姛锛屽垯鍚庡彴绾跨▼灏变細缁х画瑙﹀彂涓婂浘涓殑姝ラ5锛屾洿鏂版秷鎭姸鎬佷负鈥滃凡鍙戦€佲€?/strong>骞舵姇閫掓秷鎭嚦MQ鏈嶅姟锛涘弽涔嬪鏋滆繖绗旀秷鎭笂娓告湇鍔″鐞嗗け璐ワ紝鍙潬娑堟伅鏈嶅姟鍒欎細杩涜娑堟伅鍒犻櫎銆傞€氳繃杩欐牱浠ヤ笂鏈哄埗灏辩‘淇濅簡鈥?span>涓婃父鏈嶅姟鏈湴浜嬪姟鎴愬姛澶勭悊+娑堟伅鎴愬姛鎶曢€?/strong>鈥濆浜庝竴涓師瀛愭搷浣滀簡銆?br>


2銆佷笅娓告湇鍔★紙鐢ㄦ埛浣欓绯荤粺锛夊浣曠‘淇濆MQ鏈嶅姟Topic娑堟伅鐨勬秷璐?00%閮借兘澶勭悊鎴愬姛锛?/strong>


鍦?鐨勮繃绋嬩腑锛岀‘淇濅簡涓婃父鏈嶅姟閫昏緫澶勭悊涓嶮Q娑堟伅鐨勬姇閫掑叿澶囧師瀛愭€э紝閭d箞褰撴秷鎭鎴愬姛鎶曢€掑埌浜哅Q鏈嶅姟鐨勬寚瀹歍opic鍚庯紝涓嬫父鏈嶅姟濡備綍鎵嶈兘纭繚娑堟伅鐨勬秷璐逛竴瀹氳兘琚垚鍔熷鐞嗗憿锛?/span>


鍦ㄦ甯哥殑娴佺▼涓紝涓嬫父鏈嶅姟绛夊緟娑堣垂Topic鐨勬秷鎭苟杩涜鑷韩鏈湴鏁版嵁搴撲簨鍔$殑澶勭悊锛屽鏋?span>澶勭悊鎴愬姛鍒欎細涓诲姩閫氱煡鍙潬娑堟伅鏈嶅姟锛屽彲闈犳秷鎭湇鍔℃鏃跺氨浼氬皢娑堟伅鐨勭姸鎬佹洿鏂颁负鈥滃凡瀹屾垚鈥?/strong>锛涘弽涔嬶紝澶勭悊澶辫触涓嬫父鏈嶅姟灏辨棤娉曞啀涓诲姩鍚戝彲闈犳秷鎭湇鍔″彂閫侀€氱煡娑堟伅浜嗐€?/span>


姝ゆ椂锛屼笌娑堟伅鎶曢€掕繃绋嬩腑鐨勫紓甯搁€昏緫涓€鏍凤紝鍙潬娑堟伅鏈嶅姟涔熶細鍚姩鐩稿簲鐨勫悗鍙扮嚎绋嬶紝杞涓€鐩村浜庘€?span>宸插彂閫佲€?/strong>鐘舵€佺殑娑堟伅锛屽垽鏂姸鎬佹寔缁椂闂存槸鍚﹁秴杩囦簡瑙勫畾鏃堕棿锛屽鏋滆秴鏃讹紝鍙潬娑堟伅鏈嶅姟灏变細鍐嶆鍚慚Q鏈嶅姟鎶曢€掓娑堟伅锛屼粠鑰岀‘淇濇秷鎭兘琚啀娆℃秷璐瑰鐞嗐€傦紙娉ㄦ剰锛屼篃鍙兘鍑虹幇涓嬫父鏈嶅姟澶勭悊鎴愬姛锛屼絾鏄€氱煡娑堟伅鍙戦€佸け璐ョ殑鎯呭喌锛屾墍浠ヤ负浜嗙‘淇濆箓绛夛紝涓嬫父鏈嶅姟涔熼渶瑕佸湪涓氬姟閫昏緫涓婂仛濂界浉搴旂殑闃查噸澶勭悊锛夈€?/span>


3

RocketMQ浜嬪姟娑堟伅鏈哄埗


鍦煈嗛潰绗?灏忚妭鐨勫唴瀹逛腑锛屾垜浠紨绀轰簡涓€涓嚜缂栧啓鐨勪腑闂存湇鍔?MQ鏉ュ疄鐜颁簨鍔℃秷鎭殑绀轰緥銆備絾鏄湪鐜板疄鐨勫伐浣滃満鏅腑锛屽紑鍙戝拰缁存姢涓€濂楀彲闈犳秷鎭湇鍔℃槸涓€浠跺緢鑰楄垂璧勬簮鍜屾垚鏈殑浜嬫儏锛屽疄闄呬笂锛孯ocketMQ鐨勬渶鏂扮増鏈紙4.3.0+锛変腑宸茬粡瀹炵幇浜嗗彲闈犳秷鎭湇鍔$殑鎵€鏈夊姛鑳斤紝骞朵笖鍦ㄤ繚璇侀珮骞跺彂銆侀珮鍙敤銆侀珮鎬ц兘鏂归潰鍋氫簡鏇翠负浼樼鐨勬灦鏋勫疄鐜般€?/span>


浠庤璁¢€昏緫涓婄湅RocketMQ鎵€鏀寔鐨勫垎甯冨紡浜嬪姟鐗规€т笌涓婅妭涓槓杩扮殑鍙潬娑堟伅鏈嶅姟鍩烘湰涓婃槸涓€鑷寸殑銆傚彧鏄疪ocketMQ鍦ㄥ疄鐜颁笂鐩告瘮杈冧簬鍙潬娑堟伅鏈嶅姟鑰岃█鍋氫簡鏇翠负澶嶆潅鐨勮璁★紝骞朵笖鍥犱负澶╃劧涓嶮Q鏈嶅姟鏈韩绱у瘑缁撳悎锛屾墍浠ュ湪楂樺彲鐢ㄣ€佸彲闈犳€с€佹€ц兘绛夋柟闈㈢洿鎺ョ户鎵夸簡MQ鏈嶅姟鏈韩鐨勬灦鏋勪紭鍔裤€?/span>


涓嬮潰鎴戜滑灏辩粨鍚堟祦绋嬪苟閫氳繃绀轰緥浠g爜鐨勫垎鏋愭潵鍜屽ぇ瀹朵竴璧风悊瑙d笅鍒╃敤RocketMQ鏄浣曞疄鐜板垎甯冨紡浜嬪姟鎿嶄綔鐨勶紵


鍒嗗竷寮忎簨鍔′箣濡備綍鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭壒鎬у疄鐜板垎甯冨紡绯荤粺鐨勬渶缁堜竴鑷存€э紵

鍦ㄥ簲鐢ㄥ満鏅腑鍒嗗竷寮忔湇鍔¢€氳繃MQ閫氫俊鐨勮繃绋嬩腑锛屽彂閫佹秷鎭殑涓€鏂规垜浠О涔嬩负Producer锛屾帴鏀舵秷璐规秷鎭殑涓€鏂规垜浠О涔嬩负Consumer銆傚鏋淧roducer鑷韩涓氬姟閫昏緫鏈湴浜嬪姟鎵ц鎴愬姛涓庡惁甯屾湜鍜屾秷鎭殑鍙戦€佷繚鎸佷竴涓師瀛愭€э紙涔熷氨鏄濡傛灉Producer鏈湴浜嬪姟鎵ц鎴愬姛锛岄偅涔堣繖绗旀秷鎭氨涓€瀹氳琚垚鍔熺殑鍙戦€佸埌RocketMQ鏈嶅姟鐨勬寚瀹歍opic锛屽苟涓擟onsumer涓€瀹氳琚秷璐规垚鍔燂紱鍙嶄箣锛屽鏋淧roducer鏈湴浜嬪姟鎵ц澶辫触锛岄偅涔堣繖绗旀秷鎭氨搴旇琚玆ocketMQ鏈嶅姟鍣ㄤ涪寮冿級鐨勮瘽锛孯ocketMQ鏄€庝箞鍋氱殑鍛紵


1銆丳roducer閫夋嫨浣跨敤RockerMQ鎻愪緵鐨勪簨鍔℃秷鎭柟娉曞悜RocketMQ鏈嶅姟鍙戦€佷簨鍔℃秷鎭?璁剧疆娑堟伅灞炴€?/span>TRAN_MSG=TRUE)锛?/span>


2銆丷ocketMQ鏈嶅姟绔湪鏀跺埌娑堟伅鍚庝細鍒ゆ柇娑堟伅鐨勫睘鎬ф槸鍚︿负浜嬪姟娑堟伅锛屽鏋滄槸鏅€氭秷鎭氨鐩存帴Push缁機onsumer锛涘鏋滄槸浜嬪姟娑堟伅灏变細瀵硅娑堟伅杩涜鐗规畩澶勭悊璁剧疆浜嬪姟ID锛屽苟鏆傛椂璁剧疆璇ユ秷鎭Consumer涓嶅彲瑙侊紝涔嬪悗鍚慞roducer杩斿洖Pre娑堟伅鍙戦€佺姸鎬?em>(SEND_OK)銆?br>


3銆佷箣鍚嶱roducer灏变細寮€濮嬫墽琛屾湰鍦颁簨鍔¢€昏緫锛屽苟璁剧疆鏈湴浜嬪姟澶勭悊鐘舵€佸悗鍚慠ocketMQ鏈嶅姟鍣ㄥ彂閫佽浜嬪姟娑堟伅鐨勭‘璁?鍥炴粴娑堟伅(COMMIT_MESSAGE锛廟OLLBACK_MESSAGE)銆?br>


4銆丷ocketMQ鏈嶅姟鍣ㄦ牴鎹绗斾簨鍔℃秷鎭殑鏈湴浜嬪姟鎵ц鐘舵€佸喅瀹氭槸鍚﹀皢娑堟伅Push缁機onsumer杩樻槸鍒犻櫎璇ユ秷鎭€?/span>


5銆佷箣鍚嶤onsumer灏变細娑堣垂璇ユ秷鎭紝鎵цConsumer鐨勬湰鍦颁簨鍔¢€昏緫锛屽鏋滄墽琛屾垚鍔熷垯鍚慠ocketMQ杩斿洖鈥?/span>CONSUME_SUCCESS鈥濓紱鍙嶄箣鍑虹幇寮傚父鍒欓渶瑕佽繑鍥炩€?/span>RECONSUME_LATER鈥濓紝浠ヤ究RocketMQ鍐嶆Push璇ユ秷鎭紝杩欎竴鐐瑰湪瀹為檯缂栫▼涓渶瑕佹帶鍒跺ソ銆?/strong>


姝e父鎯呭喌涓嬩互涓婂氨鏄疪ocketMQ浜嬪姟娑堟伅鐨勫熀鏈繍琛屾祦绋嬩簡锛屼絾鏄粠寮傚父鎯呭喌鑰冭檻锛岀悊璁轰笂涔熸槸瀛樺湪Producer杩熻繜涓嶅彂閫佺‘璁ゆ垨鍥炴粴娑堟伅鐨勬儏鍐点€備笌鍙潬娑堟伅鏈嶅姟涓€鏍凤紝RocketMQ鏈嶅姟绔篃浼氳缃悗鍙扮嚎绋嬪幓鎵弿娑堟伅鐘舵€侊紝涔嬪悗浼氳皟鐢≒roducer鐨勬湰鍦?span>checkLocalTransaction鍑芥暟鑾峰彇鏈湴浜嬪姟鐘舵€佸悗缁х画杩涜绗?姝?/strong>鎿嶄綔銆?/span>


鐩镐俊鐪嬪埌杩欓噷锛屽ぇ瀹跺浜嶳ocketMQ鐨勫垎甯冨紡浜嬪姟娑堟伅鐨勭悊瑙e簲璇ユ湁浜嗕竴涓浉瀵规竻鏅扮殑姒傚康浜嗭紝閭d箞鍦ㄤ唬鐮佷腑濡備綍缂栧啓鍛紵


鍦ㄥ紑鍙戜腑浣跨敤RocketMQ鐨勫垎甯冨紡浜嬪姟娑堟伅Consumer鐨勪唬鐮佷笉闇€瑕佹湁浠€涔堢壒鍒殑鍙樺寲涓庢櫘閫氭秷鎭疌onsumer浠g爜涓€鑷村氨鍙互銆?/span>


Consumer绀轰緥浠g爜锛?/span>

public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_PAY_ACCOUNT");

        // Specify name server addresses.
        consumer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("PAY_ACCOUNT""*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context)
 
{
                for (MessageExt messageExt : msgs) {
                    System.out.println(new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }


涓昏鐨勬敼鍙樻槸鍦≒roducer浠g爜锛屾垜浠渶瑕侀澶栫紪鍐欎竴涓疄鐜版墽琛屾湰鍦颁簨鍔¢€昏緫锛屼互鍙婃鏌ユ湰鍦颁簨鍔$姸鎬佺殑绫汇€傜ず渚嬩唬鐮佸涓嬶細

public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


Producer绀轰緥浠g爜锛?/span>

public class TransactionProducerTest {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("CID_PAY_ACCOUNT");
        producer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876");

        ExecutorService executorService = new ThreadPoolExecutor(25100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};

        try {
            Map<String, String> paramMap = new HashMap<>();
            paramMap.put("type""6");
            paramMap.put("bizOrderId""15414012438257823");
            paramMap.put("payOrderId""15414012438257823");
            paramMap.put("amount""10");
            paramMap.put("userId""200001");
            paramMap.put("tradeType""charge");
            paramMap.put("financeStatus""0");//璐㈠姟鐘舵€侊紝搴旀敹
            paramMap.put("channel""a");//浣欓
            paramMap.put("tradeTime""20190101202022");
            paramMap.put("nonce_str""xkdkskskdksk");

            //鎷煎噾娑堟伅浣?/span>
            Message msg = new Message("PAY_ACCOUNT""pre",paramMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        Thread.sleep(10*1000);
        producer.shutdown();
    }
}


涓庨潪浜嬪姟娑堟伅鐩存帴璋冪敤RocketMQ Client鐨?em>send鏂规硶涓嶅悓锛屼簨鍔℃秷鎭彂閫侀渶瑕佽缃簨鍔$洃鍚櫒绫伙紝骞惰皟鐢?em>sendMessageInTransaction鏂规硶锛?/em>鑰岃繖涓柟娉曠殑鍏蜂綋閫昏緫涔熷氨鏄笂杩版祦绋嬩腑鎻忚堪鐨勯偅鏍凤紝鍏蜂綋澶у鍙互鐪嬩笅銆?/span>


浠ヤ笂浠g爜鍙槸绀轰緥浠g爜锛屽湪瀹為檯鐨勯」鐩腑鎴戜滑鏄渶瑕佽繘琛屼竴浜涘皝瑁呰璁$殑锛屼互渚夸笌椤圭洰涓婁笅鏂囩幆澧冮泦鎴?/strong>銆備緥濡傚浜嶴pringboot椤圭洰锛屾垜浠竴鑸細缂栧啓涓€涓猻tater宸ョ▼杩涜闆嗘垚銆傚ぇ瀹舵劅鍏磋叮鍙互鍏虫敞涓嬫垜鐨刧ithub椤圭洰锛屽悗闈㈡垜浼氫互鐪熷疄鐨勯」鐩満鏅仛涓€浜涢泦鎴愮ず鑼冦€?/span>

https://github.com/qiaojiang2/springboot-starter


4

鍦烘櫙璇存槑


鐩墠RocketMQ娑堟伅涓棿浠剁殑浣跨敤鍦烘櫙姣旇緝骞挎硾锛屽浜庨渶瑕侀€氳繃MQ杩涜寮傛瑙h€︾殑鍒嗗竷寮忓簲鐢ㄧ郴缁熸潵璇达紝RocketMQ鏃犵枒鏄竴涓笉閿欑殑鎶€鏈€夋嫨銆傛帴涓嬫潵锛屾垜浠氨浠?span>瀵规暟鎹竴鑷存€ц姹傞潪甯搁珮鐨勫垎甯冨紡鏀粯绯荤粺涓轰緥锛屾潵鐪嬬湅鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭€傜敤浜庡摢浜涚壒瀹氬満鏅紝浠庤€屽疄鐜版敮浠樼郴缁熸暟鎹殑楂樺害涓€鑷存€с€?/span>


浜嬪疄涓婏紝鏀粯绯荤粺鐨勬暟鎹竴鑷存€ф槸涓€涓鏉傜殑闂锛屽師鍥犲湪浜庢敮浠樻祦绋嬬殑鍚勪釜鐜妭閮藉瓨鍦ㄥ紓姝ョ殑涓嶇‘瀹氭€э紝渚嬪鏀粯绯荤粺闇€瑕佽窡绗笁鏂规笭閬撹繘琛屼氦浜掞紝涓嶅悓鐨勬敮浠樻笭閬撲氦浜掓祦绋嬪瓨鍦ㄥ樊寮傦紝骞朵笖鏈夊紓姝ユ敮浠樼粨鏋滃洖璋冪殑鎯呭喌銆?/span>


闄ゆ浠ュ锛屾敮浠樼郴缁熷唴閮ㄦ湰韬張鏄敱澶氫釜涓嶅悓瀛愮郴缁熺粍鎴愶紝闄ゆ牳蹇冩敮浠樼郴缁熷锛岃繕鏈夎处鍔$郴缁熴€佸晢鎴烽€氱煡绯荤粺绛夌瓑锛岃€屾牳蹇冩敮浠樼郴缁熸湰韬篃浼氳鎷嗗垎涓哄涓笉鍚岀殑鏈嶅姟妯″潡锛屽椋庢帶銆佽矾鐢辩瓑鐢ㄤ互瀹炵幇涓嶅悓鐨勫姛鑳介€昏緫銆?span>鏌愪簺鍦烘櫙鎴戜滑鏃犳硶閫氳繃鍒嗗竷寮忎簨鍔℃潵瀹炵幇鏁版嵁涓€鑷存€э紝鍙兘閫氳繃棰濆鐨勪笟鍔¤ˉ鍋挎墜娈?/strong>锛屽浜屾杞銆佹敮浠樺璐︾瓑鏉ュ疄鐜?span>鏁版嵁鏈€缁堜竴鑷存€?/strong>銆?/span>


缁间笂鎵€杩帮紝鏀粯绯荤粺鏄竴涓鏉傜殑绯荤粺锛岃瀹屽叏瀹炵幇鏁版嵁鐨勪竴鑷存€у崟闈犳煇涓€绉嶆墜娈垫槸鏃犳硶瀹炵幇鐨?/strong>锛屽ぇ閮ㄥ垎鎯呭喌涓嬫垜浠彲浠ラ€氳繃棰濆鐨勪笟鍔¤ˉ鍋块€昏緫鏉ュ疄鐜版暟鎹渶缁堜竴鑷存€э紝鍙槸杩欐牱琛ュ伩閫昏緫闇€瑕佷互鏇村鐨勪笟鍔″紑鍙戦€昏緫涓轰唬浠凤紝骞朵笖鍦ㄦ椂鏁堟€т笂浼氬瓨鍦ㄥ欢杩熺殑闂銆?/span>


涓句釜渚嬪瓙锛屾敮浠樻牳蹇冪郴缁熸敮浠樻垚鍔熷悗浼氭洿鏂拌嚜宸辩殑璁㈠崟鐘舵€佷负鏀粯鎴愬姛锛屾暣涓牳蹇冧氦鏄撴祦绋嬫槸涓€涓瘮杈冨疄鏃跺悓姝ョ殑鍦烘櫙锛屽鏋滃嚭鐜版暟鎹笉涓€鑷达紝浼氭湁棰濆鐨勮ˉ鍋块€昏緫濡備簩娆℃敮浠樿鍗曠姸鎬佽疆璇€乀+1鏃ュ璐?/strong>绛夌敤浠ョ‘淇濇敮浠樼姸鎬佹暟鎹殑鏈€缁堜竴鑷存€с€備絾鏄櫎浜嗘牳蹇冩敮浠樺锛屾敮浠樻垚鍔熺殑缁撴灉鏄渶瑕侀€氱煡鍒版敮浠樿处鍔$郴缁熴€佷互鍙婁笟鍔$绯荤粺锛岃€屼负浜嗙‘淇濇€ц兘锛屼竴鑸悗缁殑閫氱煡灏变笉浼氫笌涓绘祦绋嬩竴鏍疯璁℃垚瀹炴椂鍚屾锛岃€屾槸閫氳繃MQ寮傛瑙h€﹀彂閫佹秷鎭粰鐙珛鐨?span>鈥滈€氱煡鍝嶅簲妯″潡鈥?/strong>锛岃€屸€滈€氱煡鍝嶅簲妯″潡鈥濇鏃跺氨鍙互閫氳繃鍒嗗竷寮忎簨鍔℃秷鎭潵涓庢敮浠樿处鎴风郴缁熴€佷笟鍔$绛夌郴缁熷疄鐜版暟鎹竴鑷存€э紝浠庤€?span>鍑忓皯闇€瑕佽ˉ鍋挎墜娈靛鐞嗙殑鑼冨洿锛屾彁楂樼郴缁熺殑鏁版嵁涓€鑷存€х瓑绾у拰鐏垫晱搴?/strong>銆?/span>


鎺ㄨ崘闃呰锛?/span>





鍙傝€冭祫鏂欙細


https://blog.csdn.net/qq_27529917/article/details/79802406



鈥斺€斺€斺€斺€擡ND鈥斺€斺€斺€斺€?/span>



璇嗗埆鍥剧墖浜岀淮鐮侊紝鍏虫敞鈥?/span>鏃犳晫鐮佸啘鈥濊幏鍙栫簿褰╁唴瀹?/span>

以上是关于鍒嗗竷寮忎簨鍔′箣濡備綍鍩轰簬RocketMQ鐨勪簨鍔℃秷鎭壒鎬у疄鐜板垎甯冨紡绯荤粺鐨勬渶缁堜竴鑷存€э紵的主要内容,如果未能解决你的问题,请参考以下文章

鍒嗗竷寮忎簨鍔$殑CAP鐞嗚

鍒嗗竷寮忎簨鍔$殑BASE鐞嗚

鍒嗗竷寮忎腑鐨勪竴鑷存€х畻娉曚箣Raft绠楁硶

鍩轰簬SpringBoot鎼缓涓€涓甫鏁版嵁搴撹闂殑WEB椤圭洰(璁板綍鎵€闇€鐨勪緷璧?閰嶇疆,椹卞姩瀹夎绛夋敞鎰忎簨椤?

[Nutch]Hadoop澶氭満瀹屽叏鍒嗗竷寮忔ā寮廻adoop閰嶇疆

涓€鏂囧交搴曟悶鎳?zookeeper 鏍稿績鐭ヨ瘑鐐?