Pipelines

Posted 摧残一生 涅槃重生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pipelines相关的知识,希望对你有一定的参考价值。

用于执行高性能的I/O,且代码不复杂

依赖库:System.IO.Pipelines

创建

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

基本用法

// 对于socket进行读写操作
async Task ProcessLinesAsync(Socket socket)

	// 声明一个Pipe
    var pipe = new Pipe();
    // 创建一个写入任务,大意是将socket中接受的数据写入到pipe中
    Task writing = FillPipeAsync(socket, pipe.Writer);
    // 创建一个读取任务,大意是将pipe获得数据读取出来
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);

// 写入任务代码
async Task FillPipeAsync(Socket socket, PipeWriter writer)

	// 定义每个buffer为512
    const int minimumBufferSize = 512;
	// 死循环
    while (true)
    
        // 获取512个byte的内存
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        
            // 尝试从socket中读取byte数组,写入到memory中
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            //如果没有读取到数据,则不继续执行
            if (bytesRead == 0)
            
                break;
            
            // 告知 PipeWriter 有多少数据已写入缓冲区
            writer.Advance(bytesRead);
        
        catch (Exception ex)
        
            LogError(ex);
            break;
        

        // 告知PipReader,数据可使用,如果不调用则reader一直认为不可使用
        FlushResult result = await writer.FlushAsync();
		// 写入的数据已经全部操作完成
        if (result.IsCompleted)
        
            break;
        
    

     // 写入完成,告诉PipeReader没有多余的数据进行操作了
    await writer.CompleteAsync();

// 读取写入的数据
async Task ReadPipeAsync(PipeReader reader)

    while (true)
    
    	// 读取write的数据
        ReadResult result = await reader.ReadAsync();
        // 获取byte数组,该数据只读
        ReadOnlySequence<byte> buffer = result.Buffer;

		// 将result.Buffer中的数据全部提取出来,感觉类似于数据库中的游标卡尺,一步一步的读取数据
        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        
            // Process the line.
            ProcessLine(line);
        

        // 告知 PipeReader 已消耗和检查了多少数据
        reader.AdvanceTo(buffer.Start, buffer.End);

        // 停止读取,因为在buffer中没有多余的数据了
        if (result.IsCompleted)
        
            break;
        
    

    // 结束,告知管道可以释放内存
    await reader.CompleteAsync();

// 将buffer复制到line中,里面可以自己写逻辑,例如我们的数据是以字符总数开头,以某个字符结尾等等
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)

    // 查找结尾的字符串
    SequencePosition? position = buffer.PositionOf((byte)\'\\n\');
	// 如果结尾字符串未找到,则返回false
    if (position == null)
    
        line = default;
        return false;
    

    // 保存要截取的数据
    line = buffer.Slice(0, position.Value);
    // 将剩余的数据保存起来
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;

实际中的差异

实际使用时,网络数据复制和读取后分析占用内存大小是存在差异的,因此需要根据特定环境进行配置。

假设分析数据慢于网络复制数据,则需要让网络数据多存一些然后再进行分析:

// 设置
// PauseWriterThreshold:确定在调用 PipeWriter.FlushAsync 暂停之前应缓冲多少数据。
// ResumeWriterThreshold:确定在恢复对 PipeWriter.FlushAsync 的调用之前,读取器必须观察多少数据。
// 缓冲了10个byte后调用FlushAsync,分析了5个byte后调用PipeWriter.FlushAsync
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeWriter.FlushAsync:

  • Pipe 中的数据量超过 PauseWriterThreshold 时,返回不完整的 ValueTask<FlushResult>
  • 低于 ResumeWriterThreshold 时,返回完整的 ValueTask<FlushResult>

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

PipeScheduler

提供对异步回调运行位置的控制,默认情况下:

  • 使用当前的 SynchronizationContext
  • 如果没有 SynchronizationContext,它将使用线程池运行回调。
public static void Main(string[] args)

	// 声明一个单线程的任务
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // 将声明的两个任务进行赋值
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);


public class SingleThreadPipeScheduler : PipeScheduler

    // 声明一个只读块,用于存放调用方法和读取的数据
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
     // 创建单线程
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    
        _thread = new Thread(DoWork);
        _thread.Start();
    

    private void DoWork()
    
    	// 从只读块中获取数据
        foreach (var item in _queue.GetConsumingEnumerable())
        
        	// 执行方法
            item.Action(item.State);
        
    
	// 添加任务数据
    public override void Schedule(Action<object?> action, object? state)
    
    	// 可以根据state进行判断是否进行数据解析或者其他操作
        if (state is not null)
        
            _queue.Add((action, state));
        
    

流(Stream)

StreamPipeReaderOptions 允许使用以下参数控制 PipeReader 实例的创建:

StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter 实例的创建:

重要

使用 Create 方法创建 PipeReaderPipeWriter 实例时,需要考虑 Stream 对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将 LeaveOpen 标志设置为 true。 否则,流将关闭。

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program

    static async Task Main()
    
        using var stream = File.OpenRead("Demo1.txt");
		// 创建一个读取的留
        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            // 写入到命令行中
            Console.OpenStandardOutput(), 
            // 重要中提到的还需要访问流,需要设置为常开,否则读取完比stream就自动关闭
            new StreamPipeWriterOptions(leaveOpen: true));
		// 读取和写入的核心方法
        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        
            //点击键盘的C
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != \'C\')
            
               //当只有输入了C,才会程序会关闭
            
            userCanceled = true;
            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        );
		// 执行任务
        await Task.WhenAny(cancelProcessingTask, processMessagesTask);
        Console.WriteLine(
            $"\\n\\nProcessing (userCanceled ? "cancelled" : "completed").\\n");
    
	// 读取和写入的核心方法
    static async Task ProcessMessagesAsync(PipeReader reader, PipeWriter writer)
    
        try
        
            while (true)
            
                // 读取文本中的数据
                ReadResult readResult = await reader.ReadAsync();
                // 获得byte数组
                ReadOnlySequence<byte> buffer = readResult.Buffer;
                try
                
                    // 如果没已取消,则跳出
                    if (readResult.IsCanceled)
                    
                        break;
                    
					// 尝试读取通道中的数据
                    if (TryParseLines(ref buffer, out string message))
                    
                        // 写入到通道中
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);
						// 如果写入完成或被取消则跳出
                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        
                            break;
                        
                    
					// 如果读取完成,则跳出
                    if (readResult.IsCompleted)
                    
                        // 读取完成,但是数据依然存在,则提示不正确(通过实际业务截取每段的数据,当截取后依然还有存留,表示截取的不正确)
                        if (!buffer.IsEmpty)
                        
                            throw new InvalidDataException("Incomplete message.");
                        
                        break;
                    
                
                finally
                
                    // 告知读取通道已完成那些数据的读取
                    reader.AdvanceTo(buffer.Start, buffer.End);
                
            
        
        catch (Exception ex)
        
            Console.Error.WriteLine(ex);
        
        finally
        
            //完成读写
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        
    

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    
        SequencePosition? position;
        StringBuilder outputMessage = new();
        while(true)
        
            // 判断回车,如果存在返回位置点
            position = buffer.PositionOf((byte)\'\\n\');
			// 如果位置点为空,就跳出
            if (!position.HasValue)
                break;
			// 获取这段数据
            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();
			// 保存剩余的数据供下次使用
            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        ;
		// 获取截出的数据
        message = outputMessage.ToString();
        return message.Length != 0;
    

    //将数据写入通道,本例子就是写入到控制台
    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));

我可以通过 CLI 验证 azure-pipelines.yml 文件吗?

【中文标题】我可以通过 CLI 验证 azure-pipelines.yml 文件吗?【英文标题】:Can I validate an azure-pipelines.yml file via CLI? 【发布时间】:2019-05-13 22:01:09 【问题描述】:

我正在尝试为 monorepo 构建一个 azure-pipelines.yml,并且我正在努力弄清楚如何在我前进的过程中调试该文件。

是否有诸如az deployment validate ./azure-pipelines.yml 之类的命令可用?

如果是这样,你建议我怎么做?我不断进行更改,将它们推送到我的存储库,然后让 Azure DevOps 运行它,但失败了。

【问题讨论】:

【参考方案1】:

选项 1

现在有一个 Visual Studio Code 扩展,它提供了在将管道 YAML 代码提交到您的存储库之前对其进行验证的能力。

扩展名为Azure Pipelines YAML Validator。作者明确表示它使用在线服务进行验证,即您的管道将被上传以供检查。所以你必须相信作者,你的数据是被小心处理的,你应该确保没有像秘密这样的敏感数据存储在你的管道代码中(无论如何这是一个坏习惯)。

选项 2

pull request:Test-VSTeamYamlPipeline 还引入了一个新的 PowerShell cmdlet。要使用该 cmdlet 进行验证,您需要安装 PowerShell 模块 VSTeam(需要 PowerShell 5.0+ 或 PS Core):

Install-Module -Name VSTeam -Scope CurrentUser

接下来您需要指定您的 Azure DevOps 项目:

Set-VSTeamAccount -Account <Name of your Azure DevOps org> -PersonalAccessToken <Your PAT with access to your project>

(还有连接到自托管 Azure DevOps 服务器的选项 - 请参阅 docs。)您的 PAT 必须至少具有“构建:读取和执行”权限。

然后,您可以根据已设置的管道验证在本地进行的更改。这意味着您之前必须已经基于提交的 YAML 文件创建了一个管道。对于验证,您将需要 项目名称管道 ID。要获取这些信息,请在 Azure DevOps 中打开管道并从 URL 中提取信息:

https://dev.azure.com/<DEVOPS_ORG>/<PROJECT_NAME/_build?definitionId=<PIPELINE_ID>

验证请求如下所示

Test-VSTeamYamlPipeline -Project <PROJECT_NAME> -PipelineId <PIPELINE_ID> -FilePath '<PATH_TO_YAML_FILE>'

如果你有语法错误,那么你会得到一个指示错误的输出:

Test-VSTeamYamlPipeline: /your-pipeline.yml (Line: 12, Col: 13): Unexpected value ''

如果一切正常,输出有点神秘,对我来说看起来像这样:

Name url                                                                                                      state   Id
---- ---                                                                                                      -----   --
     https://dev.azure.com/<DEVOPS_ORG>/<SOME_GUID>/_apis/pipelines/<PIPELINE_ID>/runs/-1 unknown -1

【讨论】:

【参考方案2】:

不,没有办法验证它。当您尝试运行它时 - 它会向您显示错误,这是验证它的唯一真正方法。

这个 VSCode 扩展提供语法高亮和自动补全。

https://marketplace.visualstudio.com/items?itemName=ms-azure-devops.azure-pipelines

【讨论】:

我添加了这个扩展,它适用于我的*** .yml 文件。但是,我使用的是单一存储库并且有多个 .yml 文件。但是,总的来说,我很欣赏这个扩展。谢谢! 好的,这并没有真正改变什么?还是该扩展名只能使用默认文件名?我还没用过 它抱怨我的子管道文件中缺少部分。我有一个主管道调用子存储库的模板,然后这些子存储库调用 build.yml 文件。对于 subrepo 模板,它抱怨缺少东西,不知道它是从父 yml 文件中调用的。 我正在做这样的事情:github.com/jayway/monorepo-azure-pipeline(我的 repo 归我公司所有,所以我无法发布) 问题跟踪此请求:github.com/Microsoft/azure-pipelines-yaml/issues/34

以上是关于Pipelines的主要内容,如果未能解决你的问题,请参考以下文章

Pipelines

我可以通过 CLI 验证 azure-pipelines.yml 文件吗?

scrapy pipelines导出各种格式

Azure DevOps 发布错误“Microsoft.TeamFoundation.DistributedTask.Pipelines.PipelineValidationException(类型

爬取知名社区技术文章_pipelines_4

StreamSets 多线程 Pipelines