Big Data Development: Elasticsearch Recommendation System




Elasticsearch: Data Migration to HBase Based on MapReduce (Part 3)

Introduction

This article shows how to use MapReduce to import data from MySQL into HBase.

01

Project Data Preparation

(1) Import the backed-up data into MySQL

mysql -uroot -p12345678 db_novel < /opt/shell/novel.detail
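As a quick sanity check that the import succeeded, the row count of the source table can be queried (the table name novel_detail matches the one read by the migration job below):

mysql -uroot -p12345678 -e "select count(*) from db_novel.novel_detail;"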

(2) Before running the job, check the current MySQL buffer pool setting

mysql> show variables like '%innodb_buffer_pool_size%';

(3) Add the setting to /etc/my.cnf under the [mysqld] section (67108864 bytes = 64 MB)

sudo vi /etc/my.cnf

[mysqld]
innodb_buffer_pool_size=67108864
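The change only takes effect after the MySQL server restarts. A sketch, assuming a systemd-managed mysqld service (the service name varies by installation); the second command re-checks that the new value is active:

sudo systemctl restart mysqld
mysql -uroot -p12345678 -e "show variables like '%innodb_buffer_pool_size%';"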

02

Data Migration

(1) Create the HBase table (with a single column family, t)

create 'novel_detail', 't'

(2) Rowkey design

String rowkey = JavaUtils.md5(novelName + chapterName);

(3) Points to note

Why use MD5? To avoid hot-spotting: without hashing, a large share of the writes would pile into a single region. The maximum size a region may grow to before splitting is controlled by hbase.hregion.max.filesize.
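Because MD5 rowkeys are hex strings spread uniformly across 0-f, the table can also be pre-split at creation time so that the initial bulk load fans out over several regions instead of one. A sketch (the split points are an assumption, not part of the original setup):

create 'novel_detail', 't', SPLITS => ['2', '4', '6', '8', 'a', 'c', 'e']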

03

Data Migration Code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class Mysql2HbaseMapReduce {

    // DBInputFormat hands each row to the mapper as (LongWritable offset, NovelDetail bean);
    // the mapper simply forwards the bean as the output key.
    public static class Mysql2HbaseMapper
            extends Mapper<LongWritable, NovelDetail, NovelDetail, NullWritable> {
        @Override
        protected void map(LongWritable key, NovelDetail value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static class Mysql2HbaseReduce
            extends TableReducer<NovelDetail, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(NovelDetail key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            int detailId = key.getId();
            String authorName = key.getAuthorName();
            String novelName = key.getNovelName();
            String chapterName = key.getChapterName();
            String chapterUrl = key.getChapterUrl();
            // int novelId = key.getNovelId();

            // The rowkey design matters: novel name + chapter name, hashed with MD5
            // so that writes spread evenly across regions.
            String rowkey = JavaUtils.md5(novelName + chapterName);
            Put put = new Put(Bytes.toBytes(rowkey));

            // The values are all NullWritable placeholders, so the columns
            // only need to be added once per key.
            put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("id"), Bytes.toBytes(detailId));
            put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("authorName"), Bytes.toBytes(authorName));
            put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("novelName"), Bytes.toBytes(novelName));
            put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("chapterName"), Bytes.toBytes(chapterName));
            put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("chapterUrl"), Bytes.toBytes(chapterUrl));
            // put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("novelId"), Bytes.toBytes(novelId));

            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
        }
    }

    public static class Mysql2HbaseDriver extends Configured implements Tool {

        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "bigdata-pro-m01.kfk.com");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            System.exit(ToolRunner.run(configuration, new Mysql2HbaseDriver(), args));
        }

        @Override
        public int run(String[] strings) {
            Configuration conf = this.getConf();
            try {
                DBConfiguration.configureDB(conf,
                        "com.mysql.jdbc.Driver",
                        "jdbc:mysql://bigdata-pro-m01.kfk.com:3306/db_novel?useSSL=false",
                        "root", "12345678");

                Job job = Job.getInstance(conf, this.getClass().getSimpleName());
                job.setJarByClass(Mysql2HbaseDriver.class);
                job.setMapperClass(Mysql2HbaseMapper.class);
                job.setMapOutputKeyClass(NovelDetail.class);
                job.setMapOutputValueClass(NullWritable.class);

                TableMapReduceUtil.initTableReducerJob("novel_detail", Mysql2HbaseReduce.class, job);

                // Read the input from the database rather than from HDFS.
                job.setInputFormatClass(DBInputFormat.class);
                DBInputFormat.setInput(job, NovelDetail.class,
                        "novel_detail", null, "id",
                        "id", "author_name", "novel_name", "chapter_name", "chapter_url");

                return job.waitForCompletion(true) ? 0 : 1;
            } catch (Throwable e) {
                e.printStackTrace();
                return 1;
            }
        }
    }
}
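With the driver complete, the job can be packaged and submitted. A sketch, assuming the classes are built into a jar named novel-migrate.jar (the jar name is hypothetical) and that the MySQL JDBC driver jar sits in the working directory; -libjars ships it to the tasks, and HADOOP_CLASSPATH makes it visible to the client:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:mysql-connector-java.jar
hadoop jar novel-migrate.jar 'Mysql2HbaseMapReduce$Mysql2HbaseDriver' -libjars mysql-connector-java.jar

The NovelDetail bean the job reads rows into is defined next.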


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// The fields must match the columns of the source table.
public class NovelDetail implements WritableComparable<NovelDetail>, DBWritable {

    private int id;
    private String authorName;
    private String novelName;
    private String chapterName;
    private String chapterUrl;
    private int novelId;

    public NovelDetail() {
    }

    public NovelDetail(int id, String authorName, String novelName,
                       String chapterName, String chapterUrl, int novelId) {
        this.id = id;
        this.authorName = authorName;
        this.novelName = novelName;
        this.chapterName = chapterName;
        this.chapterUrl = chapterUrl;
        // this.novelId = novelId;
    }

    @Override
    public int compareTo(NovelDetail o) {
        // Integer.compare avoids the overflow that "this.id - o.id" can produce.
        return Integer.compare(this.id, o.id);
    }

    // Hadoop serialization: write ints with writeInt, strings with writeUTF.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(id);
        dataOutput.writeUTF(authorName);
        dataOutput.writeUTF(novelName);
        dataOutput.writeUTF(chapterName);
        dataOutput.writeUTF(chapterUrl);
        // dataOutput.writeInt(novelId);
    }

    // Hadoop deserialization: fields must be read in exactly the order they were written.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readInt();
        this.authorName = dataInput.readUTF();
        this.novelName = dataInput.readUTF();
        this.chapterName = dataInput.readUTF();
        this.chapterUrl = dataInput.readUTF();
        // this.novelId = dataInput.readInt();
    }

    // DBWritable: bind the fields to a PreparedStatement, as in plain JDBC.
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        int index = 1;
        preparedStatement.setInt(index++, id);
        preparedStatement.setString(index++, authorName);
        preparedStatement.setString(index++, novelName);
        preparedStatement.setString(index++, chapterName);
        preparedStatement.setString(index++, chapterUrl);
        // preparedStatement.setInt(index, novelId);
    }

    // DBWritable: read one row from a JDBC ResultSet, as in a plain JDBC query.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        int index = 1;
        id = resultSet.getInt(index++);
        authorName = resultSet.getString(index++);
        novelName = resultSet.getString(index++);
        chapterName = resultSet.getString(index++);
        chapterUrl = resultSet.getString(index++);
        // novelId = resultSet.getInt(index);
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getAuthorName() { return authorName; }
    public void setAuthorName(String authorName) { this.authorName = authorName; }
    public String getNovelName() { return novelName; }
    public void setNovelName(String novelName) { this.novelName = novelName; }
    public String getChapterName() { return chapterName; }
    public void setChapterName(String chapterName) { this.chapterName = chapterName; }
    public String getChapterUrl() { return chapterUrl; }
    public void setChapterUrl(String chapterUrl) { this.chapterUrl = chapterUrl; }
    public int getNovelId() { return novelId; }
    public void setNovelId(int novelId) { this.novelId = novelId; }
}


import org.apache.commons.codec.digest.DigestUtils;

public class JavaUtils {
    // MD5 helper: returns the hex digest of the key.
    // MD5 is just one option; any scheme that spreads rowkeys evenly works.
    public static String md5(String key) {
        return DigestUtils.md5Hex(key);
    }
}
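After the job completes, the result can be spot-checked from the HBase shell (a quick verification, not part of the original write-up):

count 'novel_detail'
scan 'novel_detail', {LIMIT => 1}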

