转换流以将字符串添加到每一行
Posted
技术标签:
【中文标题】转换流以将字符串添加到每一行【英文标题】:Transform stream to prepend string to each line 【发布时间】:2017-11-23 15:44:24 【问题描述】:我像这样生成一个子进程:
const n = cp.spawn('bash');
n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);
我正在寻找一个转换流,以便我可以在子进程的每一行开头添加类似“[子进程]”的内容,因此我知道 stdio 来自子进程而不是父进程。
所以它看起来像:
const getTransformPrepender = function() : Transform
return ...
n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);
有谁知道是否有这样的现有转换包或如何编写?
我有这个:
import * as stream from 'stream';
export default function(pre: string)
let saved = '';
return new stream.Transform(
transform(chunk, encoding, cb)
cb(null, String(pre) + String(chunk));
,
flush(cb)
this.push(saved);
cb();
);
但我担心它在边缘情况下不起作用 - 一个块突发可能不包括整行(对于很长的行)。
看起来这个问题的答案在这里:https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/
但是有了这个附录: https://twitter.com/the1mills/status/886340747275812865
【问题讨论】:
【参考方案1】:您可以使用以下命令添加到流中:
https://github.com/ORESoftware/prepend-transform
但它旨在像这样解决手头的问题:
import pt from 'prepend-transform';
import * as cp from 'child_process';
const n = cp.spawn('bash');
n.stdout.pipe(pt('child stdout: ')).pipe(process.stdout);
n.stderr.pipe(pt('child stderr: ')).pipe(process.stderr);
【讨论】:
我认为这有答案 - strongloop.com/strongblog/…【参考方案2】:总共需要正确处理三种情况:
代表整行的单个块 一个块代表多行 仅代表行的一部分的单个块这是解决所有三种情况的算法描述
-
接收一大块数据
扫描块中的换行符
找到换行符后,立即获取它之前的所有内容(包括换行符)并将其作为单行条目发送出去,并进行任何您需要的修改
重复直到处理完整个块(没有剩余数据)或直到没有找到其他换行符(剩余一些数据,保存以备后用)
这是一个实际的实现,其中描述了为什么需要它等。
请注意,出于性能原因,我不会将缓冲区转换为经典的 JS 字符串。
const Transform = require('stream')
const prefix = Buffer.from('[worker]: ')
const prepender = new Transform(
transform(chunk, encoding, done)
this._rest = this._rest && this._rest.length
? Buffer.concat([this._rest, chunk])
: chunk
let index
// As long as we keep finding newlines, keep making slices of the buffer and push them to the
// readable side of the transform stream
while ((index = this._rest.indexOf('\n')) !== -1)
// The `end` parameter is non-inclusive, so increase it to include the newline we found
const line = this._rest.slice(0, ++index)
// `start` is inclusive, but we are already one char ahead of the newline -> all good
this._rest = this._rest.slice(index)
// We have a single line here! Prepend the string we want
this.push(Buffer.concat([prefix, line]))
return void done()
,
// Called before the end of the input so we can handle any remaining
// data that we have saved
flush(done)
// If we have any remaining data in the cache, send it out
if (this._rest && this._rest.length)
return void done(null, Buffer.concat([prefix, this._rest])
,
)
process.stdin.pipe(prepender).pipe(process.stdout)
【讨论】:
以上是关于转换流以将字符串添加到每一行的主要内容,如果未能解决你的问题,请参考以下文章
使用客户端凭据流以编程方式在 Azure AD 中添加应用程序