完美数据迁移-MongoDB Stream的应用

Posted 美码师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了完美数据迁移-MongoDB Stream的应用相关的知识,希望对你有一定的参考价值。

一、背景介绍

最近微服务架构火的不行,但本质上也只是风口上的一个热点词汇。 作为笔者的经验来说,想要应用一个新的架构需要带来的变革成本是非常高的。

尽管如此,目前还是有许多企业踏上了服务化改造的道路,这其中则免不了"旧改"的各种繁杂事。 所谓的"旧改",就是把现有的系统架构来一次重构,拆分成多个细粒度的服务后,然后找时间 升级割接一把,让新系统上线。这其中,数据的迁移往往会成为一个非常重要且繁杂的活儿。

拆分服务时数据迁移的挑战在哪?

  1. 首先是难度大,做一个迁移方案需要了解项目的前身今世,评估迁移方案、技术工具等等;


  2. 其次是成本高。由于新旧系统数据结构是不一样的,需要定制开发迁移转化功能。很难有一个通用的工具能一键迁移;


  3. 再者,对于一些容量大、可靠性要求高的系统,要能够不影响业务,出了问题还能追溯,因此方案上还得往复杂了想。


二、常见方案

按照迁移的方案及流程,可将数据迁移分为三类:


1. 停机迁移

最简单的方案,停机迁移的顺序如下:

采用停机迁移的好处是流程操作简单,工具成本低;然而缺点也很明显, 迁移过程中业务是无法访问的,因此只适合于规格小、允许停服的场景。


2. 业务双写

业务双写是指对现有系统先进行改造升级,支持同时对新库和旧库进行写入。 之后再通过数据迁移工具对旧数据做全量迁移,待所有数据迁移转换完成后切换到新系统。

示意图:

完美数据迁移-MongoDB Stream的应用

业务双写的方案是平滑的,对线上业务影响极小;在出现问题的情况下可重新来过,操作压力也会比较小。

笔者在早些年前尝试过这样的方案,整个迁移过程确实非常顺利,但实现该方案比较复杂, 需要对现有的代码进行改造并完成新数据的转换及写入,对于开发人员的要求较高。 在业务逻辑清晰、团队对系统有足够的把控能力的场景下适用。


3. 增量迁移

增量迁移的基本思路是先进行全量的迁移转换,待完成后持续进行增量数据的处理,直到数据追平后切换系统。

示意图:

完美数据迁移-MongoDB Stream的应用

关键点

  • 要求系统支持增量数据的记录。 对于MongoDB可以利用oplog实现这点,为避免全量迁移过程中oplog被冲掉, 在开始迁移前就必须开始监听oplog,并将变更全部记录下来。 如果没有办法,需要从应用层上考虑,比如为所有的表(集合)记录下updateTime这样的时间戳, 或者升级应用并支持将修改操作单独记录下来。


  • 增量数据的回放是持续的。 在所有的增量数据回放转换过程中,系统仍然会产生新的增量数据,这要求迁移工具 能做到将增量数据持续回放并将之追平,之后才能做系统切换。


MongoDB 3.6版本开始便提供了Change Stream功能,支持对数据变更记录做监听。 这为实现数据同步及转换处理提供了更大的便利,下面将探讨如何利用Change Stream实现数据的增量迁移。


三、Change Stream 介绍

Chang Stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。 

在该特性出现之前,你可以通过拉取 oplog达到同样的目的;但 oplog 的处理及解析相对复杂且存在被回滚的风险,如果使用不当的话还会带来性能问题。 

Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换。

由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能, 其只能用于启用了副本集的独立集群或分片集群

监听的目标

名称 说明
单个集合 除系统库(admin/local/config)之外的集合,3.6版本支持
单个数据库 除系统库(admin/local/config)之外的数据库集合,4.0版本支持
整个集群 整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持

变更事件

一个Change Stream Event的基本结构如下所示:

 
   
   
 
  1. {

  2.   _id : { <BSON Object> },

  3.   "operationType" : "<operation>",

  4.   "fullDocument" : { <document> },

  5.   "ns" : {

  6.      "db" : "<database>",

  7.      "coll" : "<collection"

  8.   },

  9.   "documentKey" : { "_id" : <ObjectId> },

  10.   "updateDescription" : {

  11.      "updatedFields" : { <document> },

  12.      "removedFields" : [ "<field>", ... ]

  13.   }

  14.   "clusterTime" : <Timestamp>,

  15.   "txnNumber" : <NumberLong>,

  16.   "lsid" : {

  17.      "id" : <UUID>,

  18.      "uid" : <BinData>

  19.   }

  20. }

字段说明


名称 说明
_id 变更事件的Token对象
operationType 变更类型(见下面介绍)
fullDocument 文档内容
ns 监听的目标
ns.db 变更的数据库
ns.coll 变更的集合
documentKey 变更文档的键值,含_id字段
updateDescription 变更描述
updateDescription.updatedFields 变更中更新字段
updateDescription.removedFields 变更中删除字段
clusterTime 对应oplog的时间戳
txnNumber 事务编号,仅在多文档事务中出现,4.0版本支持
lsid 事务关联的会话编号,仅在多文档事务中出现,4.0版本支持



Change Steram支持的变更类型有以下几个:

类型 说明
insert 插入文档
delete 删除文档
replace 替换文档,当执行replace操作指定upsert时,可能是insert事件
update 更新文档,当执行update操作指定upsert时,可能是insert事件
invalidate 失效事件,比如执行了collection.drop或collection.rename

利用以下的shell脚本,可以打印出集合 T_USER上的变更事件:

 
   
   
 
  1. watchCursor=db.T_USER.watch()

  2. while (!watchCursor.isExhausted()){

  3.   if (watchCursor.hasNext()){

  4.      printjson(watchCursor.next());

  5.   }

  6. }

下面提供一些样例,感受一下

insert 事件

 
   
   
 
  1. {

  2.    "_id": {

  3.        "_data": "825B5826D10000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B58272321C4761D1338F4860004"

  4.    },

  5.    "operationType": "insert",

  6.    "clusterTime": Timestamp(1532503761, 1),

  7.    "fullDocument": {

  8.        "_id": ObjectId("5b58272321c4761d1338f486"),

  9.        "name": "LiLei",

  10.        "createTime": ISODate("2018-07-25T07:30:43.398Z")

  11.    },

  12.    "ns": {

  13.        "db": "appdb",

  14.        "coll": "T_USER"

  15.    },

  16.    "documentKey": {

  17.        "_id": ObjectId("5b58272321c4761d1338f486")

  18.    }

  19. }

update事件

 
   
   
 
  1. {

  2. "_id" : {

  3.  "_data" : "825B5829DF0000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B582980ACEC5F345DB998EE0004"

  4. },

  5. "operationType" : "update",

  6. "clusterTime" : Timestamp(1532504543, 1),

  7. "ns" : {

  8.  "db" : "appdb",

  9.  "coll" : "T_USER"

  10. },

  11. "documentKey" : {

  12.  "_id" : ObjectId("5b582980acec5f345db998ee")

  13. },

  14. "updateDescription" : {

  15.  "updatedFields" : {

  16.   "age" : 15

  17.  },

  18.  "removedFields" : [ ]

  19. }

  20. }

replace事件

 
   
   
 
  1. {

  2.    "_id" : {

  3.        "_data" : "825B58299D0000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B582980ACEC5F345DB998EE0004"

  4.    },

  5.    "operationType" : "replace",

  6.    "clusterTime" : Timestamp(1532504477, 1),

  7.    "fullDocument" : {

  8.        "_id" : ObjectId("5b582980acec5f345db998ee"),

  9.        "name" : "HanMeimei",

  10.        "age" : 12

  11.    },

  12.    "ns" : {

  13.        "db" : "appdb",

  14.        "coll" : "T_USER"

  15.    },

  16.    "documentKey" : {

  17.        "_id" : ObjectId("5b582980acec5f345db998ee")

  18.    }

  19. }

delete事件

 
   
   
 
  1. {

  2.    "_id" : {

  3.        "_data" : "825B5827A90000000229295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B58272321C4761D1338F4860004"

  4.    },

  5.    "operationType" : "delete",

  6.    "clusterTime" : Timestamp(1532503977, 2),

  7.    "ns" : {

  8.        "db" : "appdb",

  9.        "coll" : "T_USER"

  10.    },

  11.    "documentKey" : {

  12.        "_id" : ObjectId("5b58272321c4761d1338f486")

  13.    }

  14. }

invalidate 事件 执行db.T_USER.drop() 可输出

 
   
   
 
  1. {

  2.    "_id" : {

  3.        "_data" : "825B582D620000000329295A10046A31C593902B4A9C9907FC0AB1E3C0DA04"

  4.    },

  5.    "operationType" : "invalidate",

  6.    "clusterTime" : Timestamp(1532505442, 3)

  7. }

更多的Change Event 信息可以参考这里


四、实现增量迁移

本次设计了一个简单的论坛帖子迁移样例,用于演示如何利用Change Stream实现完美的增量迁移方案。 背景如下: 现有的系统中有10W个帖子,每个帖子都属于一个频道(channel),如下表


频道名 英文简称
美食 Food
情感 Emotion
宠物 Pet
家居 House
征婚 Marriage
教育 Education
旅游 Travel



新系统中频道字段将采用英文简称,同时要求能支持平滑升级。 根据前面篇幅的叙述,我们将使用Change Stream 功能实现一个增量迁移的方案。

相关表的转换如下图:


完美数据迁移-MongoDB Stream的应用


原理 

topic 是帖子原表,在迁移开始前将开启watch任务持续获得增量数据,并记录到 topic_incr表中; 接着执行全量的迁移转换,之后再持续对增量表数据进行迁移,直到无新的增量为止。

接下来我们使用Java程序来完成相关代码,mongodb-java--driver 在 3.6 版本后才支持 watch 功能 需要确保升级到对应版本:

 
   
   
 
  1. <dependency>

  2.     <groupId>org.mongodb</groupId>

  3.     <artifactId>mongo-java-driver</artifactId>

  4.     <version>3.6.4</version>

  5. </dependency>



1. 定义Channel频道的转换表


 
   
   
 
  1. public static enum Channel {

  2.    Food("美食"),

  3.    Emotion("情感"),

  4.    Pet("宠物"),

  5.    House("家居"),

  6.    Marriage("征婚"),

  7.    Education("教育"),

  8.    Travel("旅游")

  9.    ;

  10.    private final String oldName;

  11.    public String getOldName() {

  12.        return oldName;

  13.    }

  14.    private Channel(String oldName) {

  15.        this.oldName = oldName;

  16.    }

  17.    /**

  18.     * 转换为新的名称

  19.     *

  20.     * @param oldName

  21.     * @return

  22.     */

  23.    public static String toNewName(String oldName) {

  24.        for (Channel channel : values()) {

  25.            if (channel.oldName.equalsIgnoreCase(oldName)) {

  26.                return channel.name();

  27.            }

  28.        }

  29.        return "";

  30.    }

  31.    /**

  32.     * 返回一个随机频道

  33.     *

  34.     * @return

  35.     */

  36.    public static Channel random() {

  37.        Channel[] channels = values();

  38.        int idx = (int) (Math.random() * channels.length);

  39.        return channels[idx];

  40.    }

  41. }



2. 为 topic 表预写入1w条记录


 
   
   
 
  1. private static void preInsertData() {

  2.    MongoCollection<Document> topicCollection = getCollection(coll_topic);

  3.    // 分批写入,共写入1w条数据

  4.    int current = 0;

  5.    int batchSize = 100;

  6.    while (current < 10000) {

  7.        List<Document> topicDocs = new ArrayList<Document>();

  8.        for (int j = 0; j < batchSize; j++) {

  9.            Document topicDoc = new Document();

  10.            Channel channel = Channel.random();

  11.            topicDoc.append(field_channel, channel.getOldName());

  12.            topicDoc.append(field_nonce, (int) (Math.random() * nonce_max));

  13.            topicDoc.append("title", "This is the tilte -- " + UUID.randomUUID().toString());

  14.            topicDoc.append("author", "LiLei");

  15.            topicDoc.append("createTime", new Date());

  16.            topicDocs.add(topicDoc);

  17.        }

  18.        topicCollection.insertMany(topicDocs);

  19.        current += batchSize;

  20.        logger.info("now has insert {} records", current);

  21.    }

  22. }

上述实现中,每个帖子都分配了随机的频道(channel)


3. 开启监听任务,将topic上的所有变更写入到增量表


 
   
   
 
  1. MongoCollection<Document> topicCollection = getCollection(coll_topic);

  2. MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);

  3. // 启用 FullDocument.update_lookup 选项

  4. cursor = topicCollection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();

  5. while (cursor.hasNext()) {

  6.    ChangeStreamDocument<Document> changeEvent = cursor.next();

  7.    OperationType type = changeEvent.getOperationType();

  8.    logger.info("{} operation detected", type);

  9.    if (type == OperationType.INSERT || type == OperationType.UPDATE || type == OperationType.REPLACE

  10.            || type == OperationType.DELETE) {

  11.        Document incrDoc = new Document(field_op, type.getValue());

  12.        incrDoc.append(field_key, changeEvent.getDocumentKey().get("_id"));

  13.        incrDoc.append(field_data, changeEvent.getFullDocument());

  14.        topicIncrCollection.insertOne(incrDoc);

  15.    }

  16. }

代码中通过watch 命令获得一个MongoCursor对象,用于遍历所有的变更。 FullDocument.UPDATE_LOOKUP选项启用后,在update变更事件中将携带完整的文档数据(FullDocument)。

watch()命令提交后,mongos会与分片上的mongod(主节点)建立订阅通道,这可能需要花费一点时间。


4. 为了模拟线上业务的真实情况,启用几个线程对topic表进行持续写操作;


 
   
   
 
  1. private static void startMockChanges() {

  2.    threadPool.submit(new ChangeTask(OpType.insert));

  3.    threadPool.submit(new ChangeTask(OpType.update));

  4.    threadPool.submit(new ChangeTask(OpType.replace));

  5.    threadPool.submit(new ChangeTask(OpType.delete));

  6. }

ChangeTask 实现逻辑如下:

 
   
   
 
  1. while (true) {

  2.    logger.info("ChangeTask {}", opType);

  3.    if (opType == OpType.insert) {

  4.        doInsert();

  5.    } else if (opType == OpType.update) {

  6.        doUpdate();

  7.    } else if (opType == OpType.replace) {

  8.        doReplace();

  9.    } else if (opType == OpType.delete) {

  10.        doDelete();

  11.    }

  12.    sleep(200);

  13.    long currentAt = System.currentTimeMillis();

  14.    if (currentAt - startAt > change_during) {

  15.        break;

  16.    }

  17. }

每一个变更任务会不断对topic产生写操作,触发一系列ChangeEvent产生。

  • doInsert:生成随机频道的topic后,执行insert

  • doUpdate:随机取得一个topic,将其channel字段改为随机值,执行update

  • doReplace:随机取得一个topic,将其channel字段改为随机值,执行replace

  • doDelete:随机取得一个topic,执行delete


doUpdate为例,实现代码如下:

 
   
   
 
  1. private void doUpdate() {

  2.    MongoCollection<Document> topicCollection = getCollection(coll_topic);

  3.    Document random = getRandom();

  4.    if (random == null) {

  5.        logger.info("update skip");

  6.        return;

  7.    }

  8.    String oldChannel = random.getString(field_channel);

  9.    Channel channel = Channel.random();

  10.    random.put(field_channel, channel.getOldName());

  11.    random.put("createTime", new Date());

  12.    topicCollection.updateOne(new Document("_id", random.get("_id")), new Document("$set", random));

  13.    counter.onChange(oldChannel, channel.getOldName());

  14. }



5. 启动一个全量迁移任务,将 topic 表中数据迁移到 topic_new 新表


 
   
   
 
  1. final MongoCollection<Document> topicCollection = getCollection(coll_topic);

  2. final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);

  3. Document maxDoc = topicCollection.find().sort(new Document("_id", -1)).first();

  4. if (maxDoc == null) {

  5.    logger.info("FullTransferTask detect no data, quit.");

  6.    return;

  7. }

  8. ObjectId maxID = maxDoc.getObjectId("_id");

  9. logger.info("FullTransferTask maxId is {}..", maxID.toHexString());

  10. AtomicInteger count = new AtomicInteger(0);

  11. topicCollection.find(new Document("_id", new Document("$lte", maxID)))

  12.        .forEach(new Consumer<Document>() {

  13.            @Override

  14.            public void accept(Document topic) {

  15.                Document topicNew = new Document(topic);

  16.                // channel转换

  17.                String oldChannel = topic.getString(field_channel);

  18.                topicNew.put(field_channel, Channel.toNewName(oldChannel));

  19.                topicNewCollection.insertOne(topicNew);

  20.                if (count.incrementAndGet() % 100 == 0) {

  21.                    logger.info("FullTransferTask progress: {}", count.get());

  22.                }

  23.            }

  24.        });

  25. logger.info("FullTransferTask finished, count: {}", count.get());

在全量迁移开始前,先获得当前时刻的的最大 _id 值(可以将此值记录下来)作为终点。 随后逐个完成迁移转换。


6. 在全量迁移完成后,便开始最后一步:增量迁移

注:增量迁移过程中,变更操作仍然在进行

 
   
   
 
  1. final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);

  2. final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);

  3. ObjectId currentId = null;

  4. Document sort = new Document("_id", 1);

  5. MongoCursor<Document> cursor = null;

  6. // 批量大小

  7. int batchSize = 100;

  8. AtomicInteger count = new AtomicInteger(0);

  9. try {

  10.    while (true) {

  11.        boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;

  12.        // 按ID增量分段拉取

  13.        if (currentId == null) {

  14.            cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();

  15.        } else {

  16.            cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))

  17.                    .sort(sort).limit(batchSize).iterator();

  18.        }

  19.        boolean hasIncrRecord = false;

  20.        while (cursor.hasNext()) {

  21.            hasIncrRecord = true;

  22.            Document incrDoc = cursor.next();

  23.            OperationType opType = OperationType.fromString(incrDoc.getString(field_op));

  24.            ObjectId docId = incrDoc.getObjectId(field_key);

  25.            // 记录当前ID

  26.            currentId = incrDoc.getObjectId("_id");

  27.            if (opType == OperationType.DELETE) {

  28.                topicNewCollection.deleteOne(new Document("_id", docId));

  29.            } else {

  30.                Document doc = incrDoc.get(field_data, Document.class);

  31.                // channel转换

  32.                String oldChannel = doc.getString(field_channel);

  33.                doc.put(field_channel, Channel.toNewName(oldChannel));

  34.                // 启用upsert

  35.                UpdateOptions options = new UpdateOptions().upsert(true);

  36.                topicNewCollection.replaceOne(new Document("_id", docId),

  37.                        incrDoc.get(field_data, Document.class), options);

  38.            }

  39.            if (count.incrementAndGet() % 10 == 0) {

  40.                logger.info("IncrTransferTask progress, count: {}", count.get());

  41.            }

  42.        }

  43.        // 当watch停止工作(没有更多变更),同时也没有需要处理的记录时,跳出

  44.        if (!isWatchTaskStillRunning && !hasIncrRecord) {

  45.            break;

  46.        }

  47.        sleep(200);

  48.    }

  49. } catch (Exception e) {

  50.    logger.error("IncrTransferTask ERROR", e);

  51. }

增量迁移的实现是一个*不断 tail *的过程,利用 *_id 字段的有序特性 * 进行分段迁移; 即记录下当前处理的 _id 值,循环拉取在 该 _id 值之后的记录进行处理。

增量表(topic_incr)中除了DELETE变更之外,其余的类型都保留了整个文档, 因此可直接利用 replace + upsert 追加到新表。


7. 最后,运行整个程序


 
   
   
 
  1. [2018-07-26 19:44:16] INFO ~ IncrTransferTask progress, count: 2160

  2. [2018-07-26 19:44:16] INFO ~ IncrTransferTask progress, count: 2170

  3. [2018-07-26 19:44:27] INFO ~ all change task has stop watch task quit.

  4. [2018-07-26 19:44:27] INFO ~ IncrTransferTask finished, count: 2175

  5. [2018-07-26 19:44:27] INFO ~ TYPE 美食:1405

  6. [2018-07-26 19:44:27] INFO ~ TYPE 宠物:1410

  7. [2018-07-26 19:44:27] INFO ~ TYPE 征婚:1428

  8. [2018-07-26 19:44:27] INFO ~ TYPE 家居:1452

  9. [2018-07-26 19:44:27] INFO ~ TYPE 教育:1441

  10. [2018-07-26 19:44:27] INFO ~ TYPE 情感:1434

  11. [2018-07-26 19:44:27] INFO ~ TYPE 旅游:1457

  12. [2018-07-26 19:44:27] INFO ~ ALLCHANGE 12175

  13. [2018-07-26 19:44:27] INFO ~ ALLWATCH 2175

查看 topic 表和 topic_new 表,发现两者数量是相同的。 

为了进一步确认一致性,我们对两个表的分别做一次聚合统计:

topic表

 
   
   
 
  1. db.topic.aggregate([{

  2.    "$group":{

  3.        "_id":"$channel",

  4.        "total": {"$sum": 1}

  5.        }

  6.    },

  7.    {

  8.        "$sort": {"total":-1}

  9.        }

  10.    ])

topic_new表

 
   
   
 
  1. db.topic_new.aggregate([{

  2.    "$group":{

  3.        "_id":"$channel",

  4.        "total": {"$sum": 1}

  5.        }

  6.    },

  7.    {

  8.        "$sort": {"total":-1}

  9.        }

  10.    ])

前者输出结果:

后者输出结果:

前后对比的结果是一致的!


五、后续优化

前面的章节演示了一个增量迁移的样例,在投入到线上运行之前,这些代码还得继续优化:

  • 写入性能,线上的数据量可能会达到亿级,在全量、增量迁移时应采用合理的批量化处理; 另外可以通过增加并发线程,添置更多的Worker,分别对不同业务库、不同表进行处理以提升效率。 增量表存在幂等性,即回放多次其最终结果还是一致的,但需要保证表级有序,即一个表同时只有一个线程在进行增量回放。

  • 容错能力,一旦 watch 监听任务出现异常,要能够从更早的时间点开始(使用startAtOperationTime参数), 而如果写入时发生失败,要支持重试。

  • 回溯能力,做好必要的跟踪记录,比如将转换失败的ID号记录下来,旧系统的数据需要保留, 以免在事后追究某个数据问题时找不着北。

  • 数据转换,新旧业务的差异不会很简单,通常需要借助大量的转换表来完成。

  • 一致性检查,需要根据业务特点开发自己的一致性检查工具,用来证明迁移后数据达到想要的一致性级别。


BTW,数据迁移一定要结合业务特性、架构差异来做考虑,否则还是在耍流氓。


小结

服务化系统中扩容、升级往往会进行数据迁移,对于业务量大,中断敏感的系统通常会采用平滑迁移的方式。 MongoDB 3.6 版本后提供了 Change Stream 功能以支持应用订阅数据的变更事件流, 本文使用 Stream 功能实现了增量平滑迁移的例子,这是一次尝试,相信后续这样的应用场景会越来越多。 


附参考文档

百亿级数据迁移-58沈剑 

MongoDB-ChangeStream 

Use-ChangeStream To Handle Temperature


以上是关于完美数据迁移-MongoDB Stream的应用的主要内容,如果未能解决你的问题,请参考以下文章

爆赞!千亿级Mysql数据迁移mongodb成本节省及性能优化实践

【MongoDB-数据迁移】

Mongoose / Mongodb 迁移到 MySQL

将Kafka Streams代码迁移到Spring Cloud Stream吗?

无法安装 django-activity-stream 数据库表

如何将应用完美迁移至Android P版本