MetaMask/json-rpc-middleware-stream

Posted 慢行厚积

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MetaMask/json-rpc-middleware-stream相关的知识,希望对你有一定的参考价值。

https://github.com/MetaMask/json-rpc-middleware-stream/blob/master/test/index.js#L20

A small toolset for streaming json rpc and matching requests and responses. Made to be used with json-rpc-engine.

可以用来与json-rpc-engine结合使用的,对输入的json rpc进行读入、写出处理

 

json-rpc-middleware-stream/index.js

const SafeEventEmitter = require(safe-event-emitter)
const DuplexStream = require(readable-stream).Duplex //即一个可读且可写的流

module.exports = createStreamMiddleware

function createStreamMiddleware() {
  const idMap = {}
  const stream = new DuplexStream({
    objectMode: true, //输入的是任意形式的数据,而不非是字符串和 Buffer(或 Uint8Array
    read: readNoop, //下面定义的返回false的函数,会覆写_read(),当要从别地流处读出时调用
    write: processMessage,//会覆写_write(),当要写入别的流时调用
  })

  const events = new SafeEventEmitter()

  const middleware = (req, res, next, end) => {
    // write req to stream
    stream.push(req)//从req流中读出,就会去调用readNoop函数
    // register request on id map
    idMap[req.id] = { req, res, next, end }//并在数组中将req根据其id记录下来
  }

  return { events, middleware, stream }

  function readNoop () {
    return false
  }

  function processMessage (res, encoding, cb) {//当要写出时调用
    let err
    try {
      const isNotification = !res.id //如果res.id有值,则isNotification为false;否则为true
      if (isNotification) {//res.id没值或为0
        processNotification(res)//触发事件‘notification‘
      } else {
        processResponse(res)//res.id有值
      }
    } catch (_err) {
      err = _err
    }
    // continue processing stream
    cb(err)
  }

  function processResponse(res) {//将流中内容写出到res流
    const context = idMap[res.id] //查看有没有与相应的res.id相同的ID的流之前读入过
    if (!context) throw new Error(`StreamMiddleware - Unknown response id ${res.id}`) //如果context为空,则说明相应的id的流并没有读入过,这样写出的时候就不知道要怎么写出了,无end,所以会出错
    delete idMap[res.id] //如果有读入过,则写出前先清空idMap中的相应内容
    // copy whole res onto original res
    Object.assign(context.res, res) //然后将现在要写出到的res流覆写context.res,并返回context.res
    // run callback on empty stack,
    // prevent internal stream-handler from catching errors
    setTimeout(context.end) //调用之前读入时写好的end函数来结束写出操作
  }

  function processNotification(res) {//该事件的监听会在inpage-provider处设置
    events.emit(notification, res)
  }

}
Object.assign(target, ...sources):用于将所有可枚举属性的值从一个或多个源对象复制到目标对象。它将返回目标对象。

 

json-rpc-middleware-stream/engineStream.js

const DuplexStream = require(readable-stream).Duplex

module.exports = createEngineStream

function createEngineStream({ engine }) {//engineRpcEngine
  if (!engine) throw new Error(Missing engine parameter!)
  const stream = new DuplexStream({ objectMode: true, read, write })
  // forward notifications
  if (engine.on) {
    engine.on(notification, (message) => {//监听notification事件
      stream.push(message) //事件被触发的话就将message数据读入stream,调用read函数
    })
  }
  return stream

  function read () {
    return false
  }
  function write (req, encoding, cb) {//当写出时调用该函数
    engine.handle(req, (err, res) => {
      this.push(res)
    })
    cb()
  }
}

 

 

测试:

json-rpc-middleware-stream/test/index.js

const test = require(tape)
const RpcEngine = require(json-rpc-engine)
const createJsonRpcStream = require(../index)
const createEngineStream = require(../engineStream)

test(middleware - raw test, (t) => {

  const jsonRpcConnection = createJsonRpcStream()
  const req = { id: 1, jsonrpc: 2.0, method: test }
  const initRes = { id: 1, jsonrpc: 2.0 }
  const res = { id: 1, jsonrpc: 2.0, result: test }

  // listen for incomming requests
  jsonRpcConnection.stream.on(data, (_req) => {//监听data事件
    t.equal(req, _req, got the expected request)//说明触发data事件传来的
    jsonRpcConnection.stream.write(res)//将流中的与res.id相同的数据写出
  })

  // run middleware, expect end fn to be called
  jsonRpcConnection.middleware(req, initRes, () => {//将req流写入createJsonRpcStream
    t.fail(should not call next)
  }, (err) => {
    t.notOk(err, should not error)
    t.deepEqual(initRes, res, got the expected response)
    t.end()
  })

})

test(engine to stream - raw test, (t) => {

  const engine = new RpcEngine()
  engine.push((req, res, next, end) => {
    res.result = test
    end()
  })

  const stream = createEngineStream({ engine })
  const req = { id: 1, jsonrpc: 2.0, method: test }
  const res = { id: 1, jsonrpc: 2.0, result: test }

  // listen for incomming requests
  stream.on(data, (_res) => {
    t.deepEqual(res, _res, got the expected response)
    t.end()
  })

  stream.on(error, (err) => {
    t.fail(error.message)
  })

  stream.write(req)

})


test(middleware and engine to stream, (t) => {//上面两者的结合

  // create guest
  const engineA = new RpcEngine()
  const jsonRpcConnection = createJsonRpcStream()
  engineA.push(jsonRpcConnection.middleware)

  // create host
  const engineB = new RpcEngine()
  engineB.push((req, res, next, end) => {
    res.result = test
    end()
  })

  // connect both
  const clientSideStream = jsonRpcConnection.stream
  const hostSideStream = createEngineStream({ engine: engineB })
  clientSideStream
  .pipe(hostSideStream)
  .pipe(clientSideStream)

  // request and expected result
  const req = { id: 1, jsonrpc: 2.0, method: test }
  const res = { id: 1, jsonrpc: 2.0, result: test }

  engineA.handle(req, (err, _res) => {//req调用jsonRpcConnection.middleware读入clientSideStream,然后hostSideStreamclientSideStream中读入req数据,然后调用engineB的方法写出,所以得到的result: test
    t.notOk(err, does not error)
    t.deepEqual(res, _res, got the expected response)
    t.end()
  })

})

test(server notification, (t) => {
  t.plan(1)

  const jsonRpcConnection = createJsonRpcStream()
  const notif = { jsonrpc: 2.0, method: test_notif }//这里没有设置id,所以调用write所以会触发processNotification函数,触发notification事件

  jsonRpcConnection.events.once(notification, (message) => {
    t.equals(message.method, notif.method)
    t.end()
  })

  // receive notification
  jsonRpcConnection.stream.write(notif)
})


test(server notification in stream, (t) => {
  const engine = new RpcEngine()

  const stream = createEngineStream({ engine })
  const notif = { jsonrpc: 2.0, method: test_notif }

  // listen for incomming requests
  stream.once(data, (_notif) => {
    t.deepEqual(notif, _notif, got the expected notification)
    t.end()
  })

  stream.on(error, (err) => {
    t.fail(error.message)
  })

  engine.emit(notification, notif)//将触发createEngineStream中的notification事件,notif将被读入stream,将触发data事件
})

 

以上是关于MetaMask/json-rpc-middleware-stream的主要内容,如果未能解决你的问题,请参考以下文章