Tekton DAG代码
Posted A.T.R_p.orc
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tekton DAG代码相关的知识,希望对你有一定的参考价值。
/* Copyright 2019 The Tekton Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package dag import ( "errors" "fmt" "strings" "github.com/tektoncd/pipeline/pkg/list" "k8s.io/apimachinery/pkg/util/sets" ) // Task is an interface for all types that could be in a DAG type Task interface HashKey() string Deps() []string // Tasks is an interface for lists of types that could be in a DAG type Tasks interface Items() []Task // Node represents a Task in a pipeline. type Node struct // Task represent the PipelineTask in Pipeline Task Task // Prev represent all the Previous task Nodes for the current Task Prev []*Node // Next represent all the Next task Nodes for the current Task Next []*Node // Graph represents the Pipeline Graph type Graph struct // Nodes represent map of PipelineTask name to Node in Pipeline Graph Nodes map[string]*Node // Returns an empty Pipeline Graph func newGraph() *Graph return &GraphNodes: map[string]*Node func (g *Graph) addPipelineTask(t Task) (*Node, error) if _, ok := g.Nodes[t.HashKey()]; ok return nil, errors.New("duplicate pipeline task") newNode := &Node Task: t, g.Nodes[t.HashKey()] = newNode return newNode, nil // Build returns a valid pipeline Graph. Returns error if the pipeline is invalid func Build(tasks Tasks, deps map[string][]string) (*Graph, error) d := newGraph() // Add all Tasks mentioned in the `PipelineSpec` for _, pt := range tasks.Items() if _, err := d.addPipelineTask(pt); err != nil return nil, fmt.Errorf("task %s is already present in Graph, can\'t add it again: %w", pt.HashKey(), err) // Process all from and runAfter constraints to add task dependency for pt, taskDeps := range deps for _, previousTask := range taskDeps if err := addLink(pt, previousTask, d.Nodes); err != nil return nil, fmt.Errorf("couldn\'t add link between %s and %s: %w", pt, previousTask, err) return d, nil // GetSchedulable returns a set of PipelineTask names that can be scheduled, // given a list of successfully finished doneTasks. It returns tasks which have // all dependencies marked as done, and thus can be scheduled. If the specified // doneTasks are invalid (i.e. if it is indicated that a Task is done, but the // previous Tasks are not done), an error is returned. func GetSchedulable(g *Graph, doneTasks ...string) (sets.String, error) roots := getRoots(g) tm := sets.NewString(doneTasks...) d := sets.NewString() visited := sets.NewString() for _, root := range roots schedulable := findSchedulable(root, visited, tm) for _, task := range schedulable d.Insert(task.HashKey()) var visitedNames []string for v := range visited visitedNames = append(visitedNames, v) notVisited := list.DiffLeft(doneTasks, visitedNames) if len(notVisited) > 0 return nil, fmt.Errorf("invalid list of done tasks; some tasks were indicated completed without ancestors being done: %v", notVisited) return d, nil func linkPipelineTasks(prev *Node, next *Node) error // Check for self cycle if prev.Task.HashKey() == next.Task.HashKey() return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey()) // Check if we are adding cycles. path := []stringnext.Task.HashKey(), prev.Task.HashKey() if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil return fmt.Errorf("cycle detected: %w", err) next.Prev = append(next.Prev, prev) prev.Next = append(prev.Next, next) return nil func lookForNode(nodes []*Node, path []string, next string) error for _, n := range nodes path = append(path, n.Task.HashKey()) if n.Task.HashKey() == next return errors.New(getVisitedPath(path)) if err := lookForNode(n.Prev, path, next); err != nil return err return nil func getVisitedPath(path []string) string // Reverse the path since we traversed the Graph using prev pointers. for i := len(path)/2 - 1; i >= 0; i-- opp := len(path) - 1 - i path[i], path[opp] = path[opp], path[i] return strings.Join(path, " -> ") func addLink(pt string, previousTask string, nodes map[string]*Node) error prev, ok := nodes[previousTask] if !ok return fmt.Errorf("task %s depends on %s but %s wasn\'t present in Pipeline", pt, previousTask, previousTask) next := nodes[pt] if err := linkPipelineTasks(prev, next); err != nil return fmt.Errorf("couldn\'t create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err) return nil func getRoots(g *Graph) []*Node n := []*Node for _, node := range g.Nodes if len(node.Prev) == 0 n = append(n, node) return n func findSchedulable(n *Node, visited sets.String, doneTasks sets.String) []Task if visited.Has(n.Task.HashKey()) return []Task visited.Insert(n.Task.HashKey()) if doneTasks.Has(n.Task.HashKey()) schedulable := []Task // This one is done! Take note of it and look at the next candidate for _, next := range n.Next if _, ok := visited[next.Task.HashKey()]; !ok schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...) return schedulable // This one isn\'t done! Return it if it\'s schedulable if isSchedulable(doneTasks, n.Prev) // FIXME(vdemeester) return []Taskn.Task // This one isn\'t done, but it also isn\'t ready to schedule return []Task func isSchedulable(doneTasks sets.String, prevs []*Node) bool if len(prevs) == 0 return true collected := []string for _, n := range prevs if doneTasks.Has(n.Task.HashKey()) collected = append(collected, n.Task.HashKey()) return len(collected) == len(prevs)
以上是关于Tekton DAG代码的主要内容,如果未能解决你的问题,请参考以下文章
如何使用预定义的 GitLab CI 变量和流式传输到 GitLab Pipeline 日志的 Tekton 日志直接从 GitLab CI 触发 Tekton Pipeline