ขั้นตอนพาร์ติชันระยะไกลของผู้ดูแลระบบชุดสปริงที่ใช้งานสูงสุด 8 เธรดแม้ว่าการทำงานพร้อมกันคือ 10

ฉันกำลังใช้การแบ่งพาร์ติชันระยะไกลแบบแบตช์สปริงสำหรับกระบวนการแบตช์ ฉันกำลังเปิดตัวงานโดยใช้ผู้ดูแลระบบชุดสปริง

ฉันมีขั้นตอนการทำงานพร้อมกันของผู้บริโภคเกตเวย์ขาเข้าเป็น 10 แต่จำนวนพาร์ติชันสูงสุดที่ทำงานพร้อมกันคือ 8

ฉันต้องการเพิ่มการทำงานพร้อมกันของผู้บริโภคเป็น 15 ในภายหลัง

ด้านล่างคือการกำหนดค่าของฉัน

<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />


<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep" p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel" 
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />



<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}" scope="step" />


<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter"
            commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner"
            handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>



compute.grid.size = 112
compute.consumer.concurrency = 10

Input files are splited to 112 equal parts = compute.grid.size = total number of partitions

Number of servers = 4.

มี 2 ​​ปัญหา คือ

i) แม้ว่าฉันจะตั้งค่าการทำงานพร้อมกันเป็น 10 แต่จำนวนเธรดสูงสุดที่ทำงานอยู่คือ 8

ii)

บางส่วนทำงานช้าลงเนื่องจากกระบวนการอื่นๆ ทำงานบนนั้น และบางส่วนทำงานเร็วกว่า ดังนั้นฉันต้องการให้แน่ใจว่าการดำเนินการตามขั้นตอนมีการกระจายอย่างยุติธรรม กล่าวคือ หากเซิร์ฟเวอร์ที่เร็วกว่าดำเนินการเสร็จสิ้นแล้ว การดำเนินการอื่นๆ ที่เหลือในคิวควรไปที่พวกเขา ไม่ควรแจกแบบกลมร็อบบิ้น

ฉันรู้ว่าใน rabbitmq มีการตั้งค่าการนับการดึงข้อมูลล่วงหน้าและโหมด ack เพื่อกระจายค่าโดยสาร สำหรับการรวมสปริง จำนวนการดึงข้อมูลล่วงหน้าจะเป็น 1 ค่าเริ่มต้น และโหมด ack จะเป็นอัตโนมัติตามค่าเริ่มต้น แต่เซิร์ฟเวอร์บางตัวยังคงใช้งานพาร์ติชั่นมากขึ้นแม้ว่าเซิร์ฟเวอร์อื่น ๆ จะทำงานเป็นเวลานานก็ตาม ตามหลักการแล้ว เซิร์ฟเวอร์ไม่ควรไม่ได้ใช้งาน

อัปเดต:

อีกสิ่งหนึ่งที่ฉันสังเกตเห็นก็คือ สำหรับบางขั้นตอนที่ทำงานแบบขนานโดยใช้การแยก (ไม่กระจายโดยใช้การแบ่งพาร์ติชันระยะไกล) ก็รันสูงสุด 8 แบบขนานด้วย ดูเหมือนปัญหาขีดจำกัดของเธรดพูล แต่อย่างที่คุณเห็นว่า TaskExecutor มีขนาดพูลที่ตั้งค่าไว้ที่ 50

มีอะไรใน spring-batch/spring-batch-admin ซึ่งจำกัดจำนวนขั้นตอนการทำงานพร้อมกันหรือไม่

อัปเดตครั้งที่ 2:

และหากมี 8 เธรดขึ้นไปที่ทำงานในรายการการประมวลผลแบบขนาน ผู้ดูแลระบบชุดสปริงจะไม่โหลด มันแค่แฮงค์ ถ้าฉันลดการทำงานพร้อมกัน ผู้ดูแลระบบชุดสปริงจะโหลด ฉันยังทดสอบด้วยการตั้งค่าการทำงานพร้อมกัน 4 บนเซิร์ฟเวอร์หนึ่งและ 8 บนเซิร์ฟเวอร์อื่น ผู้ดูแลระบบชุดสปริงไม่โหลด ฉันใช้ URL ของเซิร์ฟเวอร์ที่มี 8 เธรดกำลังทำงานอยู่ แต่ใช้งานได้บนเซิร์ฟเวอร์ที่มี 4 เธรดกำลังทำงานอยู่

ผู้จัดการผู้ดูแลระบบชุดสปริงมีการกำหนดค่า jobLauncher ต่ำกว่า

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

ที่นั่นมีขนาด 6 สระ เกี่ยวข้องกับปัญหาข้างต้นหรือไม่

หรือมีอะไรใน Tomcat 7 ที่จำกัดจำนวนเธรดที่ทำงานอยู่ที่ 8 หรือไม่


person vishal    schedule 04.07.2014    source แหล่งที่มา
comment
สวัสดี Vishal ฉันมีปัญหาเดียวกัน คุณได้รับการแก้ไขนี้แล้วหรือยัง? ถ้าเป็นเช่นนั้นฉันขอทราบวิธีแก้ปัญหาของคุณได้ไหม .....   -  person The Guest    schedule 19.09.2016


คำตอบ (2)


คุณใช้ฐานข้อมูลสำหรับ JobRepository หรือไม่?

ในระหว่างการดำเนินการ ชุดเฟรมเวิร์กยังคงมีการดำเนินการตามขั้นตอน และจำนวนการเชื่อมต่อกับฐานข้อมูล JobRepository อาจรบกวนการดำเนินการขั้นตอนคู่ขนานได้

การเกิดขึ้นพร้อมกันของ 8 ทำให้ฉันคิดว่าคุณอาจใช้ BasicDataSource? หากเป็นเช่นนั้น ให้เปลี่ยนไปใช้บางอย่างเช่น DriverManagerDataSource และดู

person Parag Kutarekar    schedule 24.05.2016

สับสน - คุณพูดว่า "ฉันได้ตั้งค่าการทำงานพร้อมกันเป็น 10" แต่กลับแสดง compute.consumer.concurrency = 8 ดังนั้นจึงทำงานตามที่กำหนดค่าไว้ เป็นไปไม่ได้ที่จะมีเธรดผู้บริโภคเพียง 8 รายการหากตั้งค่าคุณสมบัติเป็น 10

จากมุมมองของ Rabbit ผู้บริโภคทุกคนเท่าเทียมกัน - หากมีผู้บริโภค 10 รายในกล่องที่ช้าและ 10 ผู้บริโภคในกล่องที่เร็ว และคุณมีพาร์ติชั่นเพียง 10 พาร์ติชั่น ก็เป็นไปได้ที่ทั้ง 10 พาร์ติชั่นจะไปจบลงที่กล่องที่ช้า

RabbitMQ ไม่กระจายงานข้ามเซิร์ฟเวอร์ แต่จะกระจายงานไปยังผู้บริโภคเท่านั้น

คุณอาจได้รับการกระจายที่ดีขึ้นโดยการลดการทำงานพร้อมกัน คุณควรตั้งค่าการทำงานพร้อมกันให้ต่ำลงในกล่องที่ช้ากว่า

person Gary Russell    schedule 04.07.2014
comment
ขออภัยที่พิมพ์ผิดในคำถาม ค่าในการกำหนดค่าของฉันคือ 10 จริง ๆ แล้ว - person vishal; 04.07.2014
comment
ใช่ ฉันตั้งค่าการทำงานพร้อมกันให้ต่ำลง แต่ตามหลักการแล้ว หากผู้บริโภคบนเซิร์ฟเวอร์ที่ช้ากว่าไม่ว่าง ผู้บริโภคบนเซิร์ฟเวอร์อื่นควรรับข้อความ ดูเหมือนว่าผู้บริโภคบนเซิร์ฟเวอร์ที่ช้ากว่าจะได้รับข้อความต่อไป แม้ว่าผู้ใช้เซิร์ฟเวอร์ที่เร็วกว่าจะนั่งเฉยๆ - person vishal; 04.07.2014
comment
อัปเดตคำถามพร้อมข้อมูลเพิ่มเติม ปัญหา spring-batch/spring-batch-admin - person vishal; 04.07.2014
comment
หาก concurrent-consumers เป็น 10 จะมี 10 เธรด ระยะเวลา. ไม่มีสิ่งใดใน SI/SB ที่จะจำกัดไว้ที่ 8 อย่างที่ฉันบอกไปแล้ว Rabbit ไม่รู้ว่าผู้บริโภครายต่อไปอยู่บนเซิร์ฟเวอร์ที่ไม่ว่างหรือไม่ได้ใช้งานหรือไม่ ดูเหมือนว่าคุณมีผู้บริโภคที่จัดสรรไว้สำหรับความต้องการของคุณมากเกินไป หากงานแบตช์บางงานต้องการพาร์ติชันมากกว่างานอื่น ให้พิจารณาใช้การกำหนดค่าที่แตกต่างกัน - person Gary Russell; 05.07.2014
comment
ตามหลักการแล้ว มันควรจะรัน 10 แต่มันไม่ทำงาน ผมเพิ่มข้อสังเกตอีกอย่างหนึ่ง - person vishal; 07.07.2014