Nodejs续集批量更新

Posted

技术标签:

【中文标题】Nodejs续集批量更新【英文标题】:Nodejs sequelize bulk upsert 【发布时间】:2018-06-15 22:27:28 【问题描述】:

有没有办法在 sequelize 中进行批量 upsert。另外,我可以指定用于检查重复项的键吗?

我尝试了以下方法,但没有成功:

Employee.bulkCreate(data, 
    updateOnDuplicate: true
);

虽然批量创建工作正常。上面的语句总是在数据库中创建新条目。

【问题讨论】:

【参考方案1】:

来自官方sequelizejs reference。

可以使用bulkCreateupdateOnDuplicate 选项来完成。

例如这样:

Employee.bulkCreate(dataArray, 
    
        fields:["id", "name", "address"] ,
        updateOnDuplicate: ["name"] 
     )

updateOnDuplicate 是一个字段数组,当主键(或可能是唯一键)匹配行时将更新这些字段。确保您的模型和 dataArray 中至少有一个唯一字段(比如说 id)用于 upsert。

【讨论】:

感谢followtest52,宾果游戏:) 不幸的是,文档说只有 mysql =( 现在也支持 Postgres -“如果行键已经存在(重复键更新时)要更新的字段?(仅 MySQL、MariaDB 和 Postgres >= 9.5 支持)。默认情况下,所有字段都会更新。” 值得一提:通常由 sequelize 自动更新的字段 updatedAt不会得到更新,除非它通过 updateOnDuplicate 明确传递 注意:如果您的表有 uniq 索引,则此解决方案将不起作用。 PR 正在进行中:github.com/sequelize/sequelize/pull/12516【参考方案2】:

更新

Sequelize 6.x 增加了对所有方言的所有 UPSERT 的支持,所以@followtest52 的回答对 PostgreSQL 也有效。

原创

由于答案不支持 PostgreSQL,使用 Sequelize 的 """"best"""" 替代方案是使用 ON CONFLICT 语句进行手动查询。示例(打字稿):

const values: Array<Array<number | string>> = [
    [1, 'Apple', 'Red', 'Yummy'],
    [2, 'Kiwi', 'Green', 'Yuck'],
]

const query = 'INSERT INTO fruits (id, name, color, flavor) VALUES ' +
     values.map(_ =>  return '(?)' ).join(',') +
     ' ON CONFLICT (id) DO UPDATE SET flavor = excluded.flavor;'

sequelize.query( query, values ,  type: sequelize.QueryTypes.INSERT )

这将构建如下查询:

INSERT INTO 
    fruits (id, name, color, flavor)
VALUES 
    (1, 'Apple', 'Red', 'Yummy'),
    (2, 'Kiwi', 'Green', 'Yuck')
ON CONFLICT (id) DO UPDATE SET 
    flavor = excluded.flavor;

可以说,这不是必须手动构建查询的理想解决方案,因为它违背了使用 sequelize 的目的,但如果它是一次性查询,您并不迫切需要,您可以使用此方法。

【讨论】:

不要忘记将该字段标记为唯一@Unique decorator in case of sequelize-typescript,否则此代码将尝试使用主键而不是name。 Postgres 14.1【参考方案3】:

2019 年更新

适用于所有方言,只要匹配某个最低版本

HERE是对相同源代码的引用

请注意,个别选项可能适用于所有方言,也可能不适用于所有方言 例如,updateOnDuplicate 仅适用于 MySQL、MariaDB、 SQLite 和 Postgres

ignoreDuplicates 选项不适用于 MSSQL

还要检查源代码中的这块代码

if (Array.isArray(options.updateOnDuplicate) && options.updateOnDuplicate.length) 
    options.updateOnDuplicate = _.intersection(
        _.without(Object.keys(model.tableAttributes), createdAtAttr),
        options.updateOnDuplicate
    );
 else 
    return Promise.reject(new Error('updateOnDuplicate option only supports non-empty array.'));

updateOnDuplicate 必须是一个数组,不能为真或假

按照以上几点,你的代码应该是这样的

Employee.bulkCreate(data, 
    updateOnDuplicate: ['employeeName', 'employeeAge'],
);

更新:

既然有人提到它不起作用,试试这个

models.Employee.bulkCreate(items, 
    returning: ['employeeId'],
    ignoreDuplicates: true
  )

【讨论】:

items 应该是一个对象数组,具有与您的模型相同的字段 您的更新适用于不需要更新值的情况。在 postgres 中说“如果它不存在,就让它......但如果它确实存在,不要改变任何东西”是一个很好的方式。我现在可以用它来解决我的部分问题。但是我仍然需要(a)更新“重复”上的行字段,然后如果它们不存在则创建这些行。尽量避免使用原始 SQL。 你试过这个 updateOnDuplicate: ['employeeName', 'employeeAge'] 它适用于我的桌子 这很奇怪,它对你有用。我正在使用带有最新版本的 sequelize 的 heroku postgres。当我使用 Page.bulkCreate(data.pages, returned: true, updateOnDuplicate: ['id'] ) 时,它会创建新实例,但不会更新旧实例。 你的续集版本是什么【参考方案4】:

2020 年 10 月 1 日更新 续集版本:^6.3.5

问题仍然存在。我们仍然无法使用唯一的复合索引 bulkUpsertbulkCreateupdateOnDuplicates 还不能使用唯一的复合索引。仍有 PR 等待合并,可能会解决此问题:-https://github.com/sequelize/sequelize/pull/12516https://github.com/sequelize/sequelize/pull/12547

解决方法

目前,如果有人想要快速解决方法,那么可以通过修改您自己的表属性、名称和数据来使用以下基于原始查询的包装器:-

const bulkUpsertIntoTable = async ( bulkUpsertableData ) => 
  try 
    /* eslint-disable */
   // id column will automatically be incremented if you have set it to auto-increment
   const query = `INSERT INTO "Table" ("non_id_attr1", "non_id_attr2", "non_id_attr3","createdAt", "updatedAt") VALUES $bulkUpsertableData
    .map((_) => "(?)")
    .join(
      ","
    ) ON CONFLICT ("non_id_attr1","non_id_attr2") DO UPDATE SET "non_id_attr1"=excluded."non_id_attr1", "non_id_attr2"=excluded."non_id_attr2", "non_id_attr3"=excluded."non_id_attr3",  "updatedAt"=excluded."updatedAt" RETURNING "id","non_id_attr1","non_id_attr2","non_id_attr3","createdAt","updatedAt";`;
    /* eslint-enable */

    return await models.sequelize.query(query, 
      replacements: bulkUpsertableData,//------> dont forget to pass your data here
      type: models.Sequelize.QueryTypes.INSERT,
      // transaction:t -----> if required to be done in transaction
    );
   catch (error) 
    console.error("Bulk Upserting into Table:", error);
    throw error;
  
;

重点是创建bulkUpsertableData,它应该是Array&lt;Array&gt; ie:- [[]]。示例创建:-

// with reference to above wrapper function
const bulkUpsertableData = Object.keys(myObjectData).map(type => [
      myObjectData[type],// -----> non_id_attr1
      type, // -----> non_id_attr2
      someOtherRandomValue, // -----> non_id_attr3
      new Date(), // -----> created_at
      new Date(), // -----> updated_at
]);

// response will have all the raw attributes mentioned in RETURNING clause
const upsertedTableResponse = await bulkUpsertIntoTable( bulkUpsertableData );

【讨论】:

感谢@Yedhin 的回答,我将根据您的代码发布更通用的解决方案。【参考方案5】:

2020 年 11 月 2 日更新

根据@Yedhin 的回答,这里有一个更通用的解决方案(打字稿):

export const bulkUpsert = async <T extends Model<T>, K extends keyof T>(
  items: Partial<T>[],
  model: ModelCtor<T>,
  conflictKeys: K[],
  excludeFromUpdate: K[] = [],
): Promise<[number, number]> => 
  if (!items.length) 
    return [0, 0];
  

  const  tableName, sequelize, name  = model;
  if (!sequelize) 
    throw new Error(`Sequelize not initialized on $name?`);
  

  const sample = items[0];
  const fields = Object.keys(sample) as K[];
  const createFields = `("$fields.join(`","`)")`;
  const updateFields = fields
    .filter((field) => ![...excludeFromUpdate, ...conflictKeys].includes(field))
    .map((field) => `"$field"=EXCLUDED."$field"`)
    .join(', ');
  const values = items.map(dataToSql(sequelize)).join(',');
  const onConflict = `ON CONFLICT ("$conflictKeys.join(`","`)")`;
  const returning = `"$fields.join('","')"`;

  const query = `INSERT INTO "$tableName" $createFields VALUES $values $onConflict DO UPDATE SET $updateFields RETURNING $returning;`;

  return sequelize.query(query, 
    replacements: items,
    type: QueryTypes.INSERT,
  );
;

const valueToSql = (sequelize: Sequelize) => (
  value: string | number | boolean | null | Date | string[] | Record<string, unknown>,
): string => 
  if (value === null) 
    return 'null';
  

  if (typeof value === 'boolean') 
    return value ? 'true' : 'false';
  

  if (typeof value !== 'object' || value instanceof Date) 
    return sequelize.escape(value);
  

  return sequelize.escape(JSON.stringify(value));
;


const dataToSql = <T extends Node<T>>(sequelize: Sequelize) => (data: Partial<T>): string =>
  `($Object.values(data).map(valueToSql(sequelize)).join(','))`;

【讨论】:

【参考方案6】:

2021 年 9 月更新

具有唯一复合索引的批量更新插入现在仅适用于 Sequelize v6.4.4。

https://github.com/sequelize/sequelize/pull/13345

【讨论】:

我如何实际使用复合索引并指定我想使用哪些键来确定重复项?看起来有一个名为upsertKeys 的属性可以传递bulkCreate,对吗?我在看这里github.com/sequelize/sequelize/pull/13345/commits/… updateOnDuplicate 还应该是一个数组吗?这就是文档所说的,但我认为它应该是一个布尔值【参考方案7】:

修改后的版本。哪个可以完成这项工作。

/**
 *
 * @param * data Raw JSON data
 * @param * model Sequalize model
 * @param * fields Columns thare need to be inserted/update.If none passed, it will extract fields from the data.
 * @returns response consists of data with type of action(upsert/create) performed for each record.
 */
export const bulkUpert = (data, model, fields = undefined) => 
  console.log("****Bulk insertion started****");
  if (!data.length) 
    return [0, 0];
  
  const  name, primaryKeyAttributes  = model;

  console.log(name, primaryKeyAttributes, fields);

  if (!sequelize) 
    throw new Error(`Sequalize not initialized on $name`);
  

  const extractFields = fields ? fields : Object.keys(data[0]);
  const createFields = extractFields.join(", ");
  const values = data.map(dataToSql()).join(", ");

  const query = `MERGE INTO
    [$name]
    WITH(HOLDLOCK)
    AS [targetTable]
    USING (
        VALUES $values
    )
    AS [sourceTable]
    (
      $createFields
    ) ON
    $getPrimaryQueryString(primaryKeyAttributes)
    WHEN MATCHED THEN
        UPDATE SET
            $getUpdateFieldsString(extractFields)
    WHEN NOT MATCHED THEN
        INSERT (
              $createFields
            )
        VALUES
            (
                $getInsertValuesString(extractFields)
            )
    OUTPUT $action, INSERTED.*;`;
  return sequelize.query(query);
;

const valueToSQL = () => (value) => 
  if (value === null) 
    return "null";
  

  if (typeof value === "boolean") 
    return value ? "true" : "false";
  

  if (typeof value !== "object" || value instanceof Date) 
    return sequelize.escape(value);
  

  return sequelize.escape(JSON.stringify(value));
;

const getPrimaryQueryString = (primaryKeyAttributes) => 
  let string = "";
  for (let i = 0; i < primaryKeyAttributes.length; i++) 
    string += `[targetTable].[$primaryKeyAttributes[i]] = [sourceTable].[$primaryKeyAttributes[i]]`;
    if (i != primaryKeyAttributes.length - 1) 
      string += " AND";
    
  
  return string;
;

const getUpdateFieldsString = (fields) => 
  let string = "";
  for (let i = 0; i < fields.length; i++) 
    string += `[targetTable].[$fields[i]] = [sourceTable].[$fields[i]]`;
    if (i != fields.length - 1) 
      string += ", ";
    
  
  return string;
;

const getInsertValuesString = (fields) => 
  let string = "";
  for (let i = 0; i < fields.length; i++) 
    string += `[sourceTable].[$fields[i]]`;
    if (i != fields.length - 1) 
      string += ", ";
    
  
  return string;
;

const dataToSql = () => (data) =>
  `($Object.values(data).map(valueToSQL()).join(","))`;

【讨论】:

以上是关于Nodejs续集批量更新的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法使用嵌套创建批量更新棱镜中的对象

续集更新交易

Mybatis批量更新,批量删除

为啥批量插入/更新更快?批量更新如何工作?

Thinkphp怎么批量更新数据

Thinkphp怎么批量更新数据