.net core多线程新增数据,使用线程安全类型ReadOnlySpan<T>
Posted 棉晗榜
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了.net core多线程新增数据,使用线程安全类型ReadOnlySpan<T>相关的知识,希望对你有一定的参考价值。
using ConsoleApp_ContentCheck.IData;
using ConsoleApp_ContentCheck.IDataImpl;
using ConsoleApp_ContentCheck.Models;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleCleanData
/// <summary>
/// 清理栏目数据。
/// </summary>
/// 创建时间:2022-8-29 14:41:01
public class CleanChannel
/// <summary>
/// 检查已经处理的数据
/// </summary>
static ConcurrentDictionary<string, bool> check_worked = new ConcurrentDictionary<string, bool>();
/// <summary>
/// 栏目数据,筛选出最新
/// </summary>
/// 创建时间:2022-8-30 16:20:05。
public void FilterChannel()
Task.Run(() =>
int index = 1;
int skip = 0;
while (true)
try
ISpider_website_navDAL spireNavDAL = new ISpider_website_navDAL_Impl();
//每个线程处理8条,开四个线程
var query = spireNavDAL.GetQuery().OrderBy(x => x.Id).Where(g => g.Filter_do == 0);
var list = query.Skip(skip).Take(32).ToList();
if (list?.Count == 0)
goto DO_SLEEP;
Console.WriteLine($"获取到待处理数据list?.Count条");
var list1 = list.Skip(0).Take(8).ToList();
var list2 = list.Skip(8).Take(8).ToList();
var list3 = list.Skip(16).Take(8).ToList();
var list4 = list.Skip(24).Take(8).ToList();
var task1 = Task.Run(() =>
if (list1?.Count > 0)
foreach (var item in list1)
if (check_worked.ContainsKey(item.Id))
Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
continue;
check_worked.TryAdd(item.Id, true);
//处理栏目,检查
Spider_website_nav[] arr = item ;
ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);
ArticleSaveTophp(itemSpan);
bool k;
check_worked.TryRemove(item.Id, out k);
);
var task2 = Task.Run(() =>
if (list2?.Count > 0)
foreach (var item in list2)
if (check_worked.ContainsKey(item.Id))
Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
continue;
check_worked.TryAdd(item.Id, true);
Spider_website_nav[] arr = item ;
ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);
ArticleSaveToPHP(itemSpan);
bool k;
check_worked.TryRemove(item.Id, out k);
);
var task3 = Task.Run(() =>
if (list3?.Count > 0)
foreach (var item in list3)
if (check_worked.ContainsKey(item.Id))
Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
continue;
check_worked.TryAdd(item.Id, true);
//处理文章,检查
Spider_website_nav[] arr = item ;
ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);
ArticleSaveToPHP(itemSpan);
bool k;
check_worked.TryRemove(item.Id, out k);
);
var task4 = Task.Run(() =>
if (list4?.Count > 0)
foreach (var item in list4)
if (check_worked.ContainsKey(item.Id))
Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
continue;
check_worked.TryAdd(item.Id, true);
Spider_website_nav[] arr = item ;
ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);
ArticleSaveToPHP(itemSpan);
bool k;
check_worked.TryRemove(item.Id, out k);
);
//Task.WaitAll(task1, task4);
Task.WaitAny(task1, task2, task3, task4);
//Task.WaitAll(task1, task2, task3, task4);
if (index % 2 == 0)
skip = 32;
else
skip = 0;
index++;
continue;
DO_SLEEP:
Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "没有要筛选的栏目数据,休眠120秒...");
Thread.Sleep(120000);
catch (Exception ex)
var ex1 = ex.InnerException ?? ex;
string msg = "栏目数据清理检查异常:" + ex1.Message;
Console.WriteLine(msg);
LogHelpter.AddLog(msg);
);
/// <summary>
/// 处理栏目保存到PHP
/// </summary>
/// <param name="nav">栏目信息</param>
/// 创建时间:2022-8-30 14:15:58
public void ArticleSaveToPHP(ReadOnlySpan<Spider_website_nav> model)
if (model == null)
return;
Spider_website_nav nav = model[0];
try
ISpider_website_navDAL spireNavDAL = new ISpider_website_navDAL_Impl();
ICms_clean_navDAL navDAL = new Cms_clean_navDAL_Impl();
//根据文章url生成hash主键
string hashKEY = MD5Helpter.MD5Encryptbit32Back16HEX(nav.Id);
Console.WriteLine($"DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") .正在处理栏目:nav.Title,hashKEY=hashKEY");
//检查栏目是否存在
var task = navDAL.GetSingleOrDefaultAsync(x => x.Hash_varchar != null && x.Hash_varchar.Equals(hashKEY));
Cms_clean_nav cmsNav = task.Result;
//当前时间戳
int utxNow = (int)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds;
if (cmsNav == null)
//新增栏目
cmsNav = new Cms_clean_nav()
Create_time = utxNow,
Hash_varchar = hashKEY,
Level = nav.Level,
Parent_id = nav.ParentId, //可为空
Spider_id = nav.Id,
Title = nav.Title,
Url = nav.Url,
Web_id = Convert.ToInt32(nav.WebsiteId)
;
int count = navDAL.Add(cmsNav);
if (count > 0)
string msg = $"栏目【cmsNav.Title】新增成功,Cms_clean_nav主键:cmsNav.Id";
Console.WriteLine(msg);
LogHelpter.AddLog(msg);
else
//检查是否有内容变动
if (nav.Title.Equals(cmsNav.Title))
//没有变动
Console.WriteLine($"栏目=nav.Title,hashKEY=hashKEY,内容没有变动");
goto MARK_STAT;
//修改文章内容,标题
cmsNav.Title = nav.Title;
var taskUpdate = navDAL.UpdateAsync(cmsNav, new string[] "Title" );
int count = taskUpdate.Result;
if (count > 0)
string msg = $"栏目【nav.Title】修改成功,Cms_clean_nav主键:cmsNav.Id";
Console.WriteLine(msg);
LogHelpter.AddLog(msg);
MARK_STAT:
//修改文章状态为,已经处理了
nav.Filter_do = 1;
var taskChange = spireNavDAL.UpdateAsync(nav, new string[] "Filter_do" );
taskChange.Wait();
if (taskChange.Result > 0)
string sp = $"spider_website_nav栏目主键nav.Id,修改Filter_do标记为已处理,成功";
Console.WriteLine(sp);
LogHelpter.AddLog(sp);
else
string sp = $"spider_website_nav栏目主键nav.Id,修改Filter_do为1失败";
Console.WriteLine(sp);
LogHelpter.AddLog(sp);
catch (Exception ex)
var ex1 = ex.InnerException ?? ex;
string msg = "栏目保存到PHP失败:" + ex1.Message + ",栏目Spider_id=" + nav.Id;
Console.WriteLine(msg);
LogHelpter.AddLog(msg);
开始启动:
// See https://aka.ms/new-console-template for more information
//清理文章数据,保证每个文章唯一。
//根据url来生成hash主键值,
//只清理文章、栏目数据,保证每条记录内容唯一
//根据文章url生成hash值,作为对比文章的唯一识别
//PHP平台那边只允许有唯一的url,hash值
using ConsoleApp_ContentCheck.Models;
using ConsoleCleanData;
Console.WriteLine("开始清理栏目数据,保证 唯一...");
//栏目数据处理
new CleanChannel().FilterChannel();
//无限期等待。支持linux后台静默运行
Task.Delay(-1).Wait(-1);
以上是关于.net core多线程新增数据,使用线程安全类型ReadOnlySpan<T>的主要内容,如果未能解决你的问题,请参考以下文章