如何在 Dart 中创建 StreamTransformer?

Posted

技术标签:

【中文标题】如何在 Dart 中创建 StreamTransformer?【英文标题】:How to create a StreamTransformer in Dart? 【发布时间】:2015-03-02 04:26:26 【问题描述】:

尝试构建一个自定义 StreamTransformer 类,但是那里的许多示例似乎已经过时,并且在文档中找到的示例不是(某些类型语言可能会考虑的)作为一个类(找到这里:https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer)。这似乎不是一种非常类似于 Dart 的处理方式,而更像是一种类似 javascript 的方式(我使用 Dart 来避免这种方式)。

许多在线资源说这是您创建 StreamTransformer 的方式,但是在扩展它时会出错。

class exampleStreamTransformer extends StreamTransformer

  //... (This won't work)

“实现”似乎是要走的路,以及实现所需的绑定功能:

class exampleStreamTransformer implements StreamTransformer

  Stream bind(Stream stream)
  
    //... (Go on to return new stream, etc)
  

我似乎找不到任何这种方式的例子,但我自己把一些东西放在一起(在我的 IDE 中被接受,但在运行时不被接受,当它尝试使用 pause 时出现空对象错误吸气剂):

class exampleStreamTransformer implements StreamTransformer

  StreamController<String> _controller;
  StreamSubscription<String> _subscription;

  Stream bind(Stream stream)
  
    _controller = new StreamController<String>(
        onListen: ()
        
          _subscription = stream.listen((data)
          
            // Transform the data.
            _controller.add(data);
          ,
          onError: _controller.addError,
          onDone: _controller.close,
          cancelOnError: true); // Unsure how I'd pass this in?????
        ,
        onPause: _subscription.pause,
        onResume: _subscription.resume,
        onCancel: _subscription.cancel,
        sync: true
    );

    return _controller.stream;
  

希望以这种方式实现它,就像以“类型化”方式生成课程一样,非常感谢任何帮助,谢谢。

【问题讨论】:

我同意StreamTransformer 是一个不必要的类 - 它只包含一个函数,因此您可以在传递对象的任何地方传递该函数。如果现在添加到库中,它只是一个函数类型。 【参考方案1】:

你为什么不用StreamTransformer.fromHandler()

import 'dart:async';

void handleData(data, EventSink sink) 
  sink.add(data*2);


void main() 
  StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);

  StreamController controller = new StreamController();
  controller.stream.transform(doubleTransformer).listen((data) 
    print('data: $data');
  );

  controller.add(1);
  controller.add(2);
  controller.add(3);

输出

data: 2
data: 4
data: 6

【讨论】:

因为我需要一堂课。内容需要缓冲,因此非常复杂。还是谢谢你。 然后做一个全局buffer变量,缓存数据,根据需要写数据到sink? 不,这本质上是个坏主意。如果我想多次使用它怎么办?拥有一个全局缓冲区意味着两个流可能会缓冲到同一个地方,否则每次我都必须在函数范围之外的某个地方创建一个单独的变量。缺乏封装性和代码可维护性。只想调用 .transform(new exampleStreamTransformer()); + 甚至可以缩短,但我在这方面更坚持语言约定。这是一个旧的(已弃用)示例:victorsavkin.com/post/51233496661/…【参考方案2】:

好的。这是另一个工作示例:

import 'dart:async';

class DuplicateTransformer<S, T> implements StreamTransformer<S, T> 
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOnError;

  // Original Stream
  Stream<S> _stream;

  DuplicateTransformer(bool sync: false, this.cancelOnError) 
    _controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () 
      _subscription.pause();
    , onResume: () 
      _subscription.resume();
    , sync: sync);
  

  DuplicateTransformer.broadcast(bool sync: false, bool this.cancelOnError) 
    _controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
  

  void _onListen() 
    _subscription = _stream.listen(onData,
      onError: _controller.addError,
      onDone: _controller.close,
      cancelOnError: cancelOnError);
  

  void _onCancel() 
    _subscription.cancel();
    _subscription = null;
  

  /**
   * Transformation
   */

  void onData(S data) 
    _controller.add(data);
    _controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  

  /**
   * Bind
   */

  Stream<T> bind(Stream<S> stream) 
    this._stream = stream;
    return _controller.stream;
  


void main() 
  // Create StreamController
  StreamController controller = new StreamController.broadcast();
  // Transform
  Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());

  s.listen((data) 
    print('data: $data');
  ).cancel();

  s.listen((data) 
    print('data2: $data');
  ).cancel();

  s.listen((data) 
    print('data3: $data');
  );

  // Simulate data

  controller.add(1);
  controller.add(2);
  controller.add(3);

让我添加一些注释:

在查看其他 dart 内部转换器的源代码时,使用 implements 似乎是正确的方法。 我实现了常规和广播流的两个版本。 如果是常规流,您可以直接在新的流控制器上调用取消/暂停/恢复,因为我们只能监听一次。 如果您使用广播流,我发现只有在没有人收听该流时才调用listen()。 onCancel 的行为相同。如果最后一个订阅者取消其订阅,则调用 onCancel。这就是为什么在这里使用相同的功能是安全的。

【讨论】:

是的,这看起来是一个更好的实现!谢谢,我稍后试试 感谢您的实施,这非常有帮助:)【参考方案3】:

https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139

您可以使用 StreamTransformer.fromHandlers 轻松创建 只是将输入事件转换为输出事件的转换器。

例子:

new StreamTransformer.fromHandlers(handleData: (String event, EventSink output) 
  if (event.startsWith('data:')) 
    output.add(JSON.decode(event.substring('data:'.length)));
   else if (event.isNotEmpty) 
    output.addError('Unexpected data from CloudBit stream: "$event"');
  
);

【讨论】:

【参考方案4】:

map 不同,转换器更强大,允许您保持内部状态,并随时发出值。它可以实现map做不到的事情,比如延迟、复制值、选择性地省略某些值等等。

从本质上讲,实现需要一个 bind 方法,该方法基于传入的旧流提供一个新流,以及一个有助于在运行时进行类型检查的 cast 方法。

这是一个实现“TallyTransformer”的过度简化示例,该示例将整数值流转换为总和流。例如,如果到目前为止的输入流有1, 1, 1, -2, 0, ...,则输出流将是1, 2, 3, 1, 1, ...,即将所有输入相加。

用法示例:stream.transform(TallyTransformer())

class TallyTransformer implements StreamTransformer 
  StreamController _controller = StreamController();
  int _sum = 0; // sum of all values so far

  @override
  Stream bind(Stream stream) 
    // start listening on input stream
    stream.listen((value) 
      _sum += value; // add the new value to sum
      _controller.add(_sum); // emit current sum to our listener
    );
    // return an output stream for our listener
    return _controller.stream;
  

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() 
    return StreamTransformer.castFrom(this);
  

此示例过于简化(但仍然有效),不包括流暂停、恢复或取消等情况。如果遇到"Stream has already been listened" 错误,请确保流正在广播。

【讨论】:

【参考方案5】:

如果您想使用这样的函数简单地转换值

int handleData(int data) 
  return data * 2;

使用map Stream的方法

stream
  .map(handleData)
  .listen((data) 
    print('data: $data');
  );

完整示例:

import 'dart:async';

int handleData(int data) 
  return data * 2;


void main() 
  final controller = StreamController<int>();

  controller.stream
    .map(handleData)
    .listen((data) 
      print('data: $data');
    );

  controller.add(1);
  controller.add(2);
  controller.add(3);

在 dart.dev 上查看 more examples

【讨论】:

以上是关于如何在 Dart 中创建 StreamTransformer?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Dart 中创建一个空 Set

在 Dart 中创建大写方法

如何在Flutter / Dart中创建异步回调?

如何编辑列表中的每个项目并在 dart 中创建一个新项目

如何在本地存储/外部 Flutter 中创建文件夹?

如何在颤振/飞镖中创建泛型类型的对象?