Apache NiFi深度扩展

Posted victor的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache NiFi深度扩展相关的知识,希望对你有一定的参考价值。

介绍

该高级文档旨在深入了解NiFi的实施和设计决策。它假设读者已经阅读了足够的其他文档来了解NiFi的基础知识。

FlowFiles是NiFi的核心,也是基于流程的设计。FlowFile是一种数据记录,由指向其内容(有效负载)的指针和支持内容的属性组成,该指针与一个或多个起源事件相关联。属性是用作FlowFile元数据的键/值对,例如FlowFile文件名。内容是文件的实际数据或有效负载。原产地是FlowFile发生的事情的记录。这些部分中的每一个都有自己的存储库(repo)用于存储。

存储库的一个关键方面是不可变性。内容存储库中的内容和FlowFile存储库中的数据是不可变的。当FlowFile的属性发生更改时,属性的新副本将在内存中创建,然后保留在磁盘上。当为给定的FlowFile更改内容时,将读取其原始内容,通过转换进行流式处理,并将其写入新流。然后FlowFile的内容指针更新到磁盘上的新位置。因此,FlowFile内容存储的默认方法可以说是不可变版本的内容存储。这样做的好处很多,包括:大幅减少典型的复杂处理图所需的存储空间,自然重放功能,利用操作系统缓存,降低随机读/写性能,很容易推理。以前的修订版根据“nifi.properties”文件中设置的归档属性进行保存,并在中进行了概述NiFi系统管理员指南

库(Repositories)

NiFi使用了三个存储库。每个都存在于OS / Host的文件系统中,并提供特定的功能。为了完全理解FlowFiles以及底层系统如何使用它们,了解这些存储库非常重要。所有三个存储库都是NiFi用于保存数据的本地存储上的目录。

  •  FlowFile Repository  包含流中所有当前FlowFiles的元数据。

  •  Content Repository保存当前和过去的FlowFiles的内容。

  • Provenance Repository 保存FlowFiles的历史记录。

技术图片

FlowFile存储库

系统主动处理的FlowFiles保存在JVM内存中的哈希映射中(更多关于Deeper View中的内容:内存和磁盘上的FlowFiles))。这使得处理它们非常有效,但是由于诸多原因(例如断电,内核紧张,系统升级和维护周期),需要辅助机制来提供跨流程重启的数据持久性。FlowFile存储库是系统中当前存在的每个FlowFiles的元数据的“预写日志(Write-Ahead Log)”(或数据记录)。此FlowFile元数据包括与FlowFile关联的所有属性,指向FlowFile的实际内容的指针(存在于内容回购中)和FlowFile的状态,例如FlowFile所属的Connection / Queue。 Ahead Log为NiFi提供了处理重启和意外系统故障所需的弹性。

FlowFile存储库充当NiFi的预写日志,因此当FlowFiles流经系统时,每个更改都会在作为事务工作单元发生之前记录在FlowFile存储库中。这允许系统在处理一段数据时准确地知道节点处于什么步骤。如果节点在处理数据时出现故障,则可以轻松地从重新启动时停止的位置恢复(更深入地介绍事务中系统故障的影响)。日志中FlowFiles的格式是沿途发生的一系列增量(或更改)。NiFi通过恢复FlowFile的“快照”(在Repository指向检查时创建)然后重放每个增量来恢复FlowFile。

系统会定期自动拍摄快照,从而为每个FlowFile创建一个新快照。系统通过序列化哈希映射中的每个FlowFile并使用文件名“.partial”将其写入磁盘来计算新的基本检查点。随着检查点的进行,新的FlowFile基线被写入“.partial”文件。检查点完成后,将删除旧的“快照”文件,并将“.partial”文件重命名为“snapshot”。

系统检查点之间的时间段可在“nifi.properties”文件中进行配置(在“ NiFi系统管理员指南”中有说明)。默认值为两分钟。

系统失败对交易的影响(Effect of System Failure on Transactions)

NiFi通过在各自的FlowFile Repo中记录当时每个节点上发生的事情来防止硬件和系统故障。如上所述,FlowFile Repo是NiFi的预写日志。当节点重新联机时,它首先检查“快照”和“.partial”文件,以恢复其状态。节点接受“快照”并删除“.partial”(如果存在),或者如果“快照”文件不存在,则将“.partial”文件重命名为“snapshot”。

如果节点在内容发生故障时处于写入内容的中间,那么由于Copy On Write(下面提到)和Immutability(上面提到的)范例,没有任何内容被破坏。由于FlowFile事务从不修改原始内容(由内容指针指向),因此原始内容是安全的。当NiFi发生故障时,对更改的写入声明将成为孤立状态,然后通过后台垃圾收集进行清理。这提供了对最后已知稳定状态的“回滚”。

然后,Node从FlowFile恢复其状态。有关该过程的更深入,逐步说明,请参阅 NiFi的预写日志实施

就交易工作单元而言,这种设置允许NiFi在逆境中具有很强的弹性,确保即使NiFi突然被杀死,它也可以在没有任何数据丢失的情况下进行回收。

更深入的视野:内存和磁盘上的FlowFiles ( Deeper View: FlowFiles in Memory and on Disk )

术语“FlowFile”有点用词不当。这将导致人们相信每个FlowFile对应于磁盘上的文件,但事实并非如此。FlowFile属性存在两个主要位置,上面说明的预写日志和工作内存中的哈希映射。此哈希映射引用了Flow中正在使用的所有FlowFiles。此映射引用的对象与处理器使用并保存在连接队列中的对象相同。由于FlowFile对象保存在内存中,因此处理器获取FlowFile所需要做的就是让ProcessSession从队列中获取它。

当FlowFile发生更改时,delta会写入Write-Ahead Log,并相应地修改内存中的对象。这允许系统快速使用FlowFiles,同时还可以跟踪发生的事件以及提交会话时将发生的情况。这提供了非常坚固耐用的系统。

还有“交换”FlowFiles的概念。当连接队列中的FlowFiles数超过“nifi.queue.swap.threshold”属性中设置的值时,会发生这种情况。连接队列中具有最低优先级的FlowFile被序列化并以10,000个批次的“交换文件”写入磁盘。然后从上面提到的哈希映射中删除这些FlowFiles,连接队列负责确定何时将文件交换回内存。当FlowFiles被换出时,会通知FlowFile repo并保留交换文件的列表。系统检查点时,快照包含换出文件的部分。重新交换交换文件时,FlowFiles会重新添加到哈希映射中。这种交换技术,

内容存储库(Content Repository)

内容存储库只是本地存储中的一个位置,其中存在所有FlowFiles的内容,它通常是三个存储库中最大的。如介绍部分所述,此存储库利用不变性和写时复制范例来最大化速度和线程安全性。影响内容回购的核心设计决策是将FlowFile的内容保存在磁盘上,并在需要时将其读入JVM内存。这使得NiFi可以处理微小且大型的对象,而无需生产者和消费者处理器将完整的对象保存在内存中。因此,在不损害内存的情况下,分割,聚合和转换非常大的对象等操作非常容易。

以同样的方式,JVM Heap具有垃圾收集过程以在需要空间时回收无法到达的对象时,NiFi中存在专用线程来分析未使用内容的内容存储库(更多信息在“更深层视图:删除检查点后”) “ 部分)。在将FlowFile的内容标识为不再使用后,它将被删除或存档。如果在‘nifi.properties‘中启用了归档,那么FlowFile的内容将存在于内容回购中,直到它老化(在一定时间后删除)或由于内容回购占用太多空间而被删除。存档和/或删除的条件在‘nifi.properties‘文件中配置(“nifi.content.repository.archive.max.retention.period”,“nifi.content.repository.archive.max.usage.percentage”NiFi系统管理员指南。有关删除内容的更多信息,请参阅“数据出口”部分。

更深入的视野:内容声明(Deeper View: Content Claim)

通常,在谈论FlowFile时,对其内容的引用可以简称为内容的“指针”。但是,FlowFile内容参考的底层实现具有多层复杂性。内容存储库由磁盘上的文件集合组成。这些文件被分箱到容器和部分中。Section是Container的子目录。可以将Container视为内容存储库的“根目录”。但是,内容存储库可以由许多容器组成。这样做是为了让NiFi能够并行利用多个物理分区。“NiFi能够并行读取和写入所有这些磁盘,以实现数百兆字节甚至千兆字节的数据速率。单个节点上的第二个磁盘吞吐量。

为了跟踪FlowFile的内容,FlowFile有一个“Content Claim”对象。此内容声明引用了资源声明,其中包含内容,文件内容的偏移量以及内容的长度。要访问内容,内容存储库使用Resource Claim的属性向下钻取到磁盘上的特定文件,然后在从文件传输内容之前寻找资源声明指定的偏移量。

完成了这一抽象层(资源声明),因此磁盘上没有每个FlowFile内容的文件。不变性的概念是实现这一目标的关键。由于内容在写入后永远不会更改(“写入时复制”用于进行更改),因此如果FlowFile的内容发生更改,则不会出现内存碎片或移动数据。通过利用磁盘上的单个文件来保存许多FlowFiles的内容,NiFi能够提供更好的吞吐量,通常接近磁盘提供的最大数据速率。

起源库(Provenance Repository)

Provenance Repository是存储每个FlowFile的历史记录的地方。该历史记录用于提供每条数据的数据沿袭(也称为监管链)。每次为FlowFile发生事件(FlowFile被创建,分叉,克隆,修改等)时,都会创建一个新的来源事件。此来源事件是FlowFile的快照,因为它看起来很适合当时存在的流程。创建起源事件时,它会复制所有FlowFile的属性和指向FlowFile内容的指针,并将FlowFile的状态(例如与其他起源事件的关系)聚合到Provenance Repo中的一个位置。除了数据过期之外,此快照不会更改。

由于所有FlowFile属性和指向内容的指针都保存在Provenance Repository中,因此Dataflow Manager不仅能够查看该数据的谱系或处理历史记录,而且还能够以后查看数据本身甚至可以从流程中的任何一点重放数据。这种情况的一个常见用例是特定的下游系统声称没有收到数据。数据沿袭可以准确显示数据何时传送到下游系统,数据的样子,文件名以及数据发送到的URL - 或者可以确认数据确实从未发送过。在任何一种情况下,只需单击按钮(或通过访问适当的HTTP API端点)即可重播Send事件,以便仅将数据重新发送到该特定下游系统。或者,

但请记住,由于Provenance不会复制内容回购中的内容,只是将FlowFile的指针复制到内容,因此可以在删除引用它的出处事件之前删除内容。这意味着用户将无法再查看内容或稍后重播FlowFile。但是,用户仍然可以查看FlowFile的血统并了解数据发生了什么。例如,即使数据本身不可访问,用户仍然能够看到数据的唯一标识符,文件名(如果适用),接收时间,接收地点,操作方式,发送的地方,等等。此外,由于FlowFile的属性可用,

  由于起源事件是FlowFile的快照,因为它存在于当前流中,因此对流的更改可能会影响以后重放起源事件的能力。例如,如果从流中删除了连接,则无法从流中的该点重放数据,因为现在无法将数据排入队列以进行处理。

有关Provenance Repository背后的设计决策,请查看Persistent Provenance Repository Design

更深入的视野:原型日志文件(Deeper View: Provenance Log Files)

每个来源事件都有两个映射,一个用于事件之前的属性,另一个用于更新的属性值。通常,来源事件不会存储在发出事件时存在的属性的更新值,而是存储会话提交时的属性值。事件被缓存并保存,直到提交会话为止,一旦提交了会话,就会在提交会话时使用与FlowFile关联的属性发出事件。此规则的例外是“SEND”事件,在这种情况下,事件包含发出事件时存在的属性。这样做是因为如果还发送了属性本身,则准确地说明确切地发送了哪些信息非常重要。

随着NiFi的运行,有一个由16个出处日志文件组成的滚动组。当发现起源事件时,它们被写入16个文件中的一个(有多个文件来增加吞吐量)。定期滚动日志文件(默认时间范围是每30秒)。这意味着新创建的出处事件开始写入一组16个日志文件,并处理原始文件以进行长期存储。首先,将翻转的日志合并到一个文件中。然后可选地压缩文件(由“nifi.provenance.repository.compress.on.rollover”属性确定)。最后,事件使用Lucene编制索引并可用于查询。

一个单独的线程处理原始日志的删除。管理员可以设置控制删除源项日志的两个条件是它可以占用的最大磁盘空间量以及日志的最大保留持续时间。线程按上次修改日期对repo进行排序,并在超出其中一个条件时删除最旧的文件。

Provenance Repo是一个Lucene索引,分为多个分片。这是出于多种原因。首先,Lucene使用32位整数作为文档标识符,因此Lucene支持的没有分片的最大文档数量是有限的。其次,如果我们知道每个分片的时间范围,就可以轻松搜索多个线程。此外,这种分片还允许更有效的删除。在从磁盘中删除整个分片之前,NiFi会一直等到分片中的所有事件都被安排删除。这使得我们在删除时不必更新Lucene索引。

通用存储库说明

多个物理存储点

对于Provenance和Content repos,可以选择跨多个物理分区对信息进行条带化。如果管理员想要跨多个磁盘联合读写,那么管理员就会这样做。repo(Content或Provenance)仍然是一个逻辑存储,但是系统会自动在多个卷/分区上划分写入。这些目录在‘nifi.properties‘文件中指定。

最佳实践

最好的做法是尽可能少地分析FlowFile的内容,而是将内容中的关键信息提取到FlowFile的属性中; 然后从FlowFile属性读取/写入信息。其中一个例子是ExtractText处理器,它从FlowFile内容中提取文本并将其作为属性放置,以便其他处理器可以使用它。这提供了比连续处理FlowFile的整个内容更好的性能,因为属性保存在内存中,并且根据存储在每个内容库中的数据量,更新FlowFile存储库比更新内容存储库快得多。

FlowFile的生命

为了更好地理解repos如何相互影响,NiFi的基本功能以及FlowFile的生命周期; 下一节将包括实际流程中不同点的FlowFile示例。该流程是一个名为“WebCrawler.xml”的模板,可在此处获取:https//cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates

在较高级别,此模板可以访问GetHTTP处理器中配置的种子URL,然后使用RouteText处理器分析响应,以查找关键字的实例(在本例中为“nifi”),以及要触发的潜在URL。然后,InvokeHTTP使用原始种子网页中找到的URL执行HTTP Get请求。响应根据状态代码属性进行路由,并且只有200-202个状态代码被路由回原始的RouteText处理器进行分析。

该流还会检测重复的URL并阻止再次处理它们,在找到关键字时向用户发送电子邮件,记录所有成功的HTTP请求,并捆绑成功的请求以在磁盘上压缩和存档。

  要使用此流程,您需要配置几个选项。首先,必须使用默认属性添加DistributedMapCacheServer控制器服务。在撰写本文时,无法将控制器服务显式添加到模板中,因为没有处理器引用该服务,因此不包括该服务。另外,要获取电子邮件,必须使用您的电子邮件凭据配置PutEmail处理器。最后,要使用HTTPS,必须使用正确的密钥和信任存储配置StandardSSLContextService。请记住,必须使用适当的证书颁发机构配置信任库才能使用网站。下面的命令是使用“keytool”命令将默认Java 1.8.0_60 CA添加到名为myTrustStore的信任库的示例:keytool -importkeystore -srckeystore /Library/Java/JavaVirtualMachines/jdk1.8.0_60。

Web爬虫模板

 

 

技术图片

 

 

  由于Web爬行的随机性,在InvokeHttp处理器上出现诸如“连接超时”之类的消息的公告并不罕见。

数据入口

当生产者处理器调用“ProcessSession.create()”,然后调用ProvenanceReporter时,在系统中创建FlowFile。“ProcessSession.create()”调用创建一个空的FlowFile,其中包含一些核心属性(标准进程会话的文件名,路径和uuid),但没有父项的任何内容或沿袭(create方法被重载以允许父FlowFiles的参数)。然后,生产者处理器将内容和属性添加到FlowFile。

ProvenanceReporter用于为FlowFile发出Provenance事件。如果文件由NiFi从外部实体未接收的数据创建,则应发出“CREATE”事件。如果数据是根据从外部源接收的数据创建的,则应发出“RECEIVE”事件。Provenance事件分别使用“ProvenanceReporter.create()”和“ProvenanceReporter.receive()”进行。

在我们的WebCrawler流程中,GetHTTP处理器使用“ProcessSession.create()”创建初始FlowFile,并使用“ProvenanceReporter.receive()”记录数据的接收。此方法调用还提供从中接收数据的URL,传输数据所花费的时间以及添加到FlowFile的任何FlowFile属性。例如,HTTP标头可以添加为FlowFile属性。

技术图片

通过参考传递

基于流的编程的一个重要方面是黑盒之间资源受限关系的概念。在NiFi中,这些分别是队列和处理器。只需将引用传递给FlowFile(类似于EIP中的“Claim Check”模式),FlowFiles就可以通过队列从一个处理器路由到另一个处理器。

在WebCrawler流程中,InvokeHTTP处理器通过HTTP GET请求到达URL,并根据HTTP服务器的响应向FlowFile添加状态代码属性。更新FlowFile的文件名后(在InvokeHttp之后的UpdateAttribute处理器中)有一个RouteOnAttribute处理器,它将具有成功状态代码属性的FlowFiles路由到两个不同的处理器。RouteOnAttribute处理器“删除”那些不匹配的(“数据出口”部分),因为它被配置为自动终止任何与任何路由规则不匹配的数据。进入RouteOnAttribute处理器有一个FlowFile(F1),它包含状态代码属性并指向内容(C1)。有一个起源事件指向C1并包含F1的快照但省略以更好地关注路由。此信息分别位于FlowFile,Content和Provenance Repos中。

在RouteOnAttribute处理器检查FlowFile的状态代码属性后,它确定应将其路由到两个不同的位置。首先发生的事情是处理器克隆FlowFile来创建F2。这会复制所有属性和指向内容的指针。由于它仅仅是路由和分析属性,因此内容不会改变。然后将FlowFiles添加到相应的连接队列以等待下一个处理器检索它们以进行处理。

ProvenanceReporter记录发生的更改,包括CLONE和两个ROUTE事件。每个事件都有一个指向相关内容的指针,并以快照的形式包含相应FlowFiles的副本。

技术图片

扩展路由用例

除了基于属性路由FlowFiles之外,一些处理器还基于内容进行路由。虽然效率不高,但有时需要将FlowFile的内容拆分为多个FlowFiles。

一个例子是SplitText处理器。此处理器分析查找结束行字符的内容,并创建包含可配置行数的新FlowFiles。Web Crawler流程使用它将潜在的URL拆分为单行以进行URL提取,并充当InvokeHttp的请求。SplitText处理器的一个好处是,由于处理器正在拆分连续的块(没有FlowFile内容是不相交或重叠的),处理器可以在不复制任何内容的情况下进行此路由。它所做的只是创建新的FlowFiles,每个FlowFiles都有一个指向原始FlowFile内容的一部分的指针。这可以通过NiFi API内置的内容划分和拆分工具实现。虽然在可行的情况下以这种方式分割并不总是可行的,但性能益处是相当大的。

RouteText是一个处理器,它显示了为什么某些样式的路由可能需要复制内容。该处理器分析每一行并根据可配置属性将其路由到一个或多个关系。当多个行被路由到相同的关系时(对于相同的输入FlowFile),这些行被合并到一个FlowFile中。由于行可能是不相交的(第1行和第100行路由到相同的关系)并且一个指针无法准确描述FlowFile的内容,因此处理器必须将内容复制到新位置。例如,在Web Crawler流程中,RouteText处理器将包含“nifi”的所有行路由到“NiFi”关系。因此,当有一个输入FlowFile在网页上多次出现“nifi”时,

漏斗

漏斗是一个组件,它从一个或多个连接获取输入并将它们路由到一个或多个目标。用户指南中描述了其典型用例。无论用例如何,如果漏斗下游只有一个处理器,那么漏斗就不会发现物源事件,它在原型图中看起来是不可见的。如果有多个下游处理器(如WebCrawler中的处理器),则会发生克隆事件。参考下图,您可以看到从原始FlowFile(F1)克隆了一个新的FlowFile(F2),就像上面的Routing一样,新的FlowFile只有一个指向相同内容的指针(内容不会被复制) )。

从开发人员的角度来看,您可以将漏斗视为一个非常简单的处理器。当它被安排运行时,它只是对输出连接执行“ProcessSession.get()”然后“ProcessSession.transfer()”。如果有多个输出连接(如下例所示),则运行“ProcessSession.clone()”。最后调用“ProcessSession.commit()”,完成交易。

技术图片

写入时复制

在前面的示例中,只有路由但没有更改FlowFile的内容。下一个示例重点介绍模板的CompressContent处理器,该处理器压缩包含排队等待分析的网页的合并FlowFiles包。

在此示例中,FlowFile F1的内容C1正在CompressContent处理器中进行压缩。由于C1是不可变的,我们想要一个完全可重复播放的起源历史,我们不能只覆盖C1。为了“修改”C1,我们执行“写入时复制”,我们通过在将内容复制到内容存储库中的新位置时修改内容来实现。执行此操作时,FlowFile引用F1将更新为指向新的压缩内容C2,并创建引用新FlowFile F1.1的新Provenance事件P2。因为FlowFile repo是不可变的,所以不是修改旧F1,而是创建新的delta(F1.1)。以前的来源事件仍然具有指向Content C1的指针并包含旧属性,但它们不是FlowFile的最新版本。

  为了专注于Copy on Write事件,省略了导致此点的FlowFile(F1)起源事件。

技术图片

写用例的扩展复制

Copy on Write的一个独特案例是MergeContent处理器。几乎每个处理器一次只能作用于一个FlowFile。MergeContent处理器的独特之处在于它接收多个FlowFiles并将它们合并为一个。目前,MergeContent有多种不同的合并策略,但所有这些都需要将输入FlowFiles的内容复制到新的合并位置。在MergeContent完成之后,它会发出类型为“JOIN”的起源事件,该事件确定给定的父项连接在一起以创建新的子FlowFile。

更新属性

使用FlowFile的属性是NiFi的核心方面。假设每次处理器在其上执行时,属性足够小以完全读入本地存储器。因此,它们易于使用非常重要。由于属性是路由和处理FlowFile的核心方式,因此只需更改FlowFile属性的处理器就很常见。一个这样的例子是UpdateAttribute处理器。所有UpdateAttribute处理器都会根据处理器的属性更改传入的FlowFile属性。

看一下图表,在处理器之前有FlowFile(F1),它有属性和指向内容的指针(C1)。处理器通过创建仍具有指向内容(C1)的指针的新增量(F1.1)来更新FlowFile的属性。发生这种情况时会发出“ATTRIBUTES_MODIFIED”来源事件。

在此示例中,先前的处理器(InvokeHTTP)从URL获取信息并创建新的响应FlowFile,其文件名属性与请求FlowFile相同。这无助于描述响应FlowFile,因此UpdateAttribute处理器将filename属性修改为更相关的内容(URL和事务ID)。

  为了集中于ATTRIBUTES_MODIFIED事件,省略了导致此点的FlowFile(F1)起源事件。

技术图片

典型用例说明

除了通过UpdateAttribute添加任意属性之外,从FlowFile的内容中提取信息到属性中是一个非常常见的用例。Web Crawler流程中的一个这样的示例是ExtractText处理器。当它嵌入FlowFile的内容时,我们不能使用URL,因此我们从FlowFile的内容中提取URL并将其作为属性放置。这样我们就可以使用表达式语言在InvokeHttp的URL属性中引用该属性。

数据出口

最终,NiFi中的数据将达到已加载到另一个系统并且我们可以停止处理它的点,或者我们将FlowFile过滤掉并确定我们不再关心它。无论哪种方式,FlowFile最终都会被“删除”。“DROP”是一个起源事件,意味着我们不再在Flow中处理FlowFile并且可以删除它。它保留在FlowFile存储库中,直到下一个存储库检查点。Provenance Repository将Provenance事件保留在‘nifi.properties‘中规定的时间内(默认为24小时)。一旦FlowFile离开NiFi并且将预写日志的后台检查点处理发生到紧凑/删除,则内容回购中的内容将被标记为删除。这是因为除非另一个FlowFile引用相同的内容或者在‘nifi.properties‘中启用了归档。如果启用了归档,则内容将一直存在,直到达到最大磁盘百分比或达到最大保留期限(也在‘nifi.properties‘中设置)。

更深入的视图:检查点后删除

  此部分主要依赖于上面“深层视图:内容声明”部分中的信息。

一旦“.partial”文件与底层存储机制同步并重命名为新快照(详见FlowFile Repo部分),就会回调FlowFile Repo以释放所有旧内容声明(这是在检查点后完成的)如果出现问题,内容不会丢失)。FlowFile Repo知道可以发布哪些内容声明并通知资源声明管理器。资源声明管理器跟踪已发布的所有内容声明以及准备删除哪些资源声明(当流中不再有任何FlowFiles引用它时,资源声明已准备好被删除)。

内容回购会定期向资源声明管理器询问可以清除哪些资源声明。然后,内容回购决定是否应归档或删除资源声明(基于‘nifi.properties‘文件中“nifi.content.repository.archive.enabled”属性的值)。如果禁用存档,则只需从磁盘中删除该文件。否则,运行后台线程以查看何时应删除存档(基于上述条件)。此后台线程保留10,000个最旧内容声明的列表,并将其删除,直到低于必要阈值。如果内容声明用完,它会扫描仓库中最旧的内容以重新填充列表。这提供了一个在Java堆利用率和磁盘I / O利用率方面都高效的模型。

关联不同的数据

Provenance Repository的一个特性是它允许有效访问顺序发生的事件。然后可以使用NiFi报告任务来迭代这些事件并将它们发送到外部服务。如果其他系统也向此外部系统发送类似类型的事件,则可能需要将NiFi FlowFile与另一条信息相关联。例如,如果使用GetSFTP来检索数据,则NiFi使用其自己的唯一UUID来引用该FlowFile。但是,如果放置文件的系统通过文件名引用文件,则NiFi应该有一种机制来指示这些是同一条数据。这是通过调用ProvenanceReporter.associate()方法并提供FlowFile的UUID和备用名称(在本例中为文件名)来完成的。由于确定两个数据相同可能是流依赖的,因此DataFlow Manager通常需要进行此关联。一种简单的方法是使用UpdateAttribute处理器并将其配置为设置“alternate.identifier”属性。这会自动发出“associate”事件,使用添加的任何值作为“alternate.identifier”属性。

闭幕致辞

NiFi与三个存储库结合使用写时复制,传递引用和不变性概念,是一个快速,高效,强大的企业数据流平台。本文档介绍了可插拔接口的具体实现。其中包括FlowFile存储库的基于Write-Ahead Log的实现,基于文件的Provenance存储库和基于文件的内容存储库。这些实现是NiFi默认设置,但是可插拔,因此,如果需要,用户可以编写自己的实现以满足某些用例。

希望本文档能够让您更好地了解NiFi的低级功能及其背后的决策。如果您希望更深入地解释某些内容,或者您??认为应该包含这些内容,请随时发送电子邮件至Apache NiFi Developer邮件列表(dev@nifi.apache.org)。

以上是关于Apache NiFi深度扩展的主要内容,如果未能解决你的问题,请参考以下文章

NIFI Apache NiFI 使用技巧

Apache Nifi 组件开发

Apache NiFi 概述

Apache NiFi系统管理员指南 [ 一 ]

Apache nifi 第二篇(小白初试) nifi数据对接流程初次尝试

Apache Beam 和 Apache Nifi 之间的区别