cara menjeda dan membatalkan jeda aliran objek node saat memproses outputnya

Saat ini saya sedang memproses aliran file baris demi baris dengan menjalankannya melalui aliran transformasi yang memancarkan peristiwa 'line'. Saya ingin, setelah mengetahui bahwa baris saat ini cocok dengan beberapa kriteria, menghentikan sementara aliran file masukan, mulai memproses aliran baru, dan ketika selesai, melanjutkan pemrosesan aliran asli baris demi baris. Saya telah meringkasnya menjadi contoh minimal di bawah ini:

tes.kopi:

fs = require 'fs'    
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.resume()
  else
    process.stdout.write line

tes-transformasi.kopi:

Transform = require('stream').Transform

module.exports =
class TestTransform extends Transform
  constructor: ->
    Transform.call @, readableObjectMode: true
    @buffer = ""

  pushLines: ->
    newlineIndex = @buffer.indexOf "\n"
    while newlineIndex isnt -1
      @push @buffer.substr(0, newlineIndex + 1)
      @emit 'line', @buffer.substr(0, newlineIndex + 1)
      @buffer = @buffer.substr(newlineIndex + 1)
      newlineIndex = @buffer.indexOf "\n"

  _transform: (chunk, enc, cb) ->
    @buffer = @buffer + chunk.toString()
    @pushLines()
    cb?()

  _flush: (cb) ->
    @pushLines()
    @buffer += "\n"             # ending newline
    @push @buffer
    @emit 'line', @buffer       # push last line
    @buffer = ""
    cb?()

(Jangan terlalu khawatir tentang aliran Transform, ini hanya sebuah contoh.) Bagaimanapun, output dari coffee test.coffee terlihat seperti:

-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
-->  process.stdout.write "-->"
-->  if line.match /line\.match/g
PAUSE
-->    process.stdout.write line
-->    console.error "PAUSE"
-->    inStream.pause()
-->    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
-->      console.error "UNPAUSE"
-->      inStream.unpause()
-->  else
-->    process.stdout.write line
-->
fs = require 'fs'

TestTransform = require './test-transform'

inStream = new TestTransform

fs.createReadStream("./test.coffee").pipe(inStream)

inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.unpause()
  else
    process.stdout.write line

Jadi jelas, pipa tidak dijeda, hanya berlanjut hingga selesai (walaupun PAUSE dijalankan seperti yang diharapkan), dan karena "UNPAUSE" juga tidak pernah ditulis, panggilan balik 'end' tidak pernah diaktifkan. Mengalihkan aliran untuk menjeda/membatalkan jeda ke readStream dari aliran transformasi tampaknya juga tidak berhasil. Saya berasumsi dari perilaku ini bahwa aliran node entah bagaimana tidak menghormati jeda/batalkan jeda dari dalam panggilan balik acara.

Mungkin juga ada cara lain untuk mencapai hal ini tanpa memanggil jeda/batalkan jeda; jika ada cara seperti menunggu akhir aliran dan menjeda rangkaian eksekusi saat ini, itu secara efektif akan melakukan apa yang saya coba lakukan.


person cosmicexplorer    schedule 18.04.2015    source sumber
comment
Apakah Anda harus menyelesaikan pemrosesan sebelum mulai membaca streaming lagi? Apakah tidak cukup memulai pekerjaan pemrosesan baru dan melanjutkan membaca dari aliran? Node pandai melakukan sesuatu secara asinkron.   -  person Interrobang    schedule 18.04.2015
comment
@Interrobang ya, saya mencoba menyalurkan kedua aliran masukan ke aliran keluaran yang sama dan penting agar aliran kedua dibaca seluruhnya ke keluaran sebelum aliran pertama lainnya dimasukkan. Saya tidak ingin kedua aliran itu diselingi dalam keluaran.   -  person cosmicexplorer    schedule 18.04.2015
comment
Jika cukup untuk tidak menyelinginya, Anda dapat menggunakan aliran bergaya buffer seperti concat-stream. Kalau tidak, Anda memerlukan abstraksi di atas aliran. Salah satu cara yang menarik adalah dengan menggunakan sesuatu seperti Dust.js, yang dapat menyisipkan aliran secara asli.   -  person Interrobang    schedule 18.04.2015
comment
Saya sedang memikirkan hal seperti itu. Karena saya tidak berharap berurusan dengan aliran yang panjangnya gigabyte, saya dapat menyalurkan semuanya ke dalam buffer dan kemudian memprosesnya setiap kali aliran lainnya selesai. Namun, saya lebih memilih untuk tidak menyimpan seluruh aliran dalam memori sekaligus. Saya akan melihat debunya, saya belum pernah melihatnya sebelumnya.   -  person cosmicexplorer    schedule 18.04.2015


Jawaban (2)


Jika saya telah memahami pertanyaannya dengan benar, inilah aplikasi Node sederhana menggunakan Dust.js yang memecahkan masalah tersebut.

Dust adalah mesin templating, tetapi salah satu fitur terbaiknya adalah pemahaman aslinya tentang Node Streams. Contoh ini menggunakan Dust 2.7.0.

Saya menggunakan node-byline sebagai pengganti aliran Transform Anda, namun fungsinya sama-- membaca aliran demi baris.

var fs = require('fs'),
    byline = require('byline'),
    dust = require('dustjs-linkedin');

var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));

var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));

dust.stream(template, {
  byline: stream,
  match: function(chunk, context) {
    var currentLine = context.current();

    if(currentLine.match(/line\.match/g)) {
      return fs.createReadStream('./test.txt', 'utf8');
    }
    return chunk;
  }
}).pipe(process.stdout);

Inilah output dari program saya:

$ node index.js
--> fs = require 'fs'
--> TestTransform = require './test-transform'
--> inStream = new TestTransform
--> fs.createReadStream("./test.coffee").pipe(inStream)
--> inStream.on 'line', (line) ->
-->   process.stdout.write "-->"
-->   if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.pause()
    fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
      console.error "UNPAUSE"
      inStream.resume()
  else
    process.stdout.write line

-->     process.stdout.write line
-->     console.error "PAUSE"
-->     inStream.pause()
-->     fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
-->       console.error "UNPAUSE"
-->       inStream.resume()
-->   else
-->     process.stdout.write line

Seperti yang Anda lihat, outputnya disisipkan dengan benar. Jika saya dapat menjelaskan lebih lanjut tentang cara kerja bagian Debu, beri tahu saya.

EDIT: Berikut penjelasan khusus template Dust.

{#byline} {! look for the context variable named `byline` !}
{! okay, it's a stream. For each `data` event, output this stuff once !}
-->
{.|s} {! output the current `data`. Use |s to turn off HTML escaping !}
{~n} {! a newline !}
{match} {! look up the variable called `match` !}
{! okay, it's a function. Run it and insert the result !}
{! if the result is a stream, stream it in. !}
{/byline} {! done looping !}
person Interrobang    schedule 18.04.2015
comment
Tampaknya ini masuk akal! (setelah membaca sintaks debu haha) Saya mencari solusi tanpa ketergantungan eksternal, tetapi ini tampaknya cukup ringan. Pada fungsi match yang diberikan pada dust.stream, mengapa baris if line.match /line\.match/g ditulis? Sepertinya dust hanya akan mengembalikan fs.createReadStream alih-alih potongan itu sendiri, dan baris itu akan hilang. - person cosmicexplorer; 18.04.2015
comment
Fungsi pencocokan dipanggil satu kali per baris menggunakan {#match/}. Jika baris saat ini (context.current()) cocok, maka fungsi mengalir di konten test.txt. Jika tidak, itu hanya mengembalikan chunk saat ini, yang memungkinkan streaming untuk melanjutkan. - person Interrobang; 18.04.2015
comment
Itu masuk akal. Tentang apa bagian {.|s} dalam string templat? Saya berasumsi itu menyuruhnya membaca dari atribut stream (yang dimulai dengan s), atau atribut apa pun jika stream tidak ada, tapi itu bisa jadi benar-benar salah. - person cosmicexplorer; 18.04.2015
comment
{.} berarti konteks saat ini, dan |s berarti jangan lolos dari HTML. - person Interrobang; 19.04.2015
comment
Saya memperbarui jawabannya dengan penjelasan templat yang lebih menyeluruh. - person Interrobang; 19.04.2015

Saya sebenarnya menemukan jawaban terpisah untuk ini juga; tidak secantik, tapi juga berhasil.

Pada dasarnya, pause() hanya menjeda keluaran dari aliran yang disalurkan (dalam mode "mengalir"); karena saya mendengarkan acara 'line' tidak mengalir, jadi pause tentu saja tidak melakukan apa pun. Jadi solusi pertama adalah menggunakan removeListener daripada pause, yang secara efektif menghentikan streaming. Filenya sekarang terlihat seperti:

fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.removeListener 'line', c
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      console.error "UNPAUSE"
      inStream.on 'line', c
    f.pipe(process.stdout)
  else
    process.stdout.write line
inStream.on 'line', c

Dan ini menghasilkan keluaran yang hampir berfungsi:

-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->c = (line) ->
-->  process.stdout.write "-->"
-->  if line.match /line\.match/g
PAUSE
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
  process.stdout.write "-->"
  if line.match /line\.match/g
    process.stdout.write line
    console.error "PAUSE"
    inStream.removeListener 'line', c
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      console.error "UNPAUSE"
      inStream.on 'line', c
    f.pipe(process.stdout)
  else
    process.stdout.write line
inStream.on 'line', c
UNPAUSE

Namun, sepertinya streaming asli yang dapat dibaca berhenti saat saya menghapus pendengarnya; ini masuk akal (saya kira sampah simpul mengumpulkan alirannya yang dapat dibaca ketika semua pendengar telah dihapus). Jadi solusi kerja terakhir yang saya temukan bergantung pada perpipaan. Karena aliran Transform yang saya tunjukkan di atas juga mendorong keluarannya per baris ke 'data' pendengar mana pun, pause() dapat digunakan secara efektif di sini untuk tujuan aslinya, tanpa hanya mematikan aliran tersebut. Hasil akhir:

fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
  line = chunk.toString()
  process.stdout.write "-->#{line}"
  if line.match /line\.match/g
    inStream.pause()
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      inStream.resume()
    f.pipe(process.stdout)

dengan keluaran:

-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->inStream.on 'data', (chunk) ->
-->  line = chunk.toString()
-->  process.stdout.write "-->#{line}"
-->  if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
  line = chunk.toString()
  process.stdout.write "-->#{line}"
  if line.match /line\.match/g
    inStream.pause()
    f = fs.createReadStream("./test.coffee")
    f.on 'end', ->
      inStream.resume()
    f.pipe(process.stdout)
-->    inStream.pause()
-->    f = fs.createReadStream("./test.coffee")
-->    f.on 'end', ->
-->      inStream.resume()
-->    f.pipe(process.stdout)
-->

yang merupakan hasil yang diharapkan.

person cosmicexplorer    schedule 18.04.2015