Graduation Project, Part 3: Inserting Data with the Spark-Phoenix Integration / Parsing JSON Arrays
Requirement: store the comments crawled a few days ago into HBase.
Approach:
First parse the comments with fastjson, then build an RDD, and finally use the Spark-Phoenix integration to write the data into HBase.
Sample data:
[
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-04-08 01:13:42",
    "content": "此用户没有填写评价内容",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-29 11:49:36",
    "content": "不错",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-21 21:13:07",
    "content": "正品没什么毛病。信号好像是照别的差一点,但是还可以,不是特别差。分辨率不是那么好,但是也不是特别差。一般般。手机不卡。打游戏很顺畅。官方正品没有翻车。",
    "label": [{"labelName": "功能齐全"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-22 09:56:22",
    "content": "不错是正品",
    "label": [{"labelName": "系统流畅"}, {"labelName": "声音大"}, {"labelName": "做工精致"}, {"labelName": "待机时间长"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-13 07:27:56",
    "content": "性价比很高的手机,用习惯了ios转安卓7个月,重新回归,只能说,用苹果省心。苏宁质量有保障,送货快,价格优惠,推荐购买!",
    "label": [{"labelName": "系统流畅"}, {"labelName": "功能齐全"}, {"labelName": "包装一般"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-25 22:03:18",
    "content": "弟弟就想要一个苹果手机。本来打算买8p的。然后我,推荐他这款xr,是最新款。价格总体来说性价比,比8p好。买了很快就到了,第二天就。屏幕很大,还是面容id。苹果x大很多。比***x小一点",
    "label": [{"labelName": "外观漂亮"}, {"labelName": "做工精致"}, {"labelName": "反应快"}, {"labelName": "系统流畅"}]
  },
  {
    "referenceName": "【券后低至5388】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-21 12:45:22",
    "content": "物流很棒,xr价格可以接受、性能稳定!",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-02-22 12:14:55",
    "content": "很不错的手机除了有点厚,极致的体验,黑边完全没有影响",
    "label": [{"labelName": "拍照效果好"}, {"labelName": "待机时间长"}, {"labelName": "电池耐用"}, {"labelName": "性价比高 "}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-13 00:28:03",
    "content": "非常非常好的商品。很不错,下次再来",
    "label": [{"labelName": "信号稳定"}, {"labelName": "反应快"}, {"labelName": "声音大"}, {"labelName": "做工精致"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G 手机",
    "creationTime": "2019-04-02 17:29:43",
    "content": "此用户没有填写评价内容",
    "label": []
  }
]
[
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-04-05 18:13:14",
    "content": "满意嘻嘻",
    "label": [{"labelName": "音质好"}, {"labelName": "拍照效果好"}, {"labelName": "功能齐全"}, {"labelName": "外观漂亮"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-21 00:13:17",
    "content": "棒棒哒",
    "label": []
  },
  {
    "referenceName": "【双12爆款】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-01-19 10:23:57",
    "content": "双十二买的正好赶上手机坏了就买了xr太贵没舍得买一直用苹果的系统用顺手了不愿意换",
    "label": [{"labelName": "反应快"}, {"labelName": "做工精致"}, {"labelName": "信号稳定"}, {"labelName": "待机时间长"}, {"labelName": "性价比一般般"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-21 12:47:34",
    "content": "用了几天感觉还可以,只是信号不是很好,总体上是可以的",
    "label": [{"labelName": "音质好"}, {"labelName": "分辨率高"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-09 08:32:15",
    "content": "苹果手机做工精细,手感不错,外观设计也是很有时尚感!系统运行十分流畅,操作体验不错;功能齐全,屏幕分辨率高,总体来说很满意!",
    "label": [{"labelName": "做工精致"}, {"labelName": "系统流畅"}, {"labelName": "功能齐全"}]
  },
  {
    "referenceName": "【低至5399】Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-01-15 22:44:53",
    "content": "真心喜欢,一直在徘徊x.还是xr.个人觉得真不错,一直信赖苏宁,新机为激活,价钱能接受,值得推荐,黑边什么的不影响。",
    "label": [{"labelName": "拍照效果好"}, {"labelName": "外观漂亮"}, {"labelName": "屏幕清晰"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-03-08 22:12:18",
    "content": "手机运行流畅,外观也漂亮,黑色彰显档次,音质也好,又是双卡双待手机,性价比不错,比起XS便宜不少。",
    "label": [{"labelName": "外观漂亮"}, {"labelName": "系统流畅"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-03-01 15:54:33",
    "content": "手机很好,电池非常抗用,价钱也非常美丽,值得购买。",
    "label": [{"labelName": "系统流畅"}, {"labelName": "电池耐用"}, {"labelName": "分辨率高"}, {"labelName": "待机时间长"}, {"labelName": "包装一般"}]
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G手机 双卡双待",
    "creationTime": "2019-02-19 09:37:25",
    "content": "春节期间配送超快,手机没有任何问题,苏宁易购确实做到了全网最低价",
    "label": []
  },
  {
    "referenceName": "Apple iPhone XR 64GB 黑色 移动联通电信4G全网通手机 双卡双待",
    "creationTime": "2019-02-23 13:27:15",
    "content": "挺好的懒得拍照了借了几张黑边确实大",
    "label": [{"labelName": "待机时间长"}, {"labelName": "反应快"}, {"labelName": "做工精致"}]
  }
]
The data comes as JSON arrays, so fastjson is used to parse it. To make the final write simpler, a Comment bean class is defined first.
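As a quick illustration of the parsing step, the standalone sketch below maps one page's JSON array to Comment beans with fastjson. The class name and the hard-coded sample string are made up for demonstration; the full version, with label handling and the RDD pipeline, is in StoreData.java further down.

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import cn.tele.bean.Comment;

public class ParseDemo {
    public static void main(String[] args) {
        // one crawled page: a JSON array of comment objects (hypothetical sample)
        String page = "[{\"referenceName\":\"iphonexr\",\"creationTime\":\"2019-03-29 11:49:36\",\"content\":\"不错\",\"label\":[]}]";

        JSONArray jsonArray = JSON.parseArray(page);
        List<Comment> comments = new ArrayList<>();
        for (Object obj : jsonArray) {
            JSONObject jsonObject = (JSONObject) obj;
            Comment comment = new Comment();
            comment.setName(jsonObject.getString("referenceName"));
            comment.setCreationtime(jsonObject.getString("creationTime"));
            comment.setContent(jsonObject.getString("content"));
            comments.add(comment);
        }
        System.out.println("parsed " + comments.size() + " comment(s)");
    }
}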
Dependencies:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
</dependency>

<!-- hbase -->
<!--
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>2.0.2</version>
</dependency>
-->

<!-- phoenix -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>

<!-- phoenix-spark -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>

<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
Comment.java
package cn.tele.bean;

/**
 *
 * @author Tele
 *
 */
public class Comment {
    private Integer id;
    private String name;
    private String content;
    private String creationtime;
    private String label;
    private String platform;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getCreationtime() {
        return creationtime;
    }

    public void setCreationtime(String creationtime) {
        this.creationtime = creationtime;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public String getPlatform() {
        return platform;
    }

    public void setPlatform(String platform) {
        this.platform = platform;
    }
}
StoreData.java
package cn.tele.spark;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import cn.tele.bean.Comment;
import scala.Tuple2;

/**
 * Store the crawled comments into HBase.
 *
 * @author Tele
 */
public class StoreData {
    // register the Comment bean with Kryo before the context is created,
    // otherwise the registration has no effect
    private static SparkConf conf = new SparkConf().setAppName("storedata").setMaster("local")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(new Class[] { Comment.class });
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    // connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) throws SQLException {
        // walk the directory tree of crawled files
        Path path = Paths.get("F:\\comment\\");

        try {
            MyFileVisitor myFileVisitor = new MyFileVisitor();
            Files.walkFileTree(path, myFileVisitor);
            List<Map<String, Object>> list = myFileVisitor.getData();
            JavaRDD<Comment> commentRDD = getCommentRDD(list);
            // write to HBase
            storeData(commentRDD);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // read data from a single directory instead of walking the whole tree
        /*
         * JavaRDD<String> rdd = jsc.textFile("file:\\F:\\comment\\sn_comment\\iphonexr-2019-04-16-18-27-36\\");
         * List<Comment> commentList = getCommentList(rdd, "iphonexr");
         */

        jsc.close();
    }

    private static void storeData(JavaRDD<Comment> rdd) {
        /*
         * DataTypes.createStructType(Arrays.asList(
         *     DataTypes.createStructField("id", DataTypes.IntegerType, false),
         *     DataTypes.createStructField("name", DataTypes.StringType, false),
         *     DataTypes.createStructField("content", DataTypes.StringType, false),
         *     DataTypes.createStructField("creationtime", DataTypes.StringType, false),
         *     DataTypes.createStructField("label", DataTypes.StringType, true),
         *     DataTypes.createStructField("platform", DataTypes.StringType, false)));
         */
        Dataset<Row> ds = session.createDataFrame(rdd, Comment.class);
        ds.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark").option("zkUrl", DB_PHOENIX_URL)
                .option("table", "comment").save();
    }

    @SuppressWarnings("unchecked")
    private static JavaRDD<Comment> getCommentRDD(List<Map<String, Object>> list) {
        JavaRDD<Map<String, Object>> originalRDD = jsc.parallelize(list);
        JavaRDD<List<Comment>> listCommentRDD = originalRDD.map(new Function<Map<String, Object>, List<Comment>>() {

            private static final long serialVersionUID = 1L;

            public List<Comment> call(Map<String, Object> v1) throws Exception {
                // the list must be local to each call; a shared field would re-emit
                // comments from earlier elements for every later element
                List<Comment> dataList = new ArrayList<>();
                Set<Entry<String, Object>> entrySet = v1.entrySet();
                Iterator<Entry<String, Object>> iterator = entrySet.iterator();
                while (iterator.hasNext()) {
                    Entry<String, Object> entry = iterator.next();
                    String referenceName = entry.getKey();
                    String platform = referenceName.split("#")[0];
                    List<Comment> commentList = (List<Comment>) entry.getValue();
                    commentList.forEach(cm -> {
                        cm.setPlatform(platform);
                        dataList.add(cm);
                    });
                    println(referenceName + " comment count ------------------: " + commentList.size());
                }
                return dataList;
            }
        }).persist(StorageLevel.MEMORY_ONLY());

        JavaRDD<Comment> commentRDD = listCommentRDD.flatMap(new FlatMapFunction<List<Comment>, Comment>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Comment> call(List<Comment> t) throws Exception {
                return t.iterator();
            }
        });

        long totalSize = commentRDD.count();
        println("total comments: ----------- " + totalSize);

        // assign ids
        JavaRDD<Comment> resultRDD = commentRDD.zipWithIndex().map(new Function<Tuple2<Comment, Long>, Comment>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Comment call(Tuple2<Comment, Long> v1) throws Exception {
                v1._1.setId(Integer.valueOf(v1._2.toString()));
                return v1._1;
            }
        });
        return resultRDD;
    }

    private static List<Comment> getCommentList(JavaRDD<String> rdd, String referenceName) {
        List<Comment> commentList = new ArrayList<Comment>();

        // concatenate every line of the file into one string
        String originalStr = rdd.reduce(new Function2<String, String, String>() {
            private static final long serialVersionUID = 1L;

            public String call(String v1, String v2) throws Exception {
                return v1.trim() + v2.trim();
            }
        });

        // a file may contain several JSON arrays back to back ("][");
        // insert a random separator so they can be split apart
        String uuid = UUID.randomUUID().toString();
        originalStr = originalStr.replace("][", "]" + uuid + "[");

        // parse each page (one JSON array) with fastjson
        String[] pages = originalStr.split(uuid);
        for (String page : pages) {
            try {
                JSONArray jsonArray = JSON.parseArray(page);
                for (Object obj : jsonArray) {
                    JSONObject jsonObject = (JSONObject) obj;
                    // String referenceName = jsonObject.getString("referenceName");
                    String creationTime = jsonObject.getString("creationTime");
                    String content = jsonObject.getString("content");
                    println("referenceName:" + referenceName);
                    println("creationTime:" + creationTime);
                    println("content:" + content);

                    // build the bean
                    Comment comment = new Comment();
                    comment.setName(referenceName);
                    comment.setCreationtime(creationTime);
                    comment.setContent(content);

                    // flatten the label array into a "#"-separated string
                    JSONArray labelArray = jsonObject.getJSONArray("label");
                    if (labelArray != null) {
                        String label = "";
                        for (Object labelObj : labelArray) {
                            JSONObject labelObject = (JSONObject) labelObj;
                            label += labelObject.getString("labelName") + "#";
                        }
                        comment.setLabel(label);
                        println("label:" + label);
                    }
                    commentList.add(comment);
                }
            } catch (Exception e) {
                // skip fragments that are not valid JSON
                continue;
            }
        }
        return commentList;
    }

    private static class MyFileVisitor extends SimpleFileVisitor<Path> {
        private List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        String platform = null;

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
            println("visiting directory ------ " + dir.toAbsolutePath());
            String fileName = dir.getFileName().toString();
            // directories such as "sn_comment" carry the platform name
            if (fileName.contains("_")) {
                platform = fileName.split("_")[0];
            }
            if (platform != null) {
                // directories such as "iphonexr-2019-04-16-18-27-36" carry the product name
                if (fileName.contains("-")) {
                    String referenceName = fileName.split("-")[0];
                    JavaRDD<String> rdd = jsc.textFile(dir.toAbsolutePath().toString());
                    List<Comment> commentList = getCommentList(rdd, referenceName);
                    Map<String, Object> map = new HashMap<String, Object>();
                    // key: platform#brand, value: its comments
                    map.put(platform + "#" + referenceName, commentList);
                    list.add(map);
                }
            }
            return super.preVisitDirectory(dir, attrs);
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            return super.visitFile(file, attrs);
        }

        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
            return super.visitFileFailed(file, exc);
        }

        @Override
        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
            return super.postVisitDirectory(dir, exc);
        }

        public List<Map<String, Object>> getData() {
            return list;
        }
    }

    private static void println(Object obj) {
        System.out.println(obj);
    }
}
An exception like the one below may be reported, but it does not affect the result; it seems to be related to the HBase version I am using.
Checking the result in the terminal:
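In my case the check was done from the terminal; for reference, the same check can also be done from code through Phoenix's JDBC driver. This is only a sketch, assuming the comment table and ZooKeeper quorum used above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckCommentCount {
    public static void main(String[] args) throws Exception {
        // the Phoenix JDBC driver registers itself when phoenix-core is on the classpath
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop002,hadoop003,hadoop004");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM COMMENT")) {
            if (rs.next()) {
                System.out.println("rows in COMMENT: " + rs.getLong(1));
            }
        }
    }
}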
StoreData could be optimized further. For example, the JSON could be parsed straight into an RDD instead of going through a List first, and the job could be adapted to run on a cluster; since my data volume is small, local mode is enough. Note that SaveMode must be SaveMode.Overwrite, because Phoenix supports no other mode, yet in actual testing the write still behaves like an append. Also, since zipWithIndex is used to generate the ids, the ids are consecutive, which easily creates hotspots in the cluster, so it is best to salt the table when creating it in Phoenix, as sketched below.
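For reference, creating a salted COMMENT table through Phoenix's JDBC driver might look like the following. This is a sketch only: the column layout mirrors the Comment bean above, but the salt bucket count and the exact DDL are assumptions, not what I originally ran.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateCommentTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop002,hadoop003,hadoop004");
             Statement stmt = conn.createStatement()) {
            // SALT_BUCKETS pre-splits the table so the consecutive ids produced by
            // zipWithIndex do not all land on one region server; 4 is an example value
            stmt.executeUpdate(
                "CREATE TABLE IF NOT EXISTS COMMENT ("
                + " ID INTEGER NOT NULL PRIMARY KEY,"
                + " NAME VARCHAR,"
                + " CONTENT VARCHAR,"
                + " CREATIONTIME VARCHAR,"
                + " LABEL VARCHAR,"
                + " PLATFORM VARCHAR"
                + ") SALT_BUCKETS = 4");
        }
    }
}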
You can also talk to Phoenix through Spark's plain JDBC data source; see https://blog.csdn.net/xiongbingcool/article/details/81458602 for reference.
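A rough sketch of that approach (reading through the Phoenix JDBC driver instead of the phoenix-spark format; the table name and quorum match the ones above, everything else is assumed):

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PhoenixJdbcRead {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("phoenix-jdbc").master("local").getOrCreate();

        Properties props = new Properties();
        props.setProperty("driver", "org.apache.phoenix.jdbc.PhoenixDriver");

        // read the TEST table through the generic JDBC data source
        Dataset<Row> ds = spark.read()
                .jdbc("jdbc:phoenix:hadoop002,hadoop003,hadoop004", "TEST", props);
        ds.show();

        spark.stop();
    }
}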
Appendix: a test case for the Spark-Phoenix integration:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * Test of the Spark-Phoenix integration.
 *
 * @author Tele
 */
public class SparkPhoneix {
    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    // connection info
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop002,hadoop003,hadoop004";
    /*
     * private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
     * private static final String DB_PHOENIX_USER = "";
     * private static final String DB_PHOENIX_PASS = "";
     * private static final String DB_PHOENIX_FETCHSIZE = "10000";
     */

    public static void main(String[] args) {
        Dataset<Row> ds = session.read().format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST")
                .load();
        ds.createOrReplaceTempView("test");

        ds.show();

        // insert test; the table columns are _SALT, ID, INFO.ITEM, INFO.CONTENT, INFO.LABEL
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("item", DataTypes.StringType, false),
                DataTypes.createStructField("content", DataTypes.StringType, false),
                DataTypes.createStructField("label", DataTypes.StringType, true)));

        // build one row of test data
        List<Row> list = new ArrayList<Row>();
        Row row1 = RowFactory.create(3, "iphone", "不错", null);
        list.add(row1);

        Dataset<Row> dataset = session.createDataFrame(list, schema);
        // Phoenix only accepts SaveMode.Overwrite, but in practice the data is appended
        dataset.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark")
                .option("zkUrl", DB_PHOENIX_URL)
                .option("table", "TEST").save();
        ds.show();
        session.stop();
        jsc.close();
    }
}