一文彻底搞懂前端沙箱
Posted 让前端飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文彻底搞懂前端沙箱相关的知识,希望对你有一定的参考价值。
什么是“沙箱”
沙箱(Sandbox)[1]
也称作:“沙箱/沙盒/沙盘”。沙箱是一种安全机制,为运行中的程序提供隔离环境。通常是作为一些来源不可信、具破坏力或无法判定程序意图的程序提供实验之用。沙箱能够安全的执行不受信任的代码,且不影响外部实际代码影响的独立环境。
有哪些动态执行脚本的场景?
在一些应用中,我们希望给用户提供插入自定义逻辑的能力,比如 Microsoft 的 Office 中的 VBA,比如一些游戏中的 lua 脚本,FireFox 的「油猴脚本」,能够让用户发在可控的范围和权限内发挥想象做一些好玩、有用的事情,扩展了能力,满足用户的个性化需求。
大多数都是一些客户端程序,在一些在线的系统和产品中也常常也有类似的需求,事实上,在线的应用中也有不少提供了自定义脚本的能力,比如 Google Docs 中的 Apps Script,它可以让你使用 javascript 做一些非常有用的事情,比如运行代码来响应文档打开事件或单元格更改事件,为公式制作自定义电子表格函数等等。
与运行在「用户电脑中」的客户端应用不同,用户的自定义脚本通常只能影响用户自已,而对于在线的应用或服务来讲,有一些情况就变得更为重要,比如「安全」,用户的「自定义脚本」必须严格受到限制和隔离,即不能影响到宿主程序,也不能影响到其它用户。
另外,有一些牵扯「模板化」的前端框架,如Vue.js、Venom.js等都会用到动态代码执行。
JavaScript中的沙箱实现
零、几个基础知识什么是constructor•JavaScript中constructor属性指向创建当前对象的构造函数,该属性是存在原型里的,且是不可靠的 JavaScript中constructor属性[2]
function test()
const obj = new test();
console.log(obj.hasOwnProperty(\'constructor\')); // false
console.log(obj.__proto__.hasOwnProperty(\'constructor\')); // true
console.log(obj.__proto__ === test.prototype); // true
console.log(test.prototype.hasOwnProperty(\'constructor\')); // true
/** constructor是不可靠的 */
function Foo()
Foo.prototype = ;
const foo = new Foo();
console.log(foo.constructor === Object); // true,可以看出不是Foo了
•constructor也是一种用于创建和初始化class[3]创建的对象的特殊方法 Class构造方法[4]
几个典型的constructor:
(async function())().constructor === Promise
// 浏览器环境下
this.constructor.constructor === Function
window.constructor.constructor === Function
// node环境下
this.constructor.constructor === Function
global.constructor.constructor === Function
JS Proxy getPrototypeOf()handler.getPrototypeOf()
是一个代理方法,当读取代理对象的原型时,该方法就会被调用。语法:
const p = new Proxy(obj,
getPrototypeOf(target) // target 被代理的目标对象。
...
);
当 getPrototypeOf 方法被调用时,this 指向的是它所属的处理器对象,getPrototypeOf 方法的返回值必须是一个对象或者 null。
在 JavaScript 中,有下面这五种操作(方法/属性/运算符)可以触发 JS 引擎读取一个对象的原型,也就是可以触发 getPrototypeOf() 代理方法的运行:
•Object.getPrototypeOf()[5]•Reflect.getPrototypeOf()[6]•proto[7]•Object.prototype.isPrototypeOf()[8]•instanceof[9]
如果遇到了下面两种情况,JS 引擎会抛出 TypeError[10] 异常:
•getPrototypeOf() 方法返回的不是对象也不是 null。•目标对象是不可扩展的,且 getPrototypeOf() 方法返回的原型不是目标对象本身的原型。
基本用法:
const obj = ;
const proto = ;
const handler =
getPrototypeOf(target)
console.log(target === obj); // true
console.log(this === handler); // true
return proto;
;
var p = new Proxy(obj, handler); // obj是被代理的对象,也就是handler.getPrototypeOf的target参数
console.log(Object.getPrototypeOf(p) === proto); // true
5 种触发 getPrototypeOf 代理方法的方式:
const obj = ;
const p = new Proxy(obj,
getPrototypeOf(target)
return Array.prototype;
);
console.log(
Object.getPrototypeOf(p) === Array.prototype, // true
Reflect.getPrototypeOf(p) === Array.prototype, // true
p.__proto__ === Array.prototype, // true
Array.prototype.isPrototypeOf(p), // true
p instanceof Array // true
);
两种异常的情况:
// getPrototypeOf() 方法返回的不是对象也不是 null
const obj = ;
const p = new Proxy(obj,
getPrototypeOf(target)
return "foo";
);
Object.getPrototypeOf(p); // TypeError: "foo" is not an object or null
// 目标对象是不可扩展的,且 getPrototypeOf() 方法返回的原型不是目标对象本身的原型
const obj = Object.preventExtensions(); // obj不可扩展
const p = new Proxy(obj,
getPrototypeOf(target)
return ;
);
Object.getPrototypeOf(p); // TypeError: expected same prototype value
// 如果对上面的代码做如下的改造就没问题
const obj = Object.preventExtensions(); // obj不可扩展
const p = new Proxy(obj,
getPrototypeOf(target) // target就是上面的obj
return obj.__proto__; // 返回的是目标对象本身的原型
);
Object.getPrototypeOf(p); // 不报错
一、跟浏览器宿主环境一致的沙箱实现构建闭包环境我们知道在 JavaScript 中的作用域(scope)只有全局作用域(global scope)、函数作用域(function scope)以及从 ES6 开始才有的块级作用域(block scope)。如果要将一段代码中的变量、函数等的定义隔离出来,受限于 JavaScript 对作用域的控制,只能将这段代码封装到一个 Function 中,通过使用 function scope 来达到作用域隔离的目的。也因为需要这种使用函数来达到作用域隔离的目的方式,于是就有 IIFE(立即调用函数表达式),这是一个被称为“自执行匿名函数”的设计模式。
(function foo()
const a = 1;
console.log(a);
)();// 无法从外部访问变量
console.log(a) // 抛出错误:"Uncaught ReferenceError: a is not defined"
当函数变成立即执行的函数表达式时,表达式中的变量不能从外部访问,它拥有独立的词法作用域。不仅避免了外界访问 IIFE 中的变量,而且又不会污染全局作用域,弥补了 JavaScript 在 scope 方面的缺陷。一般常见于写插件和类库时,如 JQuery 当中的沙箱模式
(function (window)
var jQuery = function (selector, context)
return new jQuery.fn.init(selector, context);
jQuery.fn = jQuery.prototype = function ()
//原型上的方法,即所有jQuery对象都可以共享的方法和属性
jQuery.fn.init.prototype = jQuery.fn;
window.jQeury = window.$ = jQuery; //如果需要在外界暴露一些属性或者方法,可以将这些属性和方法加到window全局对象上去
)(window);
当将 IIFE 分配给一个变量,不是存储 IIFE 本身,而是存储 IIFE 执行后返回的结果。
const result = (function ()
const name = "张三";
return name;
)();
console.log(result); // "张三"
原生浏览器对象的模拟模拟原生浏览器对象的目的是为了防止闭包环境,操作原生对象,篡改污染原生环境,完成模拟浏览器对象之前我们需要先关注几个不常用的 API。
eval
eval 函数可将字符串转换为代码执行,并返回一个或多个值:
const b = eval("(name:\'张三\')");
console.log(b.name);
由于 eval 执行的代码可以访问闭包和全局范围,因此就导致了代码注入的安全问题,因为代码内部可以沿着作用域链往上找,篡改全局变量,这是我们不希望的。
console.log(eval( this.window === window )); // true
补充几个点:
•性能&安全问题,一般不建议在实际业务代码中引入eval•辅助异步编程框架的windjs大量采用eval的写法来辅助编程,引发争议 专访 Wind.js 作者老赵(上):缘由、思路及发展[11]•浏览器环境下,(0, eval)()比eval()的性能要好「目前已经不是了」(0, eval)(‘this’)[12]
const times = 1000;
const time1 = \'直接引用\';
const time2 = \'间接引用\';
let times1 = times;
console.time(time1);
while(times1--)
eval(`199 + 200`);
console.timeEnd(time1);
let times2 = times;
console.time(time2);
while(times2--)
(0, eval)(`199 + 200`);
console.timeEnd(time2);
new Function
Function构造函数创建一个新的 Function 对象。直接调用这个构造函数可用于动态创建函数。
new Function ([arg1[, arg2[, ...argN]],] functionBody)
arg1, arg2, ... argN 被函数使用的参数的名称必须是合法命名的。参数名称是一个有效的 JavaScript 标识符的字符串,或者一个用逗号分隔的有效字符串的列表,例如“×”,“theValue”,或“a,b”。
补充几个点:
•new Function()性能一般比eval要好,很多用到这块的前端框架都是用new Function()实现的,比如:Vue.js•打开浏览器控制台后,new Function()的性能要慢一倍以上
functionBody
一个含有包括函数定义的 JavaScript 语句的字符串。
const sum = new Function(\'a\', \'b\', \'return a + b\');
console.log(sum(1, 2));//3
同样也会遇到和 eval 类似的的安全问题和相对较小的性能问题。
let a = 1;
function sandbox()
let a = 2;
return new Function(\'return a;\'); // 这里的 a 指向最上面全局作用域内的 1
const f = sandbox();
console.log(f());
与 eval 不同的是 Function 创建的函数只能在全局作用域中运行,它无法访问局部闭包变量,它们总是被创建于全局环境,因此在运行时它们只能访问全局变量和自己的局部变量,不能访问它们被 Function 构造器创建时所在的作用域的变量。new Function()是 eval()更好替代方案。它具有卓越的性能和安全性,但仍没有解决访问全局的问题。
with
with 是 JavaScript 中一个关键字,扩展一个语句的作用域链。它允许半沙盒执行。那什么叫半沙盒?语句将某个对象添加到作用域链的顶部,如果在沙盒中有某个未使用命名空间的变量,跟作用域链中的某个属性同名,则这个变量将指向这个属性值。如果沒有同名的属性,则将拋出 ReferenceError。
// 严格模式下以下代码运行会有问题
function sandbox(o)
with (o)
//a=5;
c=2;
d=3;
console.log(a,b,c,d); // 0,1,2,3 //每个变量首先被认为是一个局部变量,如果局部变量与 obj 对象的某个属性同名,则这个局部变量会指向 obj 对象属性。
const f =
a:0,
b:1
sandbox(f);
console.log(f);
console.log(c,d); // 2,3 c、d被泄露到window对象上
究其原理,with在内部使用in运算符。对于块内的每个变量访问,它都在沙盒条件下计算变量。如果条件是 true,它将从沙盒中检索变量。否则,就在全局范围内查找变量。但是 with 语句使程序在查找变量值时,都是先在指定的对象中查找。所以对于那些本来不是这个对象的属性的变量,查找起来会很慢,对于有性能要求的程序不适合(JavaScript 引擎会在编译阶段进行数项的性能优化。其中有些优化依赖于能够根据代码的词法进行静态分析,并预先确定所有变量和函数的定义位置,才能在执行过程中快速找到标识符)。with 也会导致数据泄漏(在非严格模式下,会自动在全局作用域创建一个全局变量)
in 运算符
in 运算符能够检测左侧操作数是否为右侧操作数的成员。其中,左侧操作数是一个字符串,或者可以转换为字符串的表达式,右侧操作数是一个对象或数组。
const o =
a : 1,
b : function()
;
console.log("a" in o); //true
console.log("b" in o); //true
console.log("c" in o); //false
console.log("valueOf" in o); //返回true,继承Object的原型方法
console.log("constructor" in o); //返回true,继承Object的原型属性
with + new Function
配合 with 用法可以稍微限制沙盒作用域,先从当前的 with 提供对象查找,但是如果查找不到依然还能从更上面的作用域获取,污染或篡改全局环境。
function sandbox (src)
src = \'with (sandbox) \' + src + \'\';
return new Function(\'sandbox\', src);
const str = `
let a = 1;
window.name="张三";
console.log(a); // 打印:1
`;
sandbox(str)();
console.log(window.name);//\'张三\'
可以看到,基于上面的方案都多多少少存在一些安全问题:
•eval 是全局对象的一个函数属性,执行的代码拥有着和应用中其它正常代码一样的的权限,它能访问「执行上下文」中的局部变量,也能访问所有「全局变量」,在这个场景下,它是一个非常危险的函数•使用 Function 构造器生成的函数,并不会在创建它的上下文中创建闭包,一般在全局作用域中被创建。当运行函数的时候,只能访问自己的本地变量和全局变量,不能访问 Function 构造器被调用生成的上下文的作用域•with 一样的问题,它首先会在传入的对象中查找对应的变量,如果找不到就会往更上层的全局作用域去查找,所以也避免不了污染或篡改全局环境
那有没有更安全一些的沙箱环境实现呢?
基于 Proxy 实现的沙箱(ProxySandbox)
ES6 Proxy 用于修改某些操作的默认行为,等同于在语言层面做出修改,属于一种“元编程”(meta programming)
function evalute(code,sandbox)
sandbox = sandbox || Object.create(null);
const fn = new Function(\'sandbox\', `with(sandbox)return ($code)`);
const proxy = new Proxy(sandbox,
has(target, key)
// 让动态执行的代码认为属性已存在
return true;
);
return fn(proxy);
evalute(\'1+2\') // 3
evalute(\'console.log(1)\') // Cannot read property \'log\' of undefined
我们知道无论 eval 还是 function,执行时都会把作用域一层一层向上查找,如果找不到会一直到 global,那么利用 Proxy 的原理就是,让执行了代码在 sandobx 中找的到,以达到「防逃逸」的目的。
我们前面提到with在内部使用in运算符来计算变量,如果条件是 true,它将从沙盒中检索变量。理想状态下没有问题,但也总有些特例独行的存在,比如 Symbol.unscopables。
Symbol 对象的 Symbol.unscopables 属性,指向一个对象。该对象指定了使用 with 关键字时,哪些属性会被 with 环境排除。
Array.prototype[Symbol.unscopables]
// // copyWithin: true,// entries: true,// fill: true,// find: true,// findIndex: true,// keys: true// Object.keys(Array.prototype[Symbol.unscopables])
// [\'copyWithin\', \'entries\', \'fill\', \'find\', \'findIndex\', \'keys\']
上面代码说明,数组有 6 个属性,会被 with 命令排除。
由此我们的代码还需要修改如下:
function sandbox(code)
code = \'with (sandbox) \' + code + \'\'
const fn = new Function(\'sandbox\', code)
return function (sandbox)
const sandboxProxy = new Proxy(sandbox,
has(target, key)
return true
,
get(target, key)
if (key === Symbol.unscopables) return undefined
return target[key]
)
return fn(sandboxProxy)
const test =
a: 1,
log()
console.log(\'11111\')
const code = \'log(); console.log(a)\' // 1111,TypeError: Cannot read property \'log\' of undefinedsandbox(code)(test)
Symbol.unscopables 定义对象的不可作用属性。Unscopeable 属性永远不会从 with 语句中的沙箱对象中检索,而是直接从闭包或全局范围中检索。
快照沙箱(SnapshotSandbox)
快照沙箱实现来说比较简单,主要用于不支持 Proxy 的低版本浏览器,原理是基于diff来实现的,在子应用激活或者卸载时分别去通过快照的形式记录或还原状态来实现沙箱,snapshotSandbox 会污染全局 window。
我们看下 qiankun[13] 的 snapshotSandbox 的源码,这里为了帮助理解做部分精简及注释。
function iter(obj, callbackFn)
for (const prop in obj)
if (obj.hasOwnProperty(prop))
callbackFn(prop);
/**
* 基于 diff 方式实现的沙箱,用于不支持 Proxy 的低版本浏览器
*/
class SnapshotSandbox
constructor(name)
this.name = name;
this.proxy = window;
this.type = \'Snapshot\';
this.sandboxRunning = true;
this.windowSnapshot = ;
this.modifyPropsMap = ;
this.active();
//激活
active()
// 记录当前快照
this.windowSnapshot = ;
iter(window, (prop) =>
this.windowSnapshot[prop] = window[prop];
);
// 恢复之前的变更
Object.keys(this.modifyPropsMap).forEach((p) =>
window[p] = this.modifyPropsMap[p];
);
this.sandboxRunning = true;
//还原
inactive()
this.modifyPropsMap = ;
iter(window, (prop) =>
if (window[prop] !== this.windowSnapshot[prop])
// 记录变更,恢复环境
this.modifyPropsMap[prop] = window[prop];
window[prop] = this.windowSnapshot[prop];
);
this.sandboxRunning = false;
let sandbox = new SnapshotSandbox();
//test
((window) =>
window.name = \'张三\'
window.age = 18
console.log(window.name, window.age) // 张三,18
sandbox.inactive() // 还原
console.log(window.name, window.age) // undefined,undefined
sandbox.active() // 激活
console.log(window.name, window.age) // 张三,18
)(sandbox.proxy);
legacySandBox
qiankun 框架 singular 模式下的 proxy 沙箱实现,为了便于理解,这里做了部分代码的精简和注释。
//legacySandBox
const callableFnCacheMap = new WeakMap();
function isCallable(fn)
if (callableFnCacheMap.has(fn))
return true;
const naughtySafari = typeof document.all === \'function\' && typeof document.all === \'undefined\';
const callable = naughtySafari ? typeof fn === \'function\' && typeof fn !== \'undefined\' : typeof fn ===
\'function\';
if (callable)
callableFnCacheMap.set(fn, callable);
return callable;
;
function isPropConfigurable(target, prop)
const descriptor = Object.getOwnPropertyDescriptor(target, prop);
return descriptor ? descriptor.configurable : true;
function setWindowProp(prop, value, toDelete)
if (value === undefined && toDelete)
delete window[prop];
else if (isPropConfigurable(window, prop) && typeof prop !== \'symbol\')
Object.defineProperty(window, prop,
writable: true,
configurable: true
&nbsFlink总结之一文彻底搞懂时间和窗口
Flink总结之一文彻底搞懂时间和窗口
文章目录
Flink中时间语义是非常丰富的,总共有三种,分别是事件时间(Event Time)、处理时间(Processing Time)和摄入时间(Ingestion Time),丰富的时间语义加上水位线( Watermarks)功能,让我们在处理流式数据更加轻松。
在Flink中窗口也定义的非常全面,有计数窗口(Count Window)和时间窗口(Time Window),在窗口切分上有份滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)、会话窗口(Session Windows)和全局窗口全局窗口(Global Windows)。
官方文档地址:链接: Flink文档
一、Flink中时间概念
1. 事件时间(Event Time)
顾名思义,事件时间就是指数据产生的时间,是数据本身的时间属性,不依赖系统时间
2. 处理时间(Processing Time)
处理时间是指数据真正被Flink处理的时间,是服务器时间
3. 摄入时间(Ingestion Time)
它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添加到数据里。
二、水位线(Watermark)
1. 什么是水位线
本质就是时间戳,在流式环境下,我们通常用事件时间来进行统计分析,事件时间又不属于系统时间,因此我们怎么衡量时间的进展,就是通过水位线(Watermark)。
如下图所示,Flink处理每一条数据的时候,将会把这个数据的事件时间作为水位线。
2. 水位线分类
1. 有序流中的水位线
有序流中的水位线比较容易理解,事件是按照顺序一个一个到达Flink的,那么水位线只要根据数据自身的事件时间来识别就可以了,水位线会随着时间往前推动,如下图所示:
1、如何使用
2. 乱序流中的水位线
乱序流中的水位线相比较于有序流的场景比较复杂一些,数据是源源不断到达Flink的,此时可能存在后一刻的数据比前一刻的数据先到达Flink,比如:9:00:00整的数据比8:59:59的数据来的要早,如果此时仍然使用有序流中的水位线,那么数据就会不准确,不属于8点到9点的数据被统计到了这个窗口范围,属于这个范围的数据却会丢失。乱序流中水位线如下图所示:
2.1 乱序流中如何保证数据的准确性
那么怎么才能保证乱序流中的水位线也是正确的呢
水位线增加延迟策略,我们设置水位线后增加一个等待时间,比如9:00:00的数据到了我们不结束这个窗口,等待2秒,9:00:02的时候结束,那么就会保证数据尽可能的都在。
至于延迟多久,这是相对的,时间越短,准确性越低,数据的时效性越高,需要根据具体情况设置。
2.2 如何使用
三、窗口
1. 什么是窗口
Flink是一种流式引擎,数据是源源不断到来的,但是我们统计周期往往是有界的,因此在处理数据的时候我们需要把源源不断的数据切分成有界的数据,这就是窗口。
在开发指标中有些定义如下:
- 每分钟的浏览量
- 每位用户每周的会话数
- 每个传感器每分钟的最高温度
每分钟、每周、每分钟的统计数据,这就是窗口。
那么在乱序流中,即使数据不是有序到来的,Flink也可以正确的把数据分到对应的窗口中,在Flink中,每个窗口就类似一个桶,数据到来了放到对应的桶中,到达窗口关闭时刻,关闭对应的桶收集即可。
2. 窗口分类
1. 按照驱动类型分
1. 计数窗口(Count Window)
计数窗口就是多少个数据为一个窗口,比如10个数据为一个窗口,那么这个窗口就会等凑到10个数据后再关闭,和时间无关。实际很少用。
2. 时间窗口(Time Window)
时间窗口是根据时间划分窗口的,Flink中基本都是使用时间窗口,因为我们在统计数据的时候一般都是以时间维度来进行统计的,如上面所属,每分钟、每小时、每天等。
1. 按照分配数据规则分
1. 滚动窗口**(Tumbling Windows)**
滚动窗口是按照固定的窗口大小设置的,对数据均匀切片,窗口之间不会重叠,也不会有间隔,每个窗口之间都是无缝衔接。比如:每分钟浏览量
如何使用:
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
2. 滑动窗口(Sliding Windows)
滑动窗口是在每个窗口之间有个滑动步长,滚动窗口就是滑动窗口的一种特例,当窗口大小等于滑动步长的时候就是滚动窗口,适合统计输出频率比较高的指标。比如:每10秒钟计算前1分钟的页面浏览量
如何使用:
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
3. 会话窗口(Session Windows)
会话窗口是基于会话间隔进行切分的,会话窗口中窗口大小和窗口数量多少是不固定的,切分是根据会话间隔时间来的
如何使用:
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
4. 全局窗口(Global Windows)
数据没有窗口切分,全局只有一个窗口,如果需要计算数据,需要使用触发器实现。
如何使用:
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
四、窗口API使用
主要是对窗口API的使用说明
1. 按键分区(Keyed Windows)
按键分区是指对数据流进行keyby操作,数据流会按照key分为多个流,生成keyedStream,此时如果使用窗口函数,那么将会多所有key上进行**,相同的key发送到同一个并行子任务,窗口基于每一个key进行单独处理**,使用上需要使用keyby函数将数据流转换为keyedStream。
stream
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>);
2.非按键分区
如果实际场景中不需要使用按键分区,则可以不适用按键分区,此时我们可以调用非按键分区的API,缺点是此时的窗口不是并行处理的。窗口逻辑只能在一个任务( task)上执行,就相当于并行度变成了 1。 不推荐使用。
stream
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>);
3.窗口适配器( Window Assigners)
1. 窗口适配器的区分和使用:
- 滚动时间窗口
- 每分钟页面浏览量
// 滚动处理时间窗口
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
// 滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
- 滑动时间窗口
- 每10秒钟计算前1分钟的页面浏览量
// 滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
// 滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
- 会话窗口
- 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
// 会话处理时间窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
// 会话事件时间窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
以上都是一些可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n)。
基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。这两种基于时间的处理没有哪一个更好,需要根据实际情况选择。
使用处理时间,优点是延迟较低,缺点是:
- 无法正确处理历史数据,
- 无法正确处理超过最大无序边界的数据,
- 结果将是不确定的,
使用基于计数的窗口时,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。
我们可能在有些场景下,想使用全局 window assigner 将每个事件(相同的 key)都分配给某一个指定的全局窗口。 很多情况下,一个比较好的建议是使用 ProcessFunction。
3.窗口函数(ProcessWindowFunction)
1. 数据流转换图
2. 增量聚合函数(incremental aggregation functions)
增量聚合函数就是我们在处理源源不断的数据的时候,并不是等窗口结束的时候一次性计算窗口内的数据,而是每个数据到来的时候我们就计算一次,只是不输出结果,等窗口结束的时候我们再进行结果的输出。
比如我们常见的规约函数(reduceFunction)和聚合函数(AggregateFunction )
聚合函数函数示例
public class WaterMarkAndWindows
public static void main(String[] args)
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取数据源
DataStreamSource<String> sourceStream = env.fromCollection(Arrays.asList("aa", "bb", "cc", "aa", "cc", "aa"));
// 使用keyby
KeyedStream<String, String> keyedStream = sourceStream.keyBy(data -> data);
// 基于keyedStream使用滚动窗口,窗口大小为1分钟
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
// 基于窗口函数进行聚合计算
SingleOutputStreamOperator<Long> aggregate = windowedStream.aggregate(new AggregateFunction<String, Long, Long>()
// 创建一个新的累加器,开始一个新的聚合。累加器是正在运行的聚合的状态。
@Override
public Long createAccumulator()
return 0l;
// 将给定的输入添加到给定的累加器,并返回新的累加器值。
@Override
public Long add(String str, Long aLong)
return ++aLong;
// 从累加器获取聚合结果。
@Override
public Long getResult(Long aLong)
return aLong;
// 合并两个累加器,返回合并后的累加器的状态。
@Override
public Long merge(Long aLong, Long acc1)
return aLong+acc1;
);
这里对四个方法进行总结下:
- **createAccumulator():**创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- ** add():**将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
- **getResult():**从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
- **merge():**合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
3. 全窗口函数(full window functions)
与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 思路就是攒批处理
在 Flink 中,全窗口函数也有两种: WindowFunction 和 ProcessWindowFunction。
1. 窗口函数(WindowFunction)
我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。 这个类中可以获取到包含窗口所有数据的可迭代集合( Iterable),还可以拿到窗口(Window)本身的信息。 当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。这里 Collector 的用法,与 FlatMapFunction 中相同。已被ProcessWindowFunction替代
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
2. 处理窗口函数(ProcessWindowFunction)
除了可以拿到窗口中的所有数据之外, ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线( eventtime watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API处理函数( process function)中的一员。ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的 WindowFunction。
4.窗口生命周期
1. 窗口的创建
窗口的类型是根据上面的窗口适配器确定的,但是窗口的实际创建是根据数据来确定的,也就是说服务器时间到了9:00:00 但是没有9:00:00的数据到来,那么9点整的窗口是不会创建的,只有9:00:00的这个窗口的数据到来了之后才会进行创建。
2. 窗口计算的触发
每个窗口都有自己的窗口函数(window process)和触发器(trigger)。**窗口函数定义了窗口的聚合逻辑。触发器定义了调用窗口函数的条件。 **
对于不同的窗口类型,触发计算条件也不同:
- 滚动事件时间窗口,应该在水位线到达窗口结束时间的时候触发计算。(**如果设置了延时时间,但还没有到达设定的最大延迟时间,这期间内到达的迟到数据也会触发窗口计算 ** )
- 计数窗口,会在窗口中元素数量达到定义大小时触发计算 。
3. 窗口的销毁
一般情况下,当时间到达了结束点就会触发计算,然后输出计算结果并销毁窗口。但是Flink中只针对时间窗口有销毁机制,计数窗口是基于全局窗口实现的,全局窗口不会清除状态,因此不会被销毁。
5.迟到数据的处理(重点)
在Flink中我们即使定义了水位线的延迟策略,那么也不能保证数据全部都能及时被统计,Flink为了保证数据的最终一致性,采用了侧输出流的机制将迟到的数据收集到侧输出流中:
OutputTag<Event> lateTag = new OutputTag<Event>("late");
SingleOutputStreamOperator<Event> result = stream
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);
DataStream<Event> lateStream = result.getSideOutput(lateTag);
我们还可以指定 允许的延迟(allowed lateness) 的间隔,在这个间隔时间内,延迟的事件将会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为 late firing )。
默认情况下,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。
举例说明:
stream.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。
结束~
以上是关于一文彻底搞懂前端沙箱的主要内容,如果未能解决你的问题,请参考以下文章