Saya menggunakan partisi jarak jauh batch pegas untuk proses batch. Saya meluncurkan pekerjaan menggunakan admin batch musim semi.
Saya memiliki langkah konkurensi konsumen gateway masuk ke 10 tetapi jumlah maksimum partisi yang berjalan secara paralel adalah 8.
Saya ingin meningkatkan konkurensi konsumen menjadi 15 nanti.
Di bawah ini adalah konfigurasi saya,
<task:executor id="taskExecutor" pool-size="50" />
<rabbit:template id="computeAmqpTemplate"
connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
reply-timeout="${compute.partition.timeout}">
</rabbit:template>
<int:channel id="computeOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="computeInboundStagingChannel" />
<amqp:outbound-gateway request-channel="computeOutboundChannel"
reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="computeMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="computeOutboundChannel"
p:receiveTimeout="${compute.partition.timeout}" />
<beans:bean id="computePartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="computeStep" p:gridSize="${compute.grid.size}"
p:messagingOperations-ref="computeMessagingTemplate" />
<int:aggregator ref="computePartitionHandler"
send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
input-channel="computeInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
request-channel="computeInboundChannel"
reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<int:channel id="computeInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />
<int:channel id="computeOutboundStagingChannel" />
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
<beans:bean id="computeFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
scope="step" />
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.writers.ComputeItemWriter"
p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
p:batchId="#{jobParameters[batch_id]}" scope="step" />
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" writer="computeItemWriter"
commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
<flow id="computeFlow">
<step id="computeStep.master">
<partition partitioner="computePartitioner"
handler="computePartitionHandler" />
</step>
</flow>
<job id="computeJob" restartable="true">
<flow id="computeJob.computeFlow" parent="computeFlow" />
</job>
compute.grid.size = 112
compute.consumer.concurrency = 10
Input files are splited to 112 equal parts = compute.grid.size = total number of partitions
Number of servers = 4.
Ada 2 masalah,
i) Meskipun saya telah menyetel konkurensi ke 10, jumlah maksimum thread yang berjalan adalah 8.
ii)
ada yang lebih lambat karena proses lain berjalan di dalamnya dan ada pula yang lebih cepat jadi saya ingin memastikan langkah eksekusi didistribusikan secara adil yaitu jika server yang lebih cepat selesai dengan eksekusinya, eksekusi lain yang tersisa dalam antrian harus diberikan kepada mereka. Ini tidak boleh didistribusikan secara round robbin.
Saya tahu di Rabbitmq ada pengaturan hitungan prefetch dan mode ack untuk mendistribusikan tarif. Untuk integrasi pegas, hitungan prefetch adalah 1 default dan mode ack adalah AUTO secara default. Namun masih ada beberapa server yang tetap menjalankan lebih banyak partisi meskipun server lain sudah selesai dalam waktu lama. Idealnya tidak ada server yang menganggur.
Pembaruan:
Satu hal lagi yang sekarang saya amati adalah, untuk beberapa langkah yang berjalan secara paralel menggunakan split (tidak didistribusikan menggunakan partisi jarak jauh) juga dijalankan maksimal 8 secara paralel. Ini terlihat seperti masalah batas kumpulan utas tetapi seperti yang Anda lihat taskExecutor memiliki ukuran kumpulan yang disetel ke 50.
Apakah ada sesuatu di spring-batch/spring-batch-admin yang membatasi jumlah langkah yang berjalan secara bersamaan?
Pembaruan ke-2:
Dan, jika ada 8 atau lebih thread yang berjalan dalam item pemrosesan paralel, admin batch pegas tidak memuat. Itu hanya hang. Jika saya mengurangi konkurensi, admin batch musim semi akan memuat. Saya bahkan mengujinya dengan mengatur konkurensi 4 di satu server dan 8 di server lain, admin batch musim semi tidak memuatnya. Saya menggunakan URL server tempat 8 utas berjalan tetapi berfungsi di server tempat 4 utas berjalan.
Manajer admin batch musim semi memiliki konfigurasi jobLauncher di bawah,
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>
<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
Ukuran kolamnya ada 6, ada hubungannya dengan masalah di atas?
Atau adakah sesuatu di Tomcat 7 yang membatasi jumlah utas yang berjalan hingga 8?