Шаги удаленного раздела Spring Batch Admin, выполняющие максимум 8 потоков, даже если параллелизм равен 10?

Я использую удаленное разбиение на разделы Spring batch для пакетного процесса. Я запускаю рабочие места с помощью администратора Spring Batch.

У меня есть шаг параллелизма потребителя входящего шлюза до 10, но максимальное количество параллельно работающих разделов - 8.

Позже я хочу увеличить количество одновременных запросов потребителей до 15.

Ниже моя конфигурация,

<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />


<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep" p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel" 
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />



<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}" scope="step" />


<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter"
            commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner"
            handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>



compute.grid.size = 112
compute.consumer.concurrency = 10

Input files are splited to 112 equal parts = compute.grid.size = total number of partitions

Number of servers = 4.

Есть 2 проблемы,

i) Несмотря на то, что я установил для параллелизма значение 10, максимальное количество запущенных потоков равно 8.

ii)

некоторые из них работают медленнее, поскольку на них выполняются другие процессы, а некоторые - быстрее, поэтому я хочу убедиться, что выполнение шагов распределяется справедливо, т.е. если более быстрые серверы выполняются с их выполнением, другие оставшиеся выполнения в очереди должны перейти к ним. Он не должен распространяться в стиле кругового роббина.

Я знаю, что в rabbitmq есть настройка подсчета предварительной выборки и режим подтверждения для более быстрого распространения. Для весенней интеграции счетчик предварительной выборки по умолчанию равен 1, а режим подтверждения по умолчанию - АВТО. Но все же на некоторых серверах сохраняется больше разделов, хотя другие серверы уже давно созданы. В идеале серверы не должны простаивать.

Обновление:

Еще одна вещь, которую я сейчас заметил, заключается в том, что для некоторых шагов, которые выполняются параллельно с использованием split (не распределяются с использованием удаленного разбиения), также параллельно выполняется max 8. Это похоже на проблему с ограничением пула потоков, но, как вы можете видеть, taskExecutor имеет размер пула, равный 50.

Есть ли что-нибудь в spring-batch / spring-batch-admin, которое ограничивает количество одновременно выполняемых шагов?

2-е обновление:

И, если в элементах параллельной обработки запущено 8 или более потоков, администратор пакета Spring не загружается. Просто зависает. Если я уменьшу параллелизм, загрузится администратор весенней партии. Я даже протестировал его, установив параллелизм 4 на одном сервере и 8 на другом сервере, администратор весеннего пакета не загружает его. Я использую URL-адрес сервера, на котором запущено 8 потоков, но он работает на сервере, на котором запущено 4 потока.

Менеджер администратора Spring Batch имеет ниже конфигурацию jobLauncher,

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

Там размер пула 6, это как-то связано с вышеуказанной проблемой?

Или есть что-нибудь в tomcat 7, которое ограничивает количество выполняемых потоков до 8?


person vishal    schedule 04.07.2014    source источник
comment
Привет, Вишал, у меня такая же проблема. Вы решили эту проблему? Если да, могу я узнать, каково было ваше решение ...   -  person The Guest    schedule 19.09.2016


Ответы (2)


Вы используете базу данных для JobRepository?

Во время выполнения пакетные фреймворки сохраняют выполнение шагов, и количество подключений к базе данных JobRepository может мешать параллельному выполнению шагов.

Параллелизм 8 заставляет меня думать, что вы можете использовать BasicDataSource? Если да, переключитесь на что-нибудь вроде DriverManagerDataSource и посмотрите.

person Parag Kutarekar    schedule 24.05.2016

В замешательстве - вы сказали: «Я установил параллелизм на 10», но затем показываете compute.consumer.concurrency = 8. Итак, он работает, как настроено. Если для свойства установлено значение 10, невозможно иметь только 8 потоков-потребителей.

С точки зрения Rabbit, все потребители равны - если есть 10 потребителей в медленном ящике и 10 потребителей в быстром ящике, и у вас есть только 10 разделов, вполне возможно, что все 10 разделов окажутся в медленном ящике.

RabbitMQ не распределяет работу по серверам, он распределяет работу только между потребителями.

Вы можете улучшить распределение, уменьшив параллелизм. Вам также следует установить более низкий уровень параллелизма на более медленных полях.

person Gary Russell    schedule 04.07.2014
comment
извините за опечатку. Фактически значение в моей конфигурации - 10. - person vishal; 04.07.2014
comment
Да, я установил параллелизм ниже. Но в идеале, если потребители на более медленных серверах заняты, сообщения должны приниматься потребителями на других серверах, как бы то ни было, потребители на более медленном сервере продолжают получать сообщения, даже если потребители более быстрых серверов бездействуют. - person vishal; 04.07.2014
comment
обновленный вопрос с дополнительной информацией, проблема spring-batch / spring-batch-admin - person vishal; 04.07.2014
comment
Если concurrent-consumers равно 10, будет 10 потоков. Период. В SI / SB нет ничего, что ограничивало бы его до 8. Как я уже сказал, Rabbit не знает, находится ли следующий потребитель на занятом или простаивающем сервере. Похоже, у вас слишком много потребителей, подготовленных для ваших нужд. Если для одних пакетных заданий требуется больше разделов, чем для других, рассмотрите возможность использования других конфигураций. - person Gary Russell; 05.07.2014
comment
В идеале он должен работать 10, но это не так. Я добавил еще одно наблюдение. - person vishal; 07.07.2014