如何使用 Reactor Flux 处理 CSV 文件并输出为 JSON
Posted
技术标签:
【中文标题】如何使用 Reactor Flux 处理 CSV 文件并输出为 JSON【英文标题】:How to process a CSV file using Reactor Flux and output as JSON 【发布时间】:2020-01-25 14:26:37 【问题描述】:我有一个 CSV 文件,我想使用 Spring Reactor Flux 处理它。
给定一个带有标题的 CSV 文件,其中前两列是固定的,并且 可以有多个可选数据列
Id, Name, Group, Status
6EF3C06E-6240-1A4A-17D6-27E73F0CDD31, Harlan Ferguson, xy1, true
6B261437-217C-0FDF-741A-92477EE354EC, Risa Greene, xy2, false
4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1, Samson Blanchard, xy3, false
562C3486-E009-2C2D-9D3E-14355DB7D4D7, Damian Carson, xy4, true
...
...
...
我想使用 Flux 处理输入 所以输出是
[
"Id": "6EF3C06E-6240-1A4A-17D6-27E73F0CDD31",
"Name": "Harlan Ferguson",
"data":
"Group": "xyz1",
"Status": true
,
"Id": "6B261437-217C-0FDF-741A-92477EE354EC",
"Name": "Risa Greene",
"data":
"Group": "xy2",
"Status": false
,
"Id": "4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1",
"Name": "Samson Blanchard",
"data":
"Group": "xy3",
"Status": false
,
"Id": "562C3486-E009-2C2D-9D3E-14355DB7D4D7",
"Name": "Damian Carson",
"data":
"Group": "xy4",
"Status": true
]
我正在使用 CSVReader 进行流式传输和创建以及使用 Flux
new CSVReader( Files.newBufferedReader(file) );
Flux<String[]> fluxOfCsvRecords = Flux.fromIterable(reader);
几年后我要回到 Spring Reactor,所以我的理解有点生疏。
使用创建单声道标题
Mono<String[]> headerMono = fluxOfCsvRecords.next();
然后,
fluxOfCsvRecords.skip(1)
.flatMap(csvRecord -> headerMono.map(header -> header[0] + " : " + csvRecord[0]))
.subscribe(System.out::println);
这是中途代码,只是为了测试我是否能够合并来自标题和其余通量的数据,期待看到
Id : 6EF3C06E-6240-1A4A-17D6-27E73F0CDD31
Id : 6B261437-217C-0FDF-741A-92477EE354EC
Id : 4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1
Id : 562C3486-E009-2C2D-9D3E-14355DB7D4D7
但我的输出只是
4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1 : 6EF3C06E-6240-1A4A-17D6-27E73F0CDD31
如果有人能帮助我了解如何实现这一目标,我将不胜感激。
---------------更新------- --
尝试了另一种方法
Flux<String[]> take1 = fluxOfCsvRecords.take(1);
take1.flatMap(header -> fluxOfCsvRecords.map(csvRecord -> header[0] + " : " + csvRecord[0]))
.subscribe(System.out::println);
输出是
Id : 6B261437-217C-0FDF-741A-92477EE354EC
Id : 4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1
Id : 562C3486-E009-2C2D-9D3E-14355DB7D4D7
缺少标题后的行
【问题讨论】:
【参考方案1】:添加两个类
public class TopJson
private int Id;
private String name;
private InnerJson data;
public TopJson()
public TopJson(int id, String name, InnerJson data)
super();
Id = id;
this.name = name;
this.data = data;
class InnerJson
private String group;
private String status;
public InnerJson()
public InnerJson(String group, String status)
super();
this.group = group;
this.status = status;
转换为适当的类型并用于创建对象。
fluxOfCsvRecords.skip(1)
.map((Function<String, TopJson>) x ->
String[] csvRecord = line.split(",");// a CSV has comma separated lines
return new TopJson(Integer.parseInt(csvRecord[0]), csvRecord[1],
new InnerJson(csvRecord[2], csvRecord[3]));
).collect(Collectors.toList()));
【讨论】:
嗨,Harkesh,感谢您发布回复。 sn-p 不起作用,因为存在编译错误。但在快速查看代码后,您似乎忽略了 CSV 标头。对于固定的 CSV 模式,这可能有效,但有一些可选列,其中标题可以是任何内容 - 因此我在数据字段中捕获它们。我希望这可以澄清要求。 等待我没有跳过标题以上是关于如何使用 Reactor Flux 处理 CSV 文件并输出为 JSON的主要内容,如果未能解决你的问题,请参考以下文章