คำขอ Spring amqp RPC ไปยัง rabbitmq ได้รับการหมดเวลา

ฉันใช้ Func AmqpTemplate.sendAndReceive จากเซิร์ฟเวอร์หลายเครื่อง ซึ่งใช้งานได้และการเชื่อมต่อใหม่ทำงานได้หลังจากที่ rabbitmq รีสตาร์ทหรือเกิดปัญหาเครือข่าย

มันใช้งานได้ดีเป็นเวลาหลายสัปดาห์ แต่ทันใดนั้นหนึ่งในเซิร์ฟเวอร์ของฉันกำลังหมดเวลาสำหรับการโทร sendAndReceive ทุกครั้ง rabbitmq กำลังรับข้อความและกำลังประมวลผล แต่ sendAndReceive ไม่ได้รับการตอบกลับ (การหมดเวลาตอบกลับตั้งไว้ที่ 60 วินาทีและมัน ใช้เวลาเพียงไม่กี่วินาทีในการประมวลผลข้อความ) เซิร์ฟเวอร์อื่นทำงานพร้อมกันด้วยรหัสเดียวกันและใช้คิวเดียวกัน

เซิร์ฟเวอร์กลับมาทำงานได้หลังจากที่ฉันเริ่มบริการใหม่เท่านั้น

ฉันคิดว่ามันเป็นปัญหาการเชื่อมต่อใหม่ (แม้ว่าข้อความที่ส่งไปยัง rabbitmq สำเร็จ) บางทีผู้ฟังการตอบสนองของ AmqpTemplate อาจไม่ได้เชื่อมต่อใหม่หรืออะไรบางอย่าง

ใครรู้ว่าอาจมีปัญหาอะไร? และฉันจะป้องกันไม่ให้มันเกิดขึ้นอีกได้อย่างไร?

การตั้งค่า ConnectionFactory ของฉัน:

setConnectionTimeout(1000);
setRequestedHeartbeat(100);
setTopologyRecoveryEnabled(true);
setAutomaticRecoveryEnabled(true);

แก้ไข:

เวอร์ชัน Spring-AMQP: 1.4.3

<bean id="myConnectionFactory" class="path.to.myConnectionFactoryClass"></bean>

<rabbit:connection-factory id="myRabbitConnectionFactory" connection-factory="myConnectionFactory" channel-cache-size="25" />

<rabbit:template id="myTemplate" connection-factory="myRabbitConnectionFactory" reply-timeout="65000" />

แก้ไข:

มันเกิดขึ้นอีกครั้ง ฉันเห็นว่ามันหยุดทำงานหลังจากที่ฉันได้รับข้อผิดพลาด:

org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:51)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)
at MyClass.onMessage(MyClass.java:1234)
at sun.reflect.GeneratedMethodAccessor3558.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy95.onMessage(Unknown Source)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:237)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$100(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:968)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:174)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:496)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:96)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:42)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:736)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:416)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:392)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$500(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:623)
at com.sun.proxy.$Proxy74.basicCancel(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:944)
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1045)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

ไม่แน่ใจว่าเหตุใดการเชื่อมต่อจึงปิด และเหตุใดจึงไม่เชื่อมต่อใหม่


person Ido Ganzer    schedule 09.06.2015    source แหล่งที่มา
comment
โปรดระบุการกำหนดค่าที่สมบูรณ์และเวอร์ชัน Spring AMQP Spring AMQP มีกลไกการกู้คืนการเชื่อมต่อของตัวเอง (ซึ่งเกิดขึ้นก่อนเวอร์ชัน Rabbit-Client เป็นเวลานาน) เวอร์ชันก่อน 1.4 เข้ากันไม่ได้กับ rabbitmq automaticRecoveryEnabled   -  person Gary Russell    schedule 09.06.2015
comment
ขอบคุณ Gary สำหรับความคิดเห็นของคุณ ฉันใช้ v1.4.3 และฉันแก้ไขโพสต์ด้วยข้อมูลการกำหนดค่าเพิ่มเติม   -  person Ido Ganzer    schedule 09.06.2015
comment
เป็นการตั้งค่าที่ค่อนข้างง่าย ไม่มีอะไรให้กู้คืนในฝั่งรับเนื่องจากใช้การตอบกลับโดยตรง (หรือคิวชั่วคราว ขึ้นอยู่กับเวอร์ชัน rabbitmq) สำหรับการตอบกลับ คุณเห็นอะไรที่เป็นประโยชน์ในบันทึกบ้างไหม? (แอปพลิเคชันนี้และ/หรือบันทึก rabbitmq)   -  person Gary Russell    schedule 09.06.2015
comment
มันเกิดขึ้นอีกครั้ง ฉันเห็นว่ามันหยุดทำงานหลังจากที่ฉันได้รับข้อผิดพลาด: com.rabbitmq.client.AlreadyClosedException: การเชื่อมต่อถูกปิดแล้วเนื่องจากข้อผิดพลาดในการเชื่อมต่อ; สาเหตุ: java.io.EOFException   -  person Ido Ganzer    schedule 23.06.2015
comment
ที่ org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110) ที่ org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051)   -  person Ido Ganzer    schedule 23.06.2015
comment
เหตุใดการเชื่อมต่อจึงปิดลง แล้วทำไมมันไม่เชื่อมต่อใหม่ล่ะ?   -  person Ido Ganzer    schedule 23.06.2015
comment
คุณต้องแสดงการติดตามสแต็กที่สมบูรณ์ - อย่าใส่การติดตามสแต็กในความคิดเห็น แก้ไขคำถาม   -  person Gary Russell    schedule 23.06.2015
comment
ฉันเพิ่ม stacktrace แบบเต็มแล้ว ขอบคุณ Gary   -  person Ido Ganzer    schedule 24.06.2015


คำตอบ (2)


ที่ org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

ที่ MyClass.onMessage(MyClass.java:1234) ...

ดังนั้นนี่ไม่ใช่การกู้คืนฝั่งเซิร์ฟเวอร์ MyClass ของคุณกำลังเรียกใช้การดำเนินการส่งและรับเทมเพลต Rabbit อื่นหลังจากได้รับข้อความ ดังนั้นจึงทำหน้าที่เป็นลูกค้าในสถานการณ์นี้

อย่างไรก็ตาม โดยปกติแล้ว ข้อยกเว้นนี้จะถูกส่งไปที่คอนเทนเนอร์ และ rabbitmq จะส่งข้อความอีกครั้ง (เว้นแต่คุณจะมีโหมดรับทราบเป็น NONE) ตราบใดที่ผู้ฟังของคุณส่งข้อยกเว้นไปยังคอนเทนเนอร์ หากผู้ฟังของคุณตรวจพบข้อยกเว้นและไม่ทำอะไรเลยนอกจากการบันทึก คุณจะเห็นผลลัพธ์นี้

หากคุณไม่ต้องการให้ Rabbit ส่งข้อความที่ล้มเหลวอีกครั้ง หรือคอนเทนเนอร์ขาเข้าไม่ยอมรับข้อความ คุณสามารถกำหนดค่า RetryTemplate ลงในเทมเพลต Rabbit ขาออกเพื่อกู้คืนการเชื่อมต่อ ดูเอกสารประกอบ.

หากสิ่งนี้ไม่สามารถอธิบายสถานการณ์ของคุณได้ คุณจะต้องแสดงการกำหนดค่าที่สมบูรณ์ รหัสใน MyClass.onMessage() และบันทึกที่สมบูรณ์

person Gary Russell    schedule 24.06.2015

ฉันคิดว่ามันได้รับการแก้ไขแล้ว ฉันไม่พบข้อยกเว้นใน OnMesaage และฉันเชื่อว่ามันฆ่าผู้ฟังของฉัน

person Ido Ganzer    schedule 07.07.2015