Saat ini saya sedang memproses aliran file baris demi baris dengan menjalankannya melalui aliran transformasi yang memancarkan peristiwa 'line'
. Saya ingin, setelah mengetahui bahwa baris saat ini cocok dengan beberapa kriteria, menghentikan sementara aliran file masukan, mulai memproses aliran baru, dan ketika selesai, melanjutkan pemrosesan aliran asli baris demi baris. Saya telah meringkasnya menjadi contoh minimal di bawah ini:
tes.kopi:
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
tes-transformasi.kopi:
Transform = require('stream').Transform
module.exports =
class TestTransform extends Transform
constructor: ->
Transform.call @, readableObjectMode: true
@buffer = ""
pushLines: ->
newlineIndex = @buffer.indexOf "\n"
while newlineIndex isnt -1
@push @buffer.substr(0, newlineIndex + 1)
@emit 'line', @buffer.substr(0, newlineIndex + 1)
@buffer = @buffer.substr(newlineIndex + 1)
newlineIndex = @buffer.indexOf "\n"
_transform: (chunk, enc, cb) ->
@buffer = @buffer + chunk.toString()
@pushLines()
cb?()
_flush: (cb) ->
@pushLines()
@buffer += "\n" # ending newline
@push @buffer
@emit 'line', @buffer # push last line
@buffer = ""
cb?()
(Jangan terlalu khawatir tentang aliran Transform, ini hanya sebuah contoh.) Bagaimanapun, output dari coffee test.coffee
terlihat seperti:
-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.unpause()
--> else
--> process.stdout.write line
-->
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.unpause()
else
process.stdout.write line
Jadi jelas, pipa tidak dijeda, hanya berlanjut hingga selesai (walaupun PAUSE
dijalankan seperti yang diharapkan), dan karena "UNPAUSE"
juga tidak pernah ditulis, panggilan balik 'end'
tidak pernah diaktifkan. Mengalihkan aliran untuk menjeda/membatalkan jeda ke readStream dari aliran transformasi tampaknya juga tidak berhasil. Saya berasumsi dari perilaku ini bahwa aliran node entah bagaimana tidak menghormati jeda/batalkan jeda dari dalam panggilan balik acara.
Mungkin juga ada cara lain untuk mencapai hal ini tanpa memanggil jeda/batalkan jeda; jika ada cara seperti menunggu akhir aliran dan menjeda rangkaian eksekusi saat ini, itu secara efektif akan melakukan apa yang saya coba lakukan.
concat-stream
. Kalau tidak, Anda memerlukan abstraksi di atas aliran. Salah satu cara yang menarik adalah dengan menggunakan sesuatu seperti Dust.js, yang dapat menyisipkan aliran secara asli. - person Interrobang   schedule 18.04.2015