从 .CSV 文件比较/插入/更新 MySQL 数据库中的产品的最佳方法是啥
Posted
技术标签:
【中文标题】从 .CSV 文件比较/插入/更新 MySQL 数据库中的产品的最佳方法是啥【英文标题】:What's the best way to compare / insert / update products in a MySQL db from a .CSV file从 .CSV 文件比较/插入/更新 MySQL 数据库中的产品的最佳方法是什么 【发布时间】:2015-06-27 08:33:38 【问题描述】:在我们公司,我们每天早上从供应商的 FTP 服务器中提取一个 .CSV 文件并更新我们的产品数据(价格、库存等)。
我们为此任务编写了一个 cron,因为它应该自动运行。
当前脚本在大多数情况下都有效。但是,有时我们会收到错误消息:“Allowed memory size of 134217728 bytes exhausted (tried to allocate 75 bytes)”。
我们使用 CodeIgniter 和 DataMapper ORM。一个可能的设计错误可能是脚本使用对象而不是数组的事实...
每次检查 49000 行。
任何人都可以帮助我们找到另一种方法吗?
以下脚本是文件复制后运行的函数。
// Include auth connection params
$udb = $this->_completeParams($db);
// Check if an update was downloaded
$supplier = new Supplier(NULL,$udb);
$supplier->where(array('alias'=>'XX','name'=>'xxxxxxxxx'))->get(1);
$cronStart = date('Y-m-d H:i:s');
$cronStartDate = date('Y-m-d');
//mail($this->adminMail, 'CRON', 'Gestart:' .$cronStart, $this->headerMail);
//$message .= '1: '.memory_get_usage()."\r\n";
if($supplier->import_found)
//if(true)
$rows = 0;
$updated = 0;
$new = 0;
//$aAvailable = array();
$message .= '<h3>Start: '.$cronStart.'</h3>' . "\r\n";
$object = new Supplier_product(NULL,$udb);
$cat = new Supplier_category(NULL, $udb);
$manu = new Supplier_manufacturer(NULL, $udb);
$auvibel = new Supplier_auvibel(NULL, $udb);
$bebat = new Supplier_bebat(NULL, $udb);
$recupel = new Supplier_recupel(NULL, $udb);
$reprobel = new Supplier_reprobel(NULL, $udb);
$files = glob($this->tempDir.'XXXXX/prices/*');
foreach($files as $file)
$ext = pathinfo($file, PATHINFO_EXTENSION);
$data = ($ext == 'txt')?$this->_csvToArray($file, ';'):false;
// If the CSV data is in $data
if($data !== false)
$totalCount = count($data);
for($i = 0; $i <= $totalCount; $i++)
//$aAvailable[] = $data[$i]['ArtID'];
$rows++;
//$message .= 'loop start: '.memory_get_usage()."\r\n";
$object->where(array('art_id'=>$data[$i]['ArtID'],'supplier_id'=>$supplier->id))->get(1);
$auvibel->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
$auvibel->value = ($auvibel->exists())?$auvibel->value:0;
$bebat->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
$bebat->value = ($bebat->exists())?$bebat->value:0;
$recupel->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
$recupel->value = ($recupel->exists())?$recupel->value:0;
$reprobel->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
$reprobel->value = ($reprobel->exists())?$reprobel->value:0;
$intrastat = 0;
$data[$i]['LP_Eur'] = ($data[$i]['LP_Eur'] != '')?str_replace(',', '.', $data[$i]['LP_Eur']):0;
$data[$i]['DE_Eur'] = ($data[$i]['DE_Eur'] != '')?str_replace(',', '.', $data[$i]['DE_Eur']):0;
$data[$i]['D1_Eur'] = ($data[$i]['D1_Eur'] != '')?str_replace(',', '.', $data[$i]['D1_Eur']):0;
$data[$i]['D1_Eur'] = ($data[$i]['D2_Eur'] != '')?str_replace(',', '.', $data[$i]['D2_Eur']):0;
$data[$i]['PricePersonal_Eur'] = ($data[$i]['PricePersonal_Eur'] != '')?str_replace(',', '.', $data[$i]['PricePersonal_Eur']):0;
$data[$i]['BackorderDate'] = ($data[$i]['BackorderDate'] != '')?date('Y-m-d', strtotime($data[$i]['BackorderDate'])):NULL;
$data[$i]['ModifDate'] = ($data[$i]['ModifDate'] != '')?date('Y-m-d', strtotime($data[$i]['ModifDate'])):NULL;
if($object->exists())
if($object->allow_cron_update) //if($data[$i]['ModifDate'] != $object->modified)
// Check if category group exists
$cat->select('id')->where(array(
'supplier_id' => $supplier->id,
'name_a' => $data[$i]['Class1'],
'name_b' => $data[$i]['Class2'],
'name_c' => $data[$i]['Class3'],
))->get(1);
if(!$cat->exists())
// Category should be added
$cat->supplier_id = $supplier->id;
$cat->name_a = $data[$i]['Class1'];
$cat->name_b = $data[$i]['Class2'];
$cat->name_c = $data[$i]['Class3'];
$cat->save();
// Log as notification: New supplier categorie
$this->_notify('Niewe categorie',array(
'body' => $supplier->name.' heeft "'.$cat->name_a.' - '.$cat->name_b.' - '.$cat->name_c.'" als nieuwe categorie toegevoegd.',
'controller' => 'leveranciers',
'trigger' => 'new_supplier_category',
'url' => base_url().'leveranciers/item/'.$supplier->id.'/categorien',
'icon' => 'icon-truck',
'udb' => $udb,
));
// Check if manufacturer exists
$manu->select('id')->where(array(
'name' => $data[$i]['PublisherName']
))->get(1);
if(!$manu->exists())
// Manufacturer should be added
$manu->name = $data[$i]['PublisherName'];
$manu->save($supplier);
// Add the product to the database
$object->art_id = $data[$i]['ArtID'];
$object->supplier_id = $supplier->id;
$object->supplier_category_id = $cat->id;
$object->supplier_manufacturer_id = $manu->id;
$object->part_id = $data[$i]['PartID'];
$object->ean_code = $data[$i]['EanCode'];
$object->name = $data[$i]['Description'];
$object->description = NULL;
$object->version = $data[$i]['Version'];
$object->language = $data[$i]['Language'];
$object->media = $data[$i]['Media'];
$object->trend = $data[$i]['Trend'];
$object->price_group = $data[$i]['PriceGroup'];
$object->price_code = $data[$i]['PriceCode'];
$object->eur_lp = $data[$i]['LP_Eur'];
$object->eur_de = $data[$i]['DE_Eur'];
$object->eur_d1 = $data[$i]['D1_Eur'];
$object->eur_d2 = $data[$i]['D2_Eur'];
$object->eur_personal = $data[$i]['PricePersonal_Eur'];
$object->stock = $data[$i]['Stock'];
$object->backorder = ($data[$i]['BackorderDate'] != '' && !empty($data[$i]['BackorderDate']))?$data[$i]['BackorderDate']:NULL;
$object->modified = ($data[$i]['ModifDate'] != '' && !empty($data[$i]['ModifDate']))?$data[$i]['ModifDate']:NULL;
$object->flag = 'MODIFIED';
$object->auvibel = $auvibel->value;
$object->bebat = $bebat->value;
$object->intrastat = $intrastat;
$object->recupel = $recupel->value;
$object->reprobel = $reprobel->value;
$object->save();
$updated++;
elseif(($object->auvibel != $auvibel) || ($object->bebat != $bebat) || ($object->recupel != $recupel) || ($object->reprobel != $reprobel))
$object->auvibel = $auvibel->value;
$object->bebat = $bebat->value;
$object->intrastat = $intrastat;
$object->recupel = $recupel->value;
$object->reprobel = $reprobel->value;
$object->save();
else
// Check if category group exists
$cat->select('id')->where(array(
'supplier_id' => $supplier->id,
'name_a' => $data[$i]['Class1'],
'name_b' => $data[$i]['Class2'],
'name_c' => $data[$i]['Class3'],
))->get(1);
if(!$cat->exists())
// Category should be added
$cat->supplier_id = $supplier->id;
$cat->name_a = $data[$i]['Class1'];
$cat->name_b = $data[$i]['Class2'];
$cat->name_c = $data[$i]['Class3'];
$cat->save();
// Log as notification: New supplier categorie
$this->_notify('Niewe categorie',array(
'body' => $supplier->name.' heeft "'.$cat->name_a.' - '.$cat->name_b.' - '.$cat->name_c.'" als nieuwe categorie toegevoegd.',
'controller' => 'leveranciers',
'trigger' => 'new_supplier_category',
'url' => '[hidden-url]'.$supplier->id.'/categorien',
'icon' => 'icon-truck',
'udb' => $udb,
));
// Check if manufacturer exists
$manu->select('id')->where(array(
'name' => $data[$i]['PublisherName']
))->get(1);
if(!$manu->exists())
// Manufacturer should be added
$manu->name = $data[$i]['PublisherName'];
$manu->save($supplier);
// Add the product to the database
$object->art_id = $data[$i]['ArtID'];
$object->supplier_id = $supplier->id;
$object->supplier_category_id = $cat->id;
$object->supplier_manufacturer_id = $manu->id;
$object->part_id = $data[$i]['PartID'];
$object->ean_code = $data[$i]['EanCode'];
$object->name = $data[$i]['Description'];
$object->description = NULL;
$object->version = (($data[$i]['Version'] != '')?$data[$i]['Version']:NULL);
$object->language = (($data[$i]['Language'] != '')?$data[$i]['Language']:NULL);
$object->media = (($data[$i]['Media'] != '')?$data[$i]['Media']:NULL);
$object->trend = (($data[$i]['Trend'] != '')?$data[$i]['Trend']:NULL);
$object->price_group = (($data[$i]['PriceGroup'] != '')?$data[$i]['PriceGroup']:NULL);
$object->price_code = (($data[$i]['PriceCode'] != '')?$data[$i]['PriceCode']:NULL);
$object->eur_lp = (($data[$i]['LP_Eur'] != '')?$data[$i]['LP_Eur']:NULL);
$object->eur_de = (($data[$i]['DE_Eur'] != '')?$data[$i]['DE_Eur']:NULL);
$object->eur_d1 = (($data[$i]['D1_Eur'] != '')?$data[$i]['D1_Eur']:NULL);
$object->eur_d2 = (($data[$i]['D2_Eur'] != '')?$data[$i]['D2_Eur']:NULL);
$object->eur_personal = $data[$i]['PricePersonal_Eur'];
$object->stock = $data[$i]['Stock'];
$object->backorder = ($data[$i]['BackorderDate'] != '' && !empty($data[$i]['BackorderDate']))?$data[$i]['BackorderDate']:NULL;
$object->modified = ($data[$i]['ModifDate'] != '' && !empty($data[$i]['ModifDate']))?$data[$i]['ModifDate']:NULL;
$object->flag = NULL;
$object->auvibel = $auvibel->value;
$object->bebat = $bebat->value;
$object->intrastat = $intrastat;
$object->recupel = $recupel->value;
$object->reprobel = $reprobel->value;
$object->save();
//$object->clear_cache();
$new++;
//$message .= 'loop end A: '.memory_get_usage().' - '.$i."\r\n";
$object->clear();
$cat->clear();
$manu->clear();
$auvibel->clear();
$bebat->clear();
$recupel->clear();
$reprobel->clear();
unset($data[$i]);
//$message .= 'loop end B: '.memory_get_usage()."\r\n";
unset($manu);
unset($auvibel);
unset($bebat);
unset($recupel);
unset($reprobel);
if(is_file($file))
unlink($file);
$object->clear();
//$message .= 'BEFORE MARK EOL: '.memory_get_usage()."\r\n";
/**
* Mark products as EOL when not found in file
*/
$eolCount = 0;
$eol = $object
->group_start()
->where('flag IS NULL')
->or_where('flag !=', 'EOL')
->group_end()
->where('supplier_id', $supplier->id)
->group_start()
->group_start()->where('updated IS NOT NULL')->where('updated <',$cronStart)->group_end()
->or_group_start()->where('updated IS NULL')->where('created <',$cronStart)->group_end()
->group_end()
->get_iterated();
$p = new Product(NULL,$udb);
//unset($aAvailable);
foreach($eol as $i => $product)
$product->flag = "EOL";
$product->save();
if($product->art_id != NULL)
// The 'copied' products should be marked eol in the webshop!
$p->where('art_code',$product->art_id)->where('supplier_product_id', $product->id)->get();
if($p->exists())
$p->eol = date('Y-m-d H:i:s');
$p->save();
$p->clear();
$product->clear();
$eolCount++;
//unset($eol[$i]);
//$message .= 'INSIDE MARK EOL: '.memory_get_usage()."\r\n";
unset($product);
$object->clear();
//$message .= 'AFTER MARK EOL: '.memory_get_usage()."\r\n";
if($eolCount > 0)
// Log as notification: supplier products marked EOL
$this->_notify('EOL melding',array(
'body' => "Er ".(($eolCount == 1)?'is een product':'zijn '.$eolCount.' producten')." gemarkeerd als EOL",
'controller' => 'leveranciers',
'trigger' => 'eol_supplier_product',
'url' => '[hidden-url]'.$supplier->id.'/artikels',
'icon' => 'icon-truck',
'udb' => $udb,
));
// After looping files build e-mail.
$message .= 'Totaal: '.$rows. "\r\n";
$message .= 'new: '.$new. "\r\n";
$message .= 'updated: '.$updated. "\r\n";
$message .= 'EOL: '.$eolCount. "\r\n";
$subject = 'Import XXXXX Update';
// No updates found
else
$subject = 'Import XXXXX No Update Found';
$message .= "\r\n";
$message .= '<h3>Einde: '.date('Y-m-d H:i:s').'</h3>' . "\r\n";
mail($this->adminMail, $subject, $message, $this->headerMail);
// Remove import_found marker for supplier
$supplier->import_found = false;
$supplier->save();
【问题讨论】:
我们不会通读代码墙来猜测问题,但显然您的内存不足。减少您在脚本中执行的“东西”数量,或提高 php 的内存限制。这是您仅有的两个选择。 您可以在 tmp 表中使用 mysqlsLOAD DATA INFILE
。然后在 tmp 表上添加一个触发器,并在验证成功时将其插入到真实表中。这将大大改善您的导入,大约需要 1 秒。
我建议你将你的 csv 文件分块成更小的文件,以避免内存耗尽错误....***.com/questions/16732590/…
@MarcB 如果您认为这是一堵代码墙,您真的应该看看完整的控制器文件。我粘贴了所有相关代码,否则人们会一直要求我发布到目前为止的代码。
@Daan 感谢您的建议。尽管我们CRM的te表中的字段名称与文件中的字段名称不同。我不喜欢编写复杂的 SQL 触发器或存储过程。
【参考方案1】:
我们也遇到过类似的情况。经过多次尝试使脚本变得更好后,我们决定需要另一种方法来完成导入工作,而不是花费大约 10 个小时。
我们所做的是转储所有 PHP 代码,而是使用 mysqlimport
将 CSV 文件的内容直接加载到表中。该表现在包含我们需要的所有内容,但不是对我们有用的形式(没有结构,某些字段需要一些处理等)
但是,因为现在所有内容都在数据库中,所以我们可以通过查询来做任何我们想做的事情。
比如删除所有不再在导入文件中的数据,就是DELETE FROM structured_table AS st LEFT JOIN unstructured_table AS ut ON st.someField = ut.someField WHERE ut.someField IS NULL;
,更新现有记录就是UPDATE structured_table AS st INNER JOIN unstructured_table AS ut ON st.someField = ut.someField SET st.anotherField = CONCAT(ut.aField, ' ', ut.yetAnotherField);
。
显然,对于复杂的导入脚本,您的查询会更复杂,您需要更多的查询。您甚至可能需要投入一些存储过程来对各个字段进行处理。但是,如果您可以采用这种方法,您最终会得到一个可以处理大量数据并且非常可扩展的流程。
【讨论】:
我尝试了这个想法(在 php 中使用了 LOAD DATA LOCAL INFILE),现在文件在不到 20 秒内被导入。但所有其他处理(如我们系统中的类别名称在不同的表中)可能是更大的问题。 对于类别,您需要两个查询:一个用于插入缺失的记录,另一个用于更新您的产品以根据“supplier_id”设置“supplier_category_id”。 (也许一个更新类别?)这样的事情会做:INSERT INTO categories (supplier_id, class_a, class_b, class_c) (SELECT id, Class1, Class2, Class3 FROM import_table AS i WHERE NOT EXISTS (SELECT * FROM categories AS c WHERE it.id = c.supplier_id))
用于新记录,加上UPDATE products AS p INNER JOIN categories AS c USING(supplier_id) SET p.supplier_category_id = c.id)
将产品链接到它们的类别。
我真的应该在 SQL 课上多加注意 :) 不是我不明白,而是我不喜欢查询的可读性 :p 我会试一试。
但是,我认为这会变得复杂。供应商提供单独的 Recupel 费用等文件,这些文件应添加到产品价格中。毕竟我可能会选择 Ken Cheungs 的解决方案..
最好的解决方案是在您的组织中对您有用。如果您对 SQL 感到不舒服,那么使用非常繁重的 SQL 解决方案可能会成为维护问题。不过请记住,您也可以选择混合解决方案:在几个查询中完成基本的繁重工作,而在 PHP 中完成真正复杂的工作。【参考方案2】:
我也有类似的情况...每天比较大约 20M 条记录以更新一些记录的更改并添加/删除增量。数据源也是 CSV。我使用perl,而我认为php也可以。
-
每条记录必须有一个链接键,产品的 SKU?或类似的东西。可能已经是您的数据库表中的主键/唯一键。
您知道要比较和更新的字段的 lst。
第1步:从数据库中读取所有记录,使用链接键作为命名索引存储在一个数组中。
1.1:value是所有需要比较的字段的concat,或者concat结果的md5()来节省内存。
第 2 步:遍历 CSV 文件,提取每行的链接键和新值。
2.1:如果链接键不在数组中,则向 DB 插入操作。
2.2:isset() 返回 true,因此比较值(或值 concat 的 md5()),如果不同,则对 DB 执行 UPDATE 操作。
2.3:从数组中删除该条目。
第 3 步:读取 CSV 结束时,数组中剩余的条目是要删除的记录。
在我的例子中,它使用少于 2GB 的 RAM 来运行该进程并运行大约 3 分钟,这应该是可行且可以接受的。
【讨论】:
PS:我担心的另一个问题是,我不能允许数据库表/记录被长时间处理的查询锁定。 20M 记录加入 20M 记录...... 过去我们确实在锁定表/记录方面遇到了一些麻烦,我们改用innoDB来解决它。这个解决方案有点适合我们的需求。对于 concat 值,您的意思是:name;ean;art;...;... php 比较大小的字符串不是很“重”吗? 如果您使用 innoDB,请注意所有更改只会增加 ibdata1 文件的大小,包括 DELETE,并且您无法回收磁盘空间。如果在 my.cnf 中有“innodb_file_per_table”,则每个表可以有单独的 ibd 文件,如果删除它可以回收,但性能略有下降。当第一个字符不匹配时,字符串比较停止。如果您可以预测哪个字段是最有可能更改的字段,请将其放在左侧以加快检查速度。正如我早期写的那样,我使用 perl,但只有 PHP 在命令行模式下或通过 cron 直接执行时应该会更好地工作。 我们仅将 innoDB 用于 cron 和 webshop 前端使用的表。 CRM 中的所有其他表仍然是默认设置的 MyISAM。 如果您使用数据库方法,即将所有记录加载到 MySQL (InnoDB),请检查“所有其他表”(MyISAM),然后删除(或删除)该表;请查看my.cnf 和MYSQL 的datadir 下ibdata1 的文件大小。 InnoDB 支持行级锁定,解决了锁定问题。但磁盘空间回收最终将是另一个问题。以上是关于从 .CSV 文件比较/插入/更新 MySQL 数据库中的产品的最佳方法是啥的主要内容,如果未能解决你的问题,请参考以下文章