Big Data Development: An Elasticsearch Recommendation System
Elasticsearch: Data Migration to HBase Based on MapReduce (Part 3)
Introduction
This article explains how to use MapReduce to import the data stored in MySQL into HBase.
01
Project Data Preparation
(1) Import the backed-up data into MySQL
mysql -uroot -p12345678 db_novel < /opt/shell/novel.detail
(2) Check the MySQL configuration before running the import
mysql> show variables like '%innodb_buffer_pool_size%';
(3) Add the following setting to the MySQL configuration file
sudo vi /etc/my.cnf
[mysqld]
innodb_buffer_pool_size=67108864
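Before moving on, it can be worth confirming that the dump actually landed in MySQL. The following is only a small sketch of such a check (the class name is made up for illustration; it reuses the JDBC URL, credentials, and novel_detail table that appear in the driver code in section 03, and assumes the MySQL JDBC driver is on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckMysqlImport {
    public static void main(String[] args) throws Exception {
        // Same JDBC URL and credentials as the MapReduce driver in section 03
        String url = "jdbc:mysql://bigdata-pro-m01.kfk.com:3306/db_novel?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "root", "12345678");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM novel_detail")) {
            if (rs.next()) {
                // A non-zero count indicates the backup was restored successfully
                System.out.println("novel_detail rows: " + rs.getLong(1));
            }
        }
    }
}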
02
Data Migration
(1) Create the HBase table with a single column family t
create 'novel_detail', 't'
(2) Rowkey design: the rowkey is the MD5 digest of the novel name concatenated with the chapter name
String rowkey = JavaUtils.md5(novelName + chapterName);
(3) Points to note
Why use MD5 for the rowkey? It avoids write hot-spotting: sequentially ordered keys would funnel most writes onto a single region, while hashed keys spread the load across regions. The maximum size of a single region is controlled by hbase.hregion.max.filesize. The sketch below shows how hashing scatters otherwise adjacent keys.
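As a quick illustration of the hot-spot point, this small sketch (the sample keys and class name are made up; it only assumes commons-codec on the classpath, the same dependency JavaUtils uses in section 03) shows how MD5 removes the common prefix of consecutive chapter keys, so their rowkeys scatter across the key space instead of landing on one region:

import org.apache.commons.codec.digest.DigestUtils;

public class RowkeyHotspotDemo {
    public static void main(String[] args) {
        // Sequential keys share a long prefix and would sort next to each other in one region
        String[] keys = {"novelA_chapter001", "novelA_chapter002", "novelA_chapter003"};
        for (String key : keys) {
            // The MD5 digests share no prefix, so writes spread across regions
            System.out.println(key + " -> " + DigestUtils.md5Hex(key));
        }
    }
}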
03
Data Migration Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
// MapReduce job that reads rows from MySQL via DBInputFormat and writes them into HBase
public class Mysql2HbaseMapReduce {

    public static class Mysql2HbaseMapper extends Mapper<LongWritable, NovelDetail, NovelDetail, NullWritable> {
        @Override
        protected void map(LongWritable key, NovelDetail value, Context context) throws IOException, InterruptedException {
            // DBInputFormat hands the mapper one NovelDetail per MySQL row; pass it straight through
            context.write(value, NullWritable.get());
        }
    }

    public static class Mysql2HbaseReduce extends TableReducer<NovelDetail, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(NovelDetail key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            int detailId = key.getId();
            String authorName = key.getAuthorName();
            String novelName = key.getNovelName();
            String chapterName = key.getChapterName();
            String chapterUrl = key.getChapterUrl();
            // int novelId = key.getNovelId();

            // The rowkey design is critical: here it is the MD5 of novel name + chapter name
            String rowkey = JavaUtils.md5(novelName + chapterName);
            // Create the Put from the rowkey, then add each column to it
            final Put put = new Put(Bytes.toBytes(rowkey));
            for (NullWritable v : values) {
                put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("id"), Bytes.toBytes(detailId));
                put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("authorName"), Bytes.toBytes(authorName));
                put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("novelName"), Bytes.toBytes(novelName));
                put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("chapterName"), Bytes.toBytes(chapterName));
                put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("chapterUrl"), Bytes.toBytes(chapterUrl));
                // put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("novelId"), Bytes.toBytes(novelId));
            }
            // Emit the rowkey as the key and the Put as the value
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
        }
    }

    public static class Mysql2HbaseDriver extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "bigdata-pro-m01.kfk.com");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            ToolRunner.run(configuration, new Mysql2HbaseDriver(), args);
        }

        public int run(String[] strings) {
            Configuration conf = this.getConf();
            try {
                DBConfiguration.configureDB(conf,
                        "com.mysql.jdbc.Driver",
                        "jdbc:mysql://bigdata-pro-m01.kfk.com:3306/db_novel?useSSL=false",
                        "root",
                        "12345678");
                Job job = Job.getInstance(conf, this.getClass().getSimpleName());
                job.setJarByClass(Mysql2HbaseDriver.class);
                job.setMapperClass(Mysql2HbaseMapper.class);
                job.setMapOutputKeyClass(NovelDetail.class);
                job.setMapOutputValueClass(NullWritable.class);
                TableMapReduceUtil.initTableReducerJob("novel_detail", Mysql2HbaseReduce.class, job);
                // Read the input records from the database instead of HDFS
                job.setInputFormatClass(DBInputFormat.class);
                DBInputFormat.setInput(job, NovelDetail.class, "novel_detail", null, "id",
                        "id", "author_name", "novel_name", "chapter_name", "chapter_url");
                return job.waitForCompletion(true) ? 0 : 1;
            } catch (Throwable e) {
                e.printStackTrace();
                return 1;
            }
        }
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
// Bean that maps one row of the MySQL novel_detail table; it must implement
// DBWritable (for DBInputFormat) and WritableComparable (to be used as a map output key)
public class NovelDetail implements WritableComparable<NovelDetail>, DBWritable {
    // Field order must match the columns selected from the novel_detail table
    private int id;
    private String authorName;
    private String novelName;
    private String chapterName;
    private String chapterUrl;
    private int novelId;

    public NovelDetail() {
    }

    public NovelDetail(int id, String authorName, String novelName, String chapterName, String chapterUrl, int novelId) {
        this.id = id;
        this.authorName = authorName;
        this.novelName = novelName;
        this.chapterName = chapterName;
        this.chapterUrl = chapterUrl;
        // this.novelId = novelId;
    }

    public int compareTo(NovelDetail o) {
        // Overflow-safe comparison on the primary key
        return Integer.compare(this.id, o.id);
    }

    public void write(DataOutput dataOutput) throws IOException {
        // Hadoop serialization: writeInt for int fields, writeUTF for String fields
        dataOutput.writeInt(id);
        dataOutput.writeUTF(authorName);
        dataOutput.writeUTF(novelName);
        dataOutput.writeUTF(chapterName);
        dataOutput.writeUTF(chapterUrl);
        // dataOutput.writeInt(novelId);
    }

    public void readFields(DataInput dataInput) throws IOException {
        // Deserialization must read the fields in the same order they were written
        this.id = dataInput.readInt();
        this.authorName = dataInput.readUTF();
        this.novelName = dataInput.readUTF();
        this.chapterName = dataInput.readUTF();
        this.chapterUrl = dataInput.readUTF();
        // this.novelId = dataInput.readInt();
    }

    public void write(PreparedStatement preparedStatement) throws SQLException {
        // JDBC-style parameter binding, used when writing records back to the database
        int index = 1;
        preparedStatement.setInt(index++, id);
        preparedStatement.setString(index++, authorName);
        preparedStatement.setString(index++, novelName);
        preparedStatement.setString(index++, chapterName);
        preparedStatement.setString(index++, chapterUrl);
        // preparedStatement.setInt(index, novelId);
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        // JDBC-style reads, invoked by DBInputFormat for each row of the query result
        int index = 1;
        id = resultSet.getInt(index++);
        authorName = resultSet.getString(index++);
        novelName = resultSet.getString(index++);
        chapterName = resultSet.getString(index++);
        chapterUrl = resultSet.getString(index++);
        // novelId = resultSet.getInt(index);
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getAuthorName() {
        return authorName;
    }

    public void setAuthorName(String authorName) {
        this.authorName = authorName;
    }

    public String getNovelName() {
        return novelName;
    }

    public void setNovelName(String novelName) {
        this.novelName = novelName;
    }

    public String getChapterName() {
        return chapterName;
    }

    public void setChapterName(String chapterName) {
        this.chapterName = chapterName;
    }

    public String getChapterUrl() {
        return chapterUrl;
    }

    public void setChapterUrl(String chapterUrl) {
        this.chapterUrl = chapterUrl;
    }

    public int getNovelId() {
        return novelId;
    }

    public void setNovelId(int novelId) {
        this.novelId = novelId;
    }
}
import org.apache.commons.codec.digest.DigestUtils;
public class JavaUtils {
    // Returns the MD5 hex digest of the key, used as the HBase rowkey.
    // Any other rowkey scheme could be substituted here instead of MD5.
    public static String md5(String key) {
        return DigestUtils.md5Hex(key);
    }
}
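Once the migration job has completed, a quick client-side scan can confirm that rows reached the novel_detail table. This is only a spot-check sketch (the class name is illustrative), reusing the same ZooKeeper settings as the driver above and reading back columns from the t column family:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyHbaseMigration {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bigdata-pro-m01.kfk.com");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("novel_detail"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int printed = 0;
            for (Result result : scanner) {
                // Read a couple of columns back out of the 't' column family
                String novelName = Bytes.toString(result.getValue(Bytes.toBytes("t"), Bytes.toBytes("novelName")));
                String chapterName = Bytes.toString(result.getValue(Bytes.toBytes("t"), Bytes.toBytes("chapterName")));
                System.out.println(Bytes.toString(result.getRow()) + " -> " + novelName + " / " + chapterName);
                if (++printed >= 5) {
                    break; // only spot-check the first few rows
                }
            }
        }
    }
}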