大厂技术内幕字节跳动原来是这么做数据迁移的!
Posted JavaEdge.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大厂技术内幕字节跳动原来是这么做数据迁移的!相关的知识,希望对你有一定的参考价值。
数据迁移
目标
- 能够描述项目数据迁移的方案
- 了解hbase的特点
- 能够熟悉数据迁移中的数据包装和转换
- 能够完成文章数据的全量和增量迁移
- 能够完成热点文章数据的迁移
1 为什么需要自动同步
因为mysql保存着我们爬取的以及自建的数据,对于爬取的数据,数据量比较大,使用mysql 存储会影响mysql的性能,并且我们需要对数据进行流式计算,对数据进行各种统计,mysq满足不了我们的需求,我们就将mysql中的全量数据同步到HBASE中,由HBASE保存海量数据,mysql中的全量数据会定期进行删除。
HBASE中保存着海量数据,我们需要计算出热点数据,并将数据同步到mysql以及MONGODB中,mysql中保存主体关系数据,MONGODB保存着具体数据信息。
因为热点数据也会失效,今天是热数据,明天就不是了,也需要定期对热点数据进行删除,我们定时删除一个月之前的热点数据,保持本月的热数据。
2 迁移方案
2.1 需求分析
2.1.1 功能需求
有了大量数据集基础后,实时计算后的热点数据需要保存起来,因为mysql保存大量文章数据会影响mysql的性能,所以采用mysql+mongoDB的方式进行存储。
2.1.1 全量数据迁移方案
通过定时任务将mysql中爬取或者自建的文章同步到HBASE中,并将同步过的数据状态改为已同步,下次同步的时候就不会再次同步这些数据了。
2.1.2 热数据迁移方案
HBASE中有全量数据,大数据端计算出热点数据,需要将这些热点数据同步到MYSQL和MONGDB中,用于页面显示
2.2 设计思路
-
将mysql数据库中的全量数据定时读取出来,将多个对象打包成一个对象,保存到HBASE中,保存成功后更新数据库中的状态改为已同步,下一次就不会同步该条数据了。
-
使用KAFKA监听热点数据计算结果,接收到热点数据信息后,从HBASE得到打包的数据,并将数据进行拆分,将关系数据保存到mysql中,将具体数据保存到mongodb中。
-
因为热点数据会失效,定期清除mysql和mongodb中的过期数据
2.3 数据同步注意的问题
-
HBASE数据主要靠rowKey进行查询的,rowKey设计就用mysql中的主键ID作为rowKey,查询的时候直接根据Rowkey获取数据
-
因为需要同步到HBASE的数据是多个数据表的数据,一条数据由多个对象组成,存储的时候使用列族区分不同的对象,里面存储不同的字段。
3 项目中集成hbase与Mongodb
在leadnews-common 集成:
- hbase.properties
- mongo.properties
- pom.xml
<!--mongoDB-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!--HBase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
host文件配置:
在服务器host文件中配置域名,根据自己的服务器地址更改
172.16.1.52 javaedge
4 常用组件介绍
4.1 Hbase相关操作
Hbase 操作工具类用于将数据存储到Hbase中,其中有些方法用于存储或删除。
4.1.1 项目导入
导入资料文件夹中的项目leadnews-migration
4.1.2 公共存储类
StorageData
公共存储数据表,由多个StorageEntity组成
StorageData 是最重要的一个存储对象,他是保存一个bean信息的类,负责存储bean信息以及转换和反向转换bean 。
该类用到一个重要的工具类ReflectUtils 反射工具类和DataConvertUtils数据类型转换工具类主要用于日期类型的转换
主要方法
- 添加StorageEntry方法
public void addStorageEntry(StorageEntry entry)
该方法有几个重载方法,用于向StorageEntry列表中添加StorageEntry对象的
- 获取该对象对应的Object对象
public Object getObjectValue()
该方法用于将存储的实体数据转换为Bean的实体,用了ReflectUtils反射工具类进行操作
- 将Bean 转换为StorageData的存储结构
public static StorageData getStorageData(Object bean)
该方法用于将不同的bean转换为同一种存储结构进行存储
StorageEntity
公共代码存储的实体
StorageEntry
公共存储对象的一个key-value的字段
4.1.3 Hbase操作相关工具类
(1)HBaseConstants 类
配置类Hbase存储的的表名称
public class HBaseConstants {
public static final String APARTICLE_QUANTITY_TABLE_NAME = "APARTICLE_QUANTITY_TABLE_NAME";
}
(2)HBaseInvok
hbase的的回调操作类
/**
* Hbase 的回调类
* 用于我们操作的时候就行回调
*/
public interface HBaseInvok {
/**
* 回调方法
*/
public void invok();
}
(3)HBaseStorage
hbase 的存储对象 继承自StorageEntity
(4)HBaseClent
hbase client操作的工具类
(5)HBaseConfig
用于将HbaseClient对象的相关配置
(6)HBaseStorageClient
Hbase 存储客户端工具类 是对HbaseClient工具类的封装
这个类是自己封装的存储客户端
该类位于heima-leadnews-common 包下的com.heima.hbase.HBaseStorageClient
其中用到了HBaseClent 客户端工具,他是一个操作工具类,不需要我们具体的写拿过来用就可以
4.1.4 测试代码
@SpringBootTest
@RunWith(SpringRunner.class)
public class HbaseTest {
@Autowired
private HBaseClent hBaseClent;
@Test
public void testCreateTable(){
List<String> columnFamily = new ArrayList<>();
columnFamily.add("test_cloumn_family1");
columnFamily.add("test_cloumn_family2");
boolean ret = hBaseClent.creatTable("hbase_test_table_name", columnFamily);
}
@Test
public void testDelTable(){
hBaseClent.deleteTable("hbase_test_table_name");
}
@Test
public void testSaveData(){
String []columns ={"name","age"};
String [] values = {"zhangsan","28"};
hBaseClent.putData("hbase_test_table_name","test_row_key_001","test_cloumn_family1",columns,values);
}
@Test
public void testFindByRowKey(){
Result hbaseResult = hBaseClent.getHbaseResult("hbase_test_table_name", "test_row_key_001");
System.out.println(hbaseResult);
}
}
4.2 MongoDB操作工具类
mongoDB是一个文档型数据库,也需要存储多个不同的对象,我们也用到了HBASE中用到的StorageEntity 存储结构,我们下面会讲
我们用到了Spring MongoTemplate 来操作数据库
介绍以下我们的实体
(1)MongoConstant
mongoDB操作的常量定义了操作mongodb的表名称
代码位置:com.heima.common.mongo.constants.MongoConstant
public class MongoConstant {
public static final String APARTICLE_MIGRATION_TABLE = "APARTICLE_MIGRATION_TABLE";
}
(2)MongoStorageEntity
MongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的
mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型
代码位置:com.heima.common.mongo.entity.MongoStorageEntity
/**
* mongoDB 存储实体
* Document 是指表明是什么
*/
@Document(collection = "mongo_storage_data")
@Setter
@Getter
public class MongoStorageEntity extends StorageEntity {
/**
* 主键的Key
*
* @Id 标明该字段是主键
*/
@Id
private String rowKey;
}
(3)MongoDBconfigure
对mongdb操作的配置类
代码位置:com.heima.common.mongo.MongoDBconfigure
@Configuration
@PropertySource("classpath:mongo.properties")
public class MongoDBconfigure {
@Value("${mongo.host}")
private String host;
@Value("${mongo.port}")
private int port;
@Value("${mongo.dbname}")
private String dbName;
@Bean
public MongoTemplate getMongoTemplate() {
return new MongoTemplate(getSimpleMongoDbFactory());
}
public SimpleMongoDbFactory getSimpleMongoDbFactory() {
return new SimpleMongoDbFactory(new MongoClient(host, port), dbName);
}
}
(4)测试代码
@SpringBootTest(classes = MigrationApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class MongoTest {
@Autowired
private MongoTemplate mongotemplate;
@Autowired
private HBaseStorageClient hBaseStorageClient;
@Test
public void test() {
Class<?>[] classes = new Class<?>[]{ApArticle.class, ApArticleContent.class, ApAuthor.class};
//List<Object> entityList = hBaseStorageClient.getHbaseDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", Arrays.asList(classes));
List<String> strList = Arrays.asList(classes).stream().map(x -> x.getName()).collect(Collectors.toList());
List<StorageData> storageDataList = hBaseStorageClient.gethBaseClent().getStorageDataList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", strList);
MongoStorageEntity mongoStorageEntity = new MongoStorageEntity();
mongoStorageEntity.setDataList(storageDataList);
mongoStorageEntity.setRowKey("1");
MongoStorageEntity tmp = mongotemplate.findById("1", MongoStorageEntity.class);
if (null != tmp) {
mongotemplate.remove(tmp);
}
MongoStorageEntity tq = mongotemplate.insert(mongoStorageEntity);
System.out.println(tq);
}
@Test
public void test1() {
MongoStorageEntity mongoStorageEntity = mongotemplate.findById("1", MongoStorageEntity.class);
if (null != mongoStorageEntity && null != mongoStorageEntity.getDataList()) {
mongoStorageEntity.getDataList().forEach(x -> {
System.out.println(x.getObjectValue());
});
}
}
}
5 业务层代码
5.1 Habse操作实体类
(1)ArticleCallBack
Hbase相关回调操作的工具类
(2)ArticleHBaseInvok
Hbase 对回调对象的封装,以及对回调的invoke执行对象
(3)ArticleQuantity
对整个需要存储的对象的封装。
5.2 文章配置接口
5.2.1 mapper
ApArticleConfigMapper中新增方法
ApArticleConfigMapper.xml
5.2.2 service
对文章配置操作的service
ApArticleConfigServiceImpl是对ApArticleConfig的操作
5.3 文章内容接口
5.3.1 mapper定义
ApArticleContentMapper新增方法
List<ApArticleContent> selectByArticleIds(List<String> articleIds);
5.3.2 service
对文章内容操作的Service
public interface ApArticleContenService {
List<ApArticleContent> queryByArticleIds(List<String> ids);
ApArticleContent getByArticleIds(Integer id);
}
ApArticleContenServiceImpl
对ApArticleConten相关的操作
代码位置:com.heima.migration.service.impl.ApArticleContenServiceImpl
@Service
public class ApArticleContenServiceImpl implements ApArticleContenService {
@Autowired
private ApArticleContentMapper apArticleContentMapper;
@Override
public List<ApArticleContent> queryByArticleIds(List<String> ids) {
return apArticleContentMapper.selectByArticleIds(ids);
}
@Override
public ApArticleContent getByArticleIds(Integer id) {
return apArticleContentMapper.selectByArticleId(id);
}
}
7.4 文章接口
7.4.1 mapper定义
ApArticleMapper新增方法
/**
* 查询
*
* @param apArticle
* @return
*/
List<ApArticle> selectList(ApArticle apArticle);
/**
* 更新
* @param apArticle
*/
void updateSyncStatus(ApArticle apArticle);
ApArticleMapper.xml
<sql id="Base_Column_Where">
<where>
<if test="title!=null and title!=''">
and title = #{title}
</if>
<if test="authorId!=null and authorId!=''">
and author_id = #{authorId}
</if>
<if test="authorName!=null and authorName!=''">
and author_name = #{authorName}
</if>
<if test="channelId!=null and channelId!=''">
and channel_id = #{channelId}
</if>
<if test="channelName!=null and channelName!=''">
and channel_name = #{channelName}
</if>
<if test="layout!=null and layout!=''">
and layout = #{layout}
</if>
<if test="flag!=null and flag!=''">
and flag = #{flag}
</if>
<if test="views!=null and views!=''">
and views = #{views}
</if>
<if test="syncStatus!=null">
and sync_status = #{syncStatus}
</if>
</where>
</sql>
<select id="selectList" resultMap="resultMap">
select
<include refid="Base_Column_List"/>
from ap_article
<include refid="Base_Column_Where"/>
</select>
<update id="updateSyncStatus">
UPDATE ap_article SET sync_status = #{syncStatus} WHERE id=#{id}
</update>
7.4.2 service
对ApArticle操作的Service
接口位置:com.heima.migration.service.ApArticleService
public interface ApArticleService {
public ApArticle getById(Long id);
/**
* 获取未同步的数据
*
* @return
*/
public List<ApArticle> getUnsyncApArticleList();
/**
* 更新同步状态
*
* @param apArticle
*/
void updateSyncStatus(ApArticle apArticle);
}
ApArticleServiceImpl
对ApArticleService相关的操作
代码位置:com.heima.migration.service.impl.ApArticleServiceImpl
@Log4j2
@Service
public class ApArticleServiceImpl implements ApArticleService {
@Autowired
private ApArticleMapper apArticleMapper;
public ApArticle getById(Long id) {
return apArticleMapper.selectById(id);
}
/**
* 获取未同步的数据
*
* @return
*/
public List<ApArticle> getUnsyncApArticleList() {
ApArticle apArticleQuery = new ApArticle();
apArticleQuery.setSyncStatus(false);
return apArticleMapper.selectList(apArticleQuery);
}
/**
* 更新数据同步状态
*
* @param apArticle
*/
public void updateSyncStatus(ApArticle apArticle) {
log.info("开始更新数据同步状态,apArticle:{}", apArticle);
if (null != apArticle) {
apArticle.setSyncStatus(true);
apArticleMapper.updateSyncStatus(apArticle);
}
}
}
7.5 文章作者接口
7.5.1 mapper定义
ApAuthorMapper
List<ApAuthor> selectByIds(List<Integer> ids);
ApAuthorMapper.xml
<select id="selectByIds" resultMap="BaseResultMap">
select * from ap_author
where id in
<foreach item="item" index="index" collection="list" open="(" separator="," close=")">
#{item}
</foreach>
</select>
7.5.2 service
对ApAuthor操作的Service
接口位置:com.heima.migration.service.ApAuthorService
public interface ApAuthorService {
List<ApAuthor> queryByIds(List<Integer> ids);
ApAuthor getById(Long id);
}
ApAuthorServiceImpl
对ApAuthor相关的操作
代码位置:com.heima.migration.service.impl.ApAuthorServiceImpl
@Service
public class ApAuthorServiceImpl implements ApAuthorService {
@Autowired
private ApAuthorMapper apAuthorMapper;
@Override
public List<ApAuthor> queryByIds(List<Integer> ids) {
return apAuthorMapper.selectByIds(ids以上是关于大厂技术内幕字节跳动原来是这么做数据迁移的!的主要内容,如果未能解决你的问题,请参考以下文章
是面试官放水,还是公司太缺人?这都没挂,字节跳动原来这么容易进...
Android大厂面试经验分享(OPPO,字节,华为,阿里)