Запрос RPC Spring amqp к rabbitmq получает таймауты

Я использую функцию AmqpTemplate.sendAndReceive с нескольких серверов, она работает, и повторное подключение работает после перезапуска rabbitmq или проблем с сетью.

Он отлично работал в течение нескольких недель, но внезапно один из моих серверов получает тайм-аут для каждого вызова sendAndReceive, rabbitmq получает сообщение и обрабатывается, но sendAndReceive не получает ответа (таймаут ответа установлен на 60 секунд, и он занимает всего несколько секунд для обработки сообщения). Другие серверы работали одновременно с тем же кодом и с той же очередью.

Сервер вернулся к работе только после того, как я перезапустил на нем службу.

Я думаю, что это какая-то проблема с повторным подключением (даже если сообщения отправлены на rabbitmq успешно), возможно, прослушиватель ответа AmqpTemplate не подключился повторно или что-то в этом роде.

Кто-нибудь знает, в чем может быть проблема? и как я могу предотвратить это снова?

моя настройка ConnectionFactory:

setConnectionTimeout(1000);
setRequestedHeartbeat(100);
setTopologyRecoveryEnabled(true);
setAutomaticRecoveryEnabled(true);

Редактировать:

Версия Spring-AMQP: 1.4.3

<bean id="myConnectionFactory" class="path.to.myConnectionFactoryClass"></bean>

<rabbit:connection-factory id="myRabbitConnectionFactory" connection-factory="myConnectionFactory" channel-cache-size="25" />

<rabbit:template id="myTemplate" connection-factory="myRabbitConnectionFactory" reply-timeout="65000" />

Изменить:

Это случилось снова, я вижу, что он перестает работать после того, как получаю ошибку:

org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:51)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)
at MyClass.onMessage(MyClass.java:1234)
at sun.reflect.GeneratedMethodAccessor3558.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy95.onMessage(Unknown Source)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:237)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$100(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:968)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:174)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:496)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:96)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:42)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:736)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:416)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:392)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$500(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:623)
at com.sun.proxy.$Proxy74.basicCancel(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:944)
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1045)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

Не уверен, почему соединение закрылось и почему оно не восстановилось.


person Ido Ganzer    schedule 09.06.2015    source источник
comment
Пожалуйста, предоставьте полную конфигурацию и версию Spring AMQP. Spring AMQP имеет собственный механизм восстановления соединения (который намного старше версии кролика и клиента). Версии до 1.4 несовместимы с rabbitmq automaticRecoveryEnabled.   -  person Gary Russell    schedule 09.06.2015
comment
Спасибо Гэри за ваш комментарий, я использую v1.4.3, и я отредактировал сообщение, добавив больше информации о конфигурации.   -  person Ido Ganzer    schedule 09.06.2015
comment
Это довольно простая установка; на принимающей стороне восстанавливать нечего, поскольку она использует прямой ответ (или временную очередь, в зависимости от версии rabbitmq) для ответов; вы видите что-нибудь полезное в логах? (это приложение и / или журналы rabbitmq).   -  person Gary Russell    schedule 09.06.2015
comment
Это случилось снова, я вижу, что он перестает работать после того, как получаю сообщение об ошибке: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения; причина: java.io.EOFException   -  person Ido Ganzer    schedule 23.06.2015
comment
в org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException (RabbitAccessor.java:110) в org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute (RabbitTemplate.java:1051)   -  person Ido Ganzer    schedule 23.06.2015
comment
Почему соединение могло быть закрыто? и почему он не подключился заново?   -  person Ido Ganzer    schedule 23.06.2015
comment
Вам нужно показать полную трассировку стека - не помещайте трассировку стека в комментарии; редактировать вопрос.   -  person Gary Russell    schedule 23.06.2015
comment
Я добавил полную трассировку стека, спасибо Гэри.   -  person Ido Ganzer    schedule 24.06.2015


Ответы (2)


в org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive (RabbitTemplate.java:820)

в MyClass.onMessage (MyClass.java:1234) ...

Так что это не восстановление на стороне сервера; ваш MyClass вызывает операцию отправки и получения другого шаблона кролика после получения сообщения. Так что в этой ситуации он действует как клиент.

Однако обычно это исключение будет выброшено в контейнер, и rabbitmq повторно отправит сообщение (если у вас нет режима подтверждения NONE), пока ваш слушатель выбрасывает исключение в контейнер. Если ваш слушатель перехватывает исключение и ничего не делает с ним, кроме ведения журнала, вы увидите этот результат.

Если вы не хотите, чтобы rabbit повторно отправил неудачное сообщение или входящий контейнер не подтверждает сообщения, вы можете настроить RetryTemplate в исходящем шаблоне rabbit для восстановления соединения; см. документацию.

Если это не объясняет вашу ситуацию, вам необходимо показать полную конфигурацию, код в MyClass.onMessage() и полный журнал.

person Gary Russell    schedule 24.06.2015

Я думаю, что это решается. Я не обнаружил исключения в OnMesaage, и я считаю, что это убило моих слушателей.

person Ido Ganzer    schedule 07.07.2015