Pipelines
Posted 摧残一生 涅槃重生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pipelines相关的知识,希望对你有一定的参考价值。
用于执行高性能的I/O,且代码不复杂
依赖库:System.IO.Pipelines
创建
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
基本用法
// 对于socket进行读写操作
async Task ProcessLinesAsync(Socket socket)
// 声明一个Pipe
var pipe = new Pipe();
// 创建一个写入任务,大意是将socket中接受的数据写入到pipe中
Task writing = FillPipeAsync(socket, pipe.Writer);
// 创建一个读取任务,大意是将pipe获得数据读取出来
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
// 写入任务代码
async Task FillPipeAsync(Socket socket, PipeWriter writer)
// 定义每个buffer为512
const int minimumBufferSize = 512;
// 死循环
while (true)
// 获取512个byte的内存
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
// 尝试从socket中读取byte数组,写入到memory中
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
//如果没有读取到数据,则不继续执行
if (bytesRead == 0)
break;
// 告知 PipeWriter 有多少数据已写入缓冲区
writer.Advance(bytesRead);
catch (Exception ex)
LogError(ex);
break;
// 告知PipReader,数据可使用,如果不调用则reader一直认为不可使用
FlushResult result = await writer.FlushAsync();
// 写入的数据已经全部操作完成
if (result.IsCompleted)
break;
// 写入完成,告诉PipeReader没有多余的数据进行操作了
await writer.CompleteAsync();
// 读取写入的数据
async Task ReadPipeAsync(PipeReader reader)
while (true)
// 读取write的数据
ReadResult result = await reader.ReadAsync();
// 获取byte数组,该数据只读
ReadOnlySequence<byte> buffer = result.Buffer;
// 将result.Buffer中的数据全部提取出来,感觉类似于数据库中的游标卡尺,一步一步的读取数据
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
// Process the line.
ProcessLine(line);
// 告知 PipeReader 已消耗和检查了多少数据
reader.AdvanceTo(buffer.Start, buffer.End);
// 停止读取,因为在buffer中没有多余的数据了
if (result.IsCompleted)
break;
// 结束,告知管道可以释放内存
await reader.CompleteAsync();
// 将buffer复制到line中,里面可以自己写逻辑,例如我们的数据是以字符总数开头,以某个字符结尾等等
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
// 查找结尾的字符串
SequencePosition? position = buffer.PositionOf((byte)\'\\n\');
// 如果结尾字符串未找到,则返回false
if (position == null)
line = default;
return false;
// 保存要截取的数据
line = buffer.Slice(0, position.Value);
// 将剩余的数据保存起来
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
实际中的差异
实际使用时,网络数据复制和读取后分析占用内存大小是存在差异的,因此需要根据特定环境进行配置。
假设分析数据慢于网络复制数据,则需要让网络数据多存一些然后再进行分析:
// 设置
// PauseWriterThreshold:确定在调用 PipeWriter.FlushAsync 暂停之前应缓冲多少数据。
// ResumeWriterThreshold:确定在恢复对 PipeWriter.FlushAsync 的调用之前,读取器必须观察多少数据。
// 缓冲了10个byte后调用FlushAsync,分析了5个byte后调用PipeWriter.FlushAsync
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
- 当
Pipe
中的数据量超过PauseWriterThreshold
时,返回不完整的ValueTask<FlushResult>
。 - 低于
ResumeWriterThreshold
时,返回完整的ValueTask<FlushResult>
。
使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。
PipeScheduler
提供对异步回调运行位置的控制,默认情况下:
- 使用当前的 SynchronizationContext。
- 如果没有
SynchronizationContext
,它将使用线程池运行回调。
public static void Main(string[] args)
// 声明一个单线程的任务
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// 将声明的两个任务进行赋值
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
public class SingleThreadPipeScheduler : PipeScheduler
// 声明一个只读块,用于存放调用方法和读取的数据
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
// 创建单线程
private readonly Thread _thread;
public SingleThreadPipeScheduler()
_thread = new Thread(DoWork);
_thread.Start();
private void DoWork()
// 从只读块中获取数据
foreach (var item in _queue.GetConsumingEnumerable())
// 执行方法
item.Action(item.State);
// 添加任务数据
public override void Schedule(Action<object?> action, object? state)
// 可以根据state进行判断是否进行数据解析或者其他操作
if (state is not null)
_queue.Add((action, state));
流(Stream)
StreamPipeReaderOptions 允许使用以下参数控制 PipeReader
实例的创建:
- StreamPipeReaderOptions.BufferSize 是从池中租用内存时使用的最小缓冲区大小(以字节为单位),默认值为
4096
。 - StreamPipeReaderOptions.LeaveOpen 标志确定在
PipeReader
完成之后基础流是否保持打开状态,默认值为false
。 - StreamPipeReaderOptions.MinimumReadSize 表示分配新缓冲区之前缓冲区中剩余字节的阈值,默认值为
1024
。 - StreamPipeReaderOptions.Pool 是分配内存时使用的
MemoryPool<byte>
,默认值为null
。
StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter
实例的创建:
- StreamPipeWriterOptions.LeaveOpen 标志确定在
PipeWriter
完成之后基础流是否保持打开状态,默认值为false
。 - StreamPipeWriterOptions.MinimumBufferSize 表示从 Pool 租用内存时要使用的最小缓冲区大小,默认值为
4096
。 - StreamPipeWriterOptions.Pool 是分配内存时使用的
MemoryPool<byte>
,默认值为null
。
重要
使用
Create
方法创建PipeReader
和PipeWriter
实例时,需要考虑Stream
对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将LeaveOpen
标志设置为true
。 否则,流将关闭。
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
static async Task Main()
using var stream = File.OpenRead("Demo1.txt");
// 创建一个读取的留
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
// 写入到命令行中
Console.OpenStandardOutput(),
// 重要中提到的还需要访问流,需要设置为常开,否则读取完比stream就自动关闭
new StreamPipeWriterOptions(leaveOpen: true));
// 读取和写入的核心方法
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
//点击键盘的C
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != \'C\')
//当只有输入了C,才会程序会关闭
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
);
// 执行任务
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\\n\\nProcessing (userCanceled ? "cancelled" : "completed").\\n");
// 读取和写入的核心方法
static async Task ProcessMessagesAsync(PipeReader reader, PipeWriter writer)
try
while (true)
// 读取文本中的数据
ReadResult readResult = await reader.ReadAsync();
// 获得byte数组
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
// 如果没已取消,则跳出
if (readResult.IsCanceled)
break;
// 尝试读取通道中的数据
if (TryParseLines(ref buffer, out string message))
// 写入到通道中
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
// 如果写入完成或被取消则跳出
if (flushResult.IsCanceled || flushResult.IsCompleted)
break;
// 如果读取完成,则跳出
if (readResult.IsCompleted)
// 读取完成,但是数据依然存在,则提示不正确(通过实际业务截取每段的数据,当截取后依然还有存留,表示截取的不正确)
if (!buffer.IsEmpty)
throw new InvalidDataException("Incomplete message.");
break;
finally
// 告知读取通道已完成那些数据的读取
reader.AdvanceTo(buffer.Start, buffer.End);
catch (Exception ex)
Console.Error.WriteLine(ex);
finally
//完成读写
await reader.CompleteAsync();
await writer.CompleteAsync();
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
// 判断回车,如果存在返回位置点
position = buffer.PositionOf((byte)\'\\n\');
// 如果位置点为空,就跳出
if (!position.HasValue)
break;
// 获取这段数据
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
// 保存剩余的数据供下次使用
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
;
// 获取截出的数据
message = outputMessage.ToString();
return message.Length != 0;
//将数据写入通道,本例子就是写入到控制台
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
我可以通过 CLI 验证 azure-pipelines.yml 文件吗?
【中文标题】我可以通过 CLI 验证 azure-pipelines.yml 文件吗?【英文标题】:Can I validate an azure-pipelines.yml file via CLI? 【发布时间】:2019-05-13 22:01:09 【问题描述】:我正在尝试为 monorepo 构建一个 azure-pipelines.yml,并且我正在努力弄清楚如何在我前进的过程中调试该文件。
是否有诸如az deployment validate ./azure-pipelines.yml
之类的命令可用?
如果是这样,你建议我怎么做?我不断进行更改,将它们推送到我的存储库,然后让 Azure DevOps 运行它,但失败了。
【问题讨论】:
【参考方案1】:选项 1
现在有一个 Visual Studio Code 扩展,它提供了在将管道 YAML 代码提交到您的存储库之前对其进行验证的能力。
扩展名为Azure Pipelines YAML Validator。作者明确表示它使用在线服务进行验证,即您的管道将被上传以供检查。所以你必须相信作者,你的数据是被小心处理的,你应该确保没有像秘密这样的敏感数据存储在你的管道代码中(无论如何这是一个坏习惯)。
选项 2
pull request:Test-VSTeamYamlPipeline 还引入了一个新的 PowerShell cmdlet。要使用该 cmdlet 进行验证,您需要安装 PowerShell 模块 VSTeam
(需要 PowerShell 5.0+ 或 PS Core):
Install-Module -Name VSTeam -Scope CurrentUser
接下来您需要指定您的 Azure DevOps 项目:
Set-VSTeamAccount -Account <Name of your Azure DevOps org> -PersonalAccessToken <Your PAT with access to your project>
(还有连接到自托管 Azure DevOps 服务器的选项 - 请参阅 docs。)您的 PAT 必须至少具有“构建:读取和执行”权限。
然后,您可以根据已设置的管道验证在本地进行的更改。这意味着您之前必须已经基于提交的 YAML 文件创建了一个管道。对于验证,您将需要 项目名称 和 管道 ID。要获取这些信息,请在 Azure DevOps 中打开管道并从 URL 中提取信息:
https://dev.azure.com/<DEVOPS_ORG>/<PROJECT_NAME/_build?definitionId=<PIPELINE_ID>
验证请求如下所示
Test-VSTeamYamlPipeline -Project <PROJECT_NAME> -PipelineId <PIPELINE_ID> -FilePath '<PATH_TO_YAML_FILE>'
如果你有语法错误,那么你会得到一个指示错误的输出:
Test-VSTeamYamlPipeline: /your-pipeline.yml (Line: 12, Col: 13): Unexpected value ''
如果一切正常,输出有点神秘,对我来说看起来像这样:
Name url state Id
---- --- ----- --
https://dev.azure.com/<DEVOPS_ORG>/<SOME_GUID>/_apis/pipelines/<PIPELINE_ID>/runs/-1 unknown -1
【讨论】:
【参考方案2】:不,没有办法验证它。当您尝试运行它时 - 它会向您显示错误,这是验证它的唯一真正方法。
这个 VSCode 扩展提供语法高亮和自动补全。
https://marketplace.visualstudio.com/items?itemName=ms-azure-devops.azure-pipelines
【讨论】:
我添加了这个扩展,它适用于我的*** .yml 文件。但是,我使用的是单一存储库并且有多个 .yml 文件。但是,总的来说,我很欣赏这个扩展。谢谢! 好的,这并没有真正改变什么?还是该扩展名只能使用默认文件名?我还没用过 它抱怨我的子管道文件中缺少部分。我有一个主管道调用子存储库的模板,然后这些子存储库调用 build.yml 文件。对于 subrepo 模板,它抱怨缺少东西,不知道它是从父 yml 文件中调用的。 我正在做这样的事情:github.com/jayway/monorepo-azure-pipeline(我的 repo 归我公司所有,所以我无法发布) 问题跟踪此请求:github.com/Microsoft/azure-pipelines-yaml/issues/34以上是关于Pipelines的主要内容,如果未能解决你的问题,请参考以下文章
我可以通过 CLI 验证 azure-pipelines.yml 文件吗?
Azure DevOps 发布错误“Microsoft.TeamFoundation.DistributedTask.Pipelines.PipelineValidationException(类型