.net core多线程新增数据,使用线程安全类型ReadOnlySpan<T>

Posted 棉晗榜

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了.net core多线程新增数据,使用线程安全类型ReadOnlySpan<T>相关的知识,希望对你有一定的参考价值。

using ConsoleApp_ContentCheck.IData;
using ConsoleApp_ContentCheck.IDataImpl;
using ConsoleApp_ContentCheck.Models;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleCleanData

    /// <summary>
    /// 清理栏目数据。
    /// </summary>
    /// 创建时间:2022-8-29 14:41:01
    public class CleanChannel
    

        /// <summary>
        /// 检查已经处理的数据
        /// </summary> 
        static ConcurrentDictionary<string, bool> check_worked = new ConcurrentDictionary<string, bool>();


        /// <summary>
        /// 栏目数据,筛选出最新
        /// </summary>
        /// 创建时间:2022-8-30 16:20:05。 
        public void FilterChannel()
        
            Task.Run(() =>
            
                int index = 1;
                int skip = 0;
                while (true)
                
                    try
                    
                        ISpider_website_navDAL spireNavDAL = new ISpider_website_navDAL_Impl();

                        //每个线程处理8条,开四个线程
                        var query = spireNavDAL.GetQuery().OrderBy(x => x.Id).Where(g => g.Filter_do == 0);
                        var list = query.Skip(skip).Take(32).ToList();
                        if (list?.Count == 0)
                        
                            goto DO_SLEEP;
                        
                        Console.WriteLine($"获取到待处理数据list?.Count条");
                        var list1 = list.Skip(0).Take(8).ToList();
                        var list2 = list.Skip(8).Take(8).ToList();
                        var list3 = list.Skip(16).Take(8).ToList();
                        var list4 = list.Skip(24).Take(8).ToList();

                        var task1 = Task.Run(() =>
                        
                            if (list1?.Count > 0)
                            
                                foreach (var item in list1)
                                
                                    if (check_worked.ContainsKey(item.Id))
                                    
                                        Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
                                        continue;
                                    
                                    check_worked.TryAdd(item.Id, true);

                                    //处理栏目,检查
                                    Spider_website_nav[] arr =  item ;
                                    ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);

                                    ArticleSaveTophp(itemSpan);
                                    bool k;
                                    check_worked.TryRemove(item.Id, out k);
                                
                            
                        );
                        var task2 = Task.Run(() =>
                        
                            if (list2?.Count > 0)
                            
                                foreach (var item in list2)
                                
                                    if (check_worked.ContainsKey(item.Id))
                                    
                                        Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
                                        continue;
                                    
                                    check_worked.TryAdd(item.Id, true);

                                    Spider_website_nav[] arr =  item ;
                                    ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);
                                    ArticleSaveToPHP(itemSpan);
                                    bool k;
                                    check_worked.TryRemove(item.Id, out k);
                                
                            
                        );
                        var task3 = Task.Run(() =>
                        
                            if (list3?.Count > 0)
                            
                                foreach (var item in list3)
                                
                                    if (check_worked.ContainsKey(item.Id))
                                    
                                        Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
                                        continue;
                                    
                                    check_worked.TryAdd(item.Id, true);

                                    //处理文章,检查
                                    Spider_website_nav[] arr =  item ;
                                    ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);
                                    ArticleSaveToPHP(itemSpan);
                                    bool k;
                                    check_worked.TryRemove(item.Id, out k);
                                
                            
                        );
                        var task4 = Task.Run(() =>
                        
                            if (list4?.Count > 0)
                            
                                foreach (var item in list4)
                                
                                    if (check_worked.ContainsKey(item.Id))
                                    
                                        Console.WriteLine("栏目id=" + item.Id + ",被其他线程处理中,跳过");
                                        continue;
                                    
                                    check_worked.TryAdd(item.Id, true);
                                    Spider_website_nav[] arr =  item ;
                                    ReadOnlySpan<Spider_website_nav> itemSpan = new ReadOnlySpan<Spider_website_nav>(arr);

                                    ArticleSaveToPHP(itemSpan);
                                    bool k;
                                    check_worked.TryRemove(item.Id, out k);
                                
                            
                        );

                        //Task.WaitAll(task1, task4);
                        Task.WaitAny(task1, task2, task3, task4);
                        //Task.WaitAll(task1, task2, task3, task4);

                        if (index % 2 == 0)
                        
                            skip = 32;
                        
                        else
                        
                            skip = 0;
                        
                        index++;
                        continue;

                    DO_SLEEP:
                        Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "没有要筛选的栏目数据,休眠120秒...");
                        Thread.Sleep(120000);
                    
                    catch (Exception ex)
                    
                        var ex1 = ex.InnerException ?? ex;
                        string msg = "栏目数据清理检查异常:" + ex1.Message;
                        Console.WriteLine(msg);
                        LogHelpter.AddLog(msg);
                    
                
            );

        

        /// <summary>
        /// 处理栏目保存到PHP
        /// </summary>
        /// <param name="nav">栏目信息</param>
        /// 创建时间:2022-8-30 14:15:58
        public void ArticleSaveToPHP(ReadOnlySpan<Spider_website_nav> model)
        
            if (model == null)
            
                return;
             
            Spider_website_nav nav = model[0];
            try
            

                ISpider_website_navDAL spireNavDAL = new ISpider_website_navDAL_Impl();
                ICms_clean_navDAL navDAL = new Cms_clean_navDAL_Impl();

                //根据文章url生成hash主键
                string hashKEY = MD5Helpter.MD5Encryptbit32Back16HEX(nav.Id);
                Console.WriteLine($"DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") .正在处理栏目:nav.Title,hashKEY=hashKEY");

                //检查栏目是否存在
                var task = navDAL.GetSingleOrDefaultAsync(x => x.Hash_varchar != null && x.Hash_varchar.Equals(hashKEY));
                Cms_clean_nav cmsNav = task.Result;

                //当前时间戳
                int utxNow = (int)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds;

                if (cmsNav == null)
                
                    //新增栏目
                    cmsNav = new Cms_clean_nav()
                    
                        Create_time = utxNow,
                        Hash_varchar = hashKEY,
                        Level = nav.Level,
                        Parent_id = nav.ParentId, //可为空
                        Spider_id = nav.Id,
                        Title = nav.Title,
                        Url = nav.Url,
                        Web_id = Convert.ToInt32(nav.WebsiteId)
                    ;
                    int count = navDAL.Add(cmsNav);
                    if (count > 0)
                    
                        string msg = $"栏目【cmsNav.Title】新增成功,Cms_clean_nav主键:cmsNav.Id";
                        Console.WriteLine(msg);
                        LogHelpter.AddLog(msg);
                    
                
                else
                
                    //检查是否有内容变动         
                    if (nav.Title.Equals(cmsNav.Title))
                    
                        //没有变动
                        Console.WriteLine($"栏目=nav.Title,hashKEY=hashKEY,内容没有变动");
                        goto MARK_STAT;
                    

                    //修改文章内容,标题
                    cmsNav.Title = nav.Title;
                    var taskUpdate = navDAL.UpdateAsync(cmsNav, new string[]  "Title" );
                    int count = taskUpdate.Result;
                    if (count > 0)
                    
                        string msg = $"栏目【nav.Title】修改成功,Cms_clean_nav主键:cmsNav.Id";
                        Console.WriteLine(msg);
                        LogHelpter.AddLog(msg);
                    
                

            MARK_STAT:

                //修改文章状态为,已经处理了
                nav.Filter_do = 1;
                var taskChange = spireNavDAL.UpdateAsync(nav, new string[]  "Filter_do" );
                taskChange.Wait();
                if (taskChange.Result > 0)
                
                    string sp = $"spider_website_nav栏目主键nav.Id,修改Filter_do标记为已处理,成功";
                    Console.WriteLine(sp);
                    LogHelpter.AddLog(sp);
                
                else
                
                    string sp = $"spider_website_nav栏目主键nav.Id,修改Filter_do为1失败";
                    Console.WriteLine(sp);
                    LogHelpter.AddLog(sp);
                
            
            catch (Exception ex)
            
                var ex1 = ex.InnerException ?? ex;
                string msg = "栏目保存到PHP失败:" + ex1.Message + ",栏目Spider_id=" + nav.Id;
                Console.WriteLine(msg);
                LogHelpter.AddLog(msg);
                     
        

    


开始启动:

// See https://aka.ms/new-console-template for more information
//清理文章数据,保证每个文章唯一。
//根据url来生成hash主键值,
//只清理文章、栏目数据,保证每条记录内容唯一
//根据文章url生成hash值,作为对比文章的唯一识别
//PHP平台那边只允许有唯一的url,hash值

using ConsoleApp_ContentCheck.Models;
using ConsoleCleanData;

Console.WriteLine("开始清理栏目数据,保证 唯一...");
 
//栏目数据处理
new CleanChannel().FilterChannel();

//无限期等待。支持linux后台静默运行
Task.Delay(-1).Wait(-1);

以上是关于.net core多线程新增数据,使用线程安全类型ReadOnlySpan<T>的主要内容,如果未能解决你的问题,请参考以下文章

.NET面试题系列数据结构(ArrayListQueueStack)及线程安全问题

.NET Core 多线程的用法,以及用例

Java并发:五种线程安全类型线程安全的实现枚举类型

BlockingQueue

Java中阻塞队列的使用

.NET Core多线程通关 常见性能问题