AbstractFileOutputWriter 生成重复的 tmp 文件
Posted
技术标签:
【中文标题】AbstractFileOutputWriter 生成重复的 tmp 文件【英文标题】:AbstractFileOutputWriter Generating duplicate tmp files 【发布时间】:2016-08-19 00:28:45 【问题描述】:我有一个使用 Kafka 日志并将其写入 HDFS 的 Apache Apex 应用程序。
DAG 非常简单,有一个 Kafka 消费者(20 个 2 GB 内存的分区用于操作员)通过流连接到“MyWriter extends AbstractFileOutputOperator”。
问题: 1.我看到Writer多次重复写入相同大小和相同数据的.tmp文件。我尝试增加写入操作符的内存,增加写入器的分区数量等。这个问题仍然存在。
我尝试向 MyWriter 添加/删除 requestFinalize。还是同样的问题。
@Override
public void endWindow()
if (null != fileName)
requestFinalize(fileName);
super.endWindow();
这是我的 properties.xml 的子集
<property>
<name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
<value>1000</value>
</property>
<property>
<name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
<value>60</value>
</property>
<property>
<name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
<value>60</value>
</property>
<property>
<name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
<value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
</property>
<property>
<name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
<value>1000000000</value> <!-- 1 GB File -->
</property>
这是我能够从 dt.log 为操作员获取的堆栈跟踪: 操作员可能会在不同的容器中重新部署,抛出此异常并继续写入重复文件。
java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
at com.datatorrent.stram.engine.Node.setup(Node.java:187)
at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
【问题讨论】:
【参考方案1】:基本运算符的代码位于以下链接中,并在 下面的cmets: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
通过将最大文件大小设置为 1GB,您将自动启用滚动文件;相关字段是:
protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;
如果前者的值小于默认值Long.MAX_VALUE
,则后者在setup()
方法中设置为true。
启用滚动文件后,文件会自动完成,因此您不应调用requestFinalize()
。
其次,在您的MyWriter
类中,删除endWindow()
覆盖,并确保在setup()
方法中创建包含操作员ID 的所需文件名,并在getFileName()
覆盖中返回此文件名;这确保了多个分区器不会互相踩踏。例如:
@NotNull
private String fileName; // current base file name
private transient String fName; // per partition file name
@Override
public void setup(Context.OperatorContext context)
// create file name for this partition by appending the operator id to
// the base name
//
long id = context.getId();
fName = fileName + "_p" + id;
super.setup(context);
LOG.debug("Leaving setup, fName = , id = ", fName, id);
@Override
protected String getFileName(Long[] tuple)
return fName;
文件基名(上面代码中的fileName
)可以直接在代码中设置或从 XML 文件中的属性初始化(您还需要为其添加 getter 和 setter)。
您可以在以下位置查看此类用法的示例: https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput
几个额外的建议:
-
将分区计数设置为 1(或注释掉设置
PARTITIONER
属性的 XML)并确保一切正常。这将消除任何与分区无关的问题。如果可能,还将最大文件大小减小到 2K 或 4K,以便更轻松地进行测试。
一旦单分区情况有效,将分区数增加到 2。如果这样有效,任意更大的数字(在合理范围内)也应该有效。
【讨论】:
以上是关于AbstractFileOutputWriter 生成重复的 tmp 文件的主要内容,如果未能解决你的问题,请参考以下文章