ฉันกำลังใช้การแบ่งพาร์ติชันระยะไกลแบบแบตช์สปริงสำหรับกระบวนการแบตช์ ฉันกำลังเปิดตัวงานโดยใช้ผู้ดูแลระบบชุดสปริง
ฉันมีขั้นตอนการทำงานพร้อมกันของผู้บริโภคเกตเวย์ขาเข้าเป็น 10 แต่จำนวนพาร์ติชันสูงสุดที่ทำงานพร้อมกันคือ 8
ฉันต้องการเพิ่มการทำงานพร้อมกันของผู้บริโภคเป็น 15 ในภายหลัง
ด้านล่างคือการกำหนดค่าของฉัน
<task:executor id="taskExecutor" pool-size="50" />
<rabbit:template id="computeAmqpTemplate"
connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
reply-timeout="${compute.partition.timeout}">
</rabbit:template>
<int:channel id="computeOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="computeInboundStagingChannel" />
<amqp:outbound-gateway request-channel="computeOutboundChannel"
reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="computeMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="computeOutboundChannel"
p:receiveTimeout="${compute.partition.timeout}" />
<beans:bean id="computePartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="computeStep" p:gridSize="${compute.grid.size}"
p:messagingOperations-ref="computeMessagingTemplate" />
<int:aggregator ref="computePartitionHandler"
send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
input-channel="computeInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
request-channel="computeInboundChannel"
reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<int:channel id="computeInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />
<int:channel id="computeOutboundStagingChannel" />
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
<beans:bean id="computeFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
scope="step" />
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.writers.ComputeItemWriter"
p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
p:batchId="#{jobParameters[batch_id]}" scope="step" />
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" writer="computeItemWriter"
commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
<flow id="computeFlow">
<step id="computeStep.master">
<partition partitioner="computePartitioner"
handler="computePartitionHandler" />
</step>
</flow>
<job id="computeJob" restartable="true">
<flow id="computeJob.computeFlow" parent="computeFlow" />
</job>
compute.grid.size = 112
compute.consumer.concurrency = 10
Input files are splited to 112 equal parts = compute.grid.size = total number of partitions
Number of servers = 4.
มี 2 ปัญหา คือ
i) แม้ว่าฉันจะตั้งค่าการทำงานพร้อมกันเป็น 10 แต่จำนวนเธรดสูงสุดที่ทำงานอยู่คือ 8
ii)
บางส่วนทำงานช้าลงเนื่องจากกระบวนการอื่นๆ ทำงานบนนั้น และบางส่วนทำงานเร็วกว่า ดังนั้นฉันต้องการให้แน่ใจว่าการดำเนินการตามขั้นตอนมีการกระจายอย่างยุติธรรม กล่าวคือ หากเซิร์ฟเวอร์ที่เร็วกว่าดำเนินการเสร็จสิ้นแล้ว การดำเนินการอื่นๆ ที่เหลือในคิวควรไปที่พวกเขา ไม่ควรแจกแบบกลมร็อบบิ้น
ฉันรู้ว่าใน rabbitmq มีการตั้งค่าการนับการดึงข้อมูลล่วงหน้าและโหมด ack เพื่อกระจายค่าโดยสาร สำหรับการรวมสปริง จำนวนการดึงข้อมูลล่วงหน้าจะเป็น 1 ค่าเริ่มต้น และโหมด ack จะเป็นอัตโนมัติตามค่าเริ่มต้น แต่เซิร์ฟเวอร์บางตัวยังคงใช้งานพาร์ติชั่นมากขึ้นแม้ว่าเซิร์ฟเวอร์อื่น ๆ จะทำงานเป็นเวลานานก็ตาม ตามหลักการแล้ว เซิร์ฟเวอร์ไม่ควรไม่ได้ใช้งาน
อัปเดต:
อีกสิ่งหนึ่งที่ฉันสังเกตเห็นก็คือ สำหรับบางขั้นตอนที่ทำงานแบบขนานโดยใช้การแยก (ไม่กระจายโดยใช้การแบ่งพาร์ติชันระยะไกล) ก็รันสูงสุด 8 แบบขนานด้วย ดูเหมือนปัญหาขีดจำกัดของเธรดพูล แต่อย่างที่คุณเห็นว่า TaskExecutor มีขนาดพูลที่ตั้งค่าไว้ที่ 50
มีอะไรใน spring-batch/spring-batch-admin ซึ่งจำกัดจำนวนขั้นตอนการทำงานพร้อมกันหรือไม่
อัปเดตครั้งที่ 2:
และหากมี 8 เธรดขึ้นไปที่ทำงานในรายการการประมวลผลแบบขนาน ผู้ดูแลระบบชุดสปริงจะไม่โหลด มันแค่แฮงค์ ถ้าฉันลดการทำงานพร้อมกัน ผู้ดูแลระบบชุดสปริงจะโหลด ฉันยังทดสอบด้วยการตั้งค่าการทำงานพร้อมกัน 4 บนเซิร์ฟเวอร์หนึ่งและ 8 บนเซิร์ฟเวอร์อื่น ผู้ดูแลระบบชุดสปริงไม่โหลด ฉันใช้ URL ของเซิร์ฟเวอร์ที่มี 8 เธรดกำลังทำงานอยู่ แต่ใช้งานได้บนเซิร์ฟเวอร์ที่มี 4 เธรดกำลังทำงานอยู่
ผู้จัดการผู้ดูแลระบบชุดสปริงมีการกำหนดค่า jobLauncher ต่ำกว่า
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>
<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
ที่นั่นมีขนาด 6 สระ เกี่ยวข้องกับปัญหาข้างต้นหรือไม่
หรือมีอะไรใน Tomcat 7 ที่จำกัดจำนวนเธรดที่ทำงานอยู่ที่ 8 หรือไม่