怎样将数据库数据写入到hdfs中

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎样将数据库数据写入到hdfs中相关的知识,希望对你有一定的参考价值。

如下面这个shell脚本:
#Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号
CONNECTURL=jdbc:oracle:thin:@20.135.60.21:1521:DWRAC2
#使用的用户名
ORACLENAME=kkaa
#使用的密码
ORACLEPASSWORD=kkaa123
#需要从Oracle中导入的表名
oralceTableName=tt
#需要从Oracle中导入的表中的字段名
columns=AREA_ID,TEAM_NAME
#将Oracle中的数据导入到HDFS后的存放路径
hdfsPath=apps/as/hive/$oralceTableName
#执行导入逻辑。将Oracle中的数据导入到HDFS中
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'
执行这个脚本之后,导入程序就完成了。
参考技术A 使用sqoop工具,或者使用DBInputFormat自己读取数据库数据然后写入到hdfs中本回答被提问者和网友采纳

hbase使用MapReduce操作4(实现将 HDFS 中的数据写入到 HBase 表中)

实现将 HDFS 中的数据写入到 HBase 表中

Runner类

 1 package com.yjsj.hbase_mr2;
 2 
 3 import com.yjsj.hbase_mr2.ReadFruitFromHDFSMapper;
 4 import com.yjsj.hbase_mr2.WriteFruitMRFromTxtReducer;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.hbase.HBaseConfiguration;
 9 import org.apache.hadoop.hbase.client.Put;
10 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.util.Tool;
15 import org.apache.hadoop.util.ToolRunner;
16 
17 import java.io.IOException;
18 
19 class Txt2FruitRunner extends Configured implements Tool {
20     public int run(String[] args) throws Exception {
21 //得到 Configuration
22         Configuration conf = this.getConf();
23 //创建 Job 任务
24         Job job = Job.getInstance(conf, this.getClass().getSimpleName());
25         job.setJarByClass(Txt2FruitRunner.class);
26         Path inPath = new Path("hdfs://master:9000/input_fruit/fruit.tsv");
27 
28         FileInputFormat.addInputPath(job, inPath);
29         //设置 Mapper
30         job.setMapperClass(ReadFruitFromHDFSMapper.class);
31         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
32         job.setMapOutputValueClass(Put.class);
33         //设置 Reducer
34         TableMapReduceUtil.initTableReducerJob("fruit_hdfs", WriteFruitMRFromTxtReducer.class, job);
35         //设置 Reduce 数量,最少 1 个
36         job.setNumReduceTasks(1);
37         boolean isSuccess = job.waitForCompletion(true);
38         if (!isSuccess) {
39             throw new IOException("Job running with error");
40         }
41         return isSuccess ? 0 : 1;
42     }
43 
44     public static void main(String[] args) throws Exception {
45         Configuration conf = HBaseConfiguration.create();
46         conf = HBaseConfiguration.create();
47         conf.set("hbase.zookeeper.quorum", "master,node1,node2");
48         conf.set("hbase.zookeeper.property.clientPort", "2181");
49         conf.set("hbase.master", "master:60000");
50         int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
51         System.exit(status);
52     }
53 }

Mapper类

 

 1 package com.yjsj.hbase_mr2;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.hbase.client.Put;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.hbase.util.Bytes;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Mapper;
11 
12 public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
13     @Override
14     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
15         //从 HDFS 中读取的数据
16         String lineValue = value.toString();
17         //读取出来的每行数据使用	 进行分割,存于 String 数组
18         String[] values = lineValue.split("	");
19         //根据数据中值的含义取值
20         String rowKey = values[0];
21         String name = values[1];
22         String color = values[2];
23         //初始化 rowKey
24         ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
25         //初始化 put 对象
26         Put put = new Put(Bytes.toBytes(rowKey));
27         //参数分别:列族、列、值
28         put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
29         put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
30         context.write(rowKeyWritable, put);
31     }
32 }

 

 

 

Reduce类

 

package com.yjsj.hbase_mr2;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        //读出来的每一行数据写入到 fruit_hdfs 表中
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

 

以上是关于怎样将数据库数据写入到hdfs中的主要内容,如果未能解决你的问题,请参考以下文章

将 Spark Streaming 输出写入 HDFS 时跳过数据

hbase使用MapReduce操作4(实现将 HDFS 中的数据写入到 HBase 表中)

使用BulkLoad从HDFS批量导入数据到HBase

csv数据导入Hadoop中的HDFS

使用 Kafka HDFS Connect 写入 HDFS 时出错

Mysql 流增量写入 Hdfs --从 mysql 到 kafka