代码发布5 发布流程, 基于channel-layers实现群发功能, 节点图标的创建, 节点状态动态改变
Posted ludingchao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了代码发布5 发布流程, 基于channel-layers实现群发功能, 节点图标的创建, 节点状态动态改变相关的知识,希望对你有一定的参考价值。
给任务单的展示页面,添加一个去发布的按钮,点击进入发布界面(项目基本介绍,gojs渲染图标)
"""静态文件配置""" # 1.配置文件中直接指定查找路径(从上往下查找) 方式一 STATICFILES_DIRS = [ os.path.join(BASE_DIR,‘static1‘), os.path.join(BASE_DIR,‘static2‘), os.path.join(BASE_DIR,‘static3‘), os.path.join(BASE_DIR,‘static4‘), ] # 2.直接利用模版语法 (这样就不需要在配置文件中设定,直接指定查找路径) 方式二 {% load staticfiles %} <script src="{% static ‘js/go.js‘ %}"></script>
当多个用户打开相同的发布页面的时候,只要有一个人在操作,其他人的界面都应该看到效果(channle-layers)
channle-layers基本使用
-
配置文件中配置参数
CHANNEL_LAYERS = { ‘default‘:{ ‘BACKEND‘:‘channels.layers.InMemoryChannelLayer‘ } }
- 如何获取无名有名分组中url携带的参数 (因为此处用类,无法像方法获得有名分组参数)
task_id = self.scope[‘url_route‘][‘kwargs‘].get(‘task_id‘)
代码:
# routing.py application = ProtocolTypeRouter({ ‘websocket‘:URLRouter([ re_path(r‘^publish/(?P<task_id>d+)‘,consumers.PublishConsumer) ]) }) # consumers.py class PublishConsumer(WebsocketConsumer): def websocket_connect(self, message): self.accept() # 获取url中无名有名分组的参数 self.scope是一个大字典 里面包含了前端所有的数据 task_id = self.scope[‘url_route‘][‘kwargs‘].get(‘task_id‘) # scope[‘url_route‘]为固定参数,获取有名分组参数 # task_id = self.scope[‘url_route‘][‘args‘].get(‘task_id‘) # 无名分组获取参数
- 链接对象自动加入对应的群聊
from asgiref.sync import async_to_sync async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name)
- 给特定的群中发消息
async_to_sync(self.channel_layer.group_send)(task_id,{‘type‘:‘my.send‘,‘message‘:{‘code‘:‘init‘,‘data‘:node_list}}) """ 后面字典的键是固定的 就叫type和message type后面指定的值就是负责发送消息的方法(将message后面的数据交由type后面指定的方法发送给对应的群聊中) 针对type后面的方法名 有一个固定的变化格式 my.send >>> my_send xxx.ooo >>> xxx_ooo """
- 在类中需要定义一个专门发送消息的方法
def my_send(self,event): # event为侯敏的消息对象 message = event.get(‘message‘) # {‘code‘:‘init‘,‘data‘:node_list} # 发送数据 self.send(json.dumps(message)) """ 内部原理就类似于是循环当前群组里面所有的链接对象 然后依次执行send方法 for self in self_list: self.send """
- 断开链接之后去对应的群聊中剔除群成员
async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name)
其实可以直接使用任务的主键值作为群号
节点数据无论是初始化的还是动态修改的,都应该是动态生成的而不是直接写死的
其次,当一个任务单以及发布之后,应该保存它的发布节点数据
也就意味着我们需要开设一个模型表用开存储节点相关的所有数据
class Node(models.Model): """存储节点数据""" text = models.CharField(verbose_name=‘节点文字‘,max_length=32) # 一个任务单有多个节点 一对多的关系 task = models.ForeignKey(verbose_name=‘发布任务单‘,to=‘DeployTask‘) status_choices = ( (‘lightgray‘,‘待发布‘), (‘green‘,‘成功‘), (‘red‘,‘失败‘), ) status = models.CharField(verbose_name=‘状态‘,max_length=32,choices=status_choices,default=‘lightgray‘) # 子节点 父节点 bbs子评论与根评论 parent = models.ForeignKey(verbose_name=‘父节点‘,to=‘self‘,null=True,blank=True) # 一个服务器可以有多个节点 一对多的关系 server = models.ForeignKey(verbose_name=‘服务器‘,to=‘Server‘,null=True,blank=True)
当用户点击初始化图标的时候,需要操作上述的表查询或者新增记录
# 先去模型表中创建数据 之后再构造gojs所需要的数据类型返回即可 # 群发 node_object_list = [] # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点) start_node = models.Node.objects.create(text=‘开始‘, task_id=task_id ) node_object_list.append(start_node) download_node = models.Node.objects.create(text=‘下载‘, task_id=task_id, parent=start_node ) node_object_list.append(download_node) upload_node = models.Node.objects.create(text=‘上传‘, task_id=task_id, parent=download_node ) node_object_list.append(upload_node) # 1.1 服务器相关的节点 一个项目可以有多个服务器 task_obj = models.DeployTask.objects.filter(pk=task_id).first() for server_obj in task_obj.project.servers.all(): server_node = models.Node.objects.create(text=server_obj.hostname, task_id=task_id, parent=upload_node, server=server_obj ) node_object_list.append(server_node) # 2 再将数据构造成gojs所需的格式返回给前端 [{}] node_list = [] # [{},{},{},{}] for node_obj in node_object_list: temp_dic = { ‘key‘:str(node_obj.pk), ‘text‘:node_obj.text, ‘color‘:node_obj.status, } # 针对parant字段 需要做判断 再考虑是否创建键值对 if node_obj.parent: # 我们用数据的主键值作为key temp_dic[‘parent‘] = str(node_obj.parent_id) node_list.append(temp_dic) async_to_sync(self.channel_layer.group_send)(task_id,{‘type‘:‘my.send‘,‘message‘:{‘code‘:‘init‘,‘data‘:node_list}})
小bug优化
# 1 当一个任务以及初始化过了 再次点击初始化按钮 数据库不应该再写入数据 # 先判断当前任务单是否已经初始化过图标数据了 node_queryset = models.Node.objects.filter(task_id=task_id) if not node_queryset: # 创建和构造数据的操作 else: node_object_list = node_queryset # 2 当用户以及给一个任务单初始化过图标之后。打开页面不应该在此点击按钮 而是直接展示出来 在后端建立连接的方法内 直接查询并返回数据 # 查询当前任务单是否已经初始化过图标 如果有直接查询出来展示到前端 减少用户操作 node_queryset = models.Node.objects.filter(task_id=task_id) if node_queryset: node_list = [] # [{},{},{},{}] for node_obj in node_queryset: temp_dic = { ‘key‘: str(node_obj.pk), ‘text‘: node_obj.text, ‘color‘: node_obj.status, } # 针对parant字段 需要做判断 再考虑是否创建键值对 if node_obj.parent: # 我们用数据的主键值作为key temp_dic[‘parent‘] = str(node_obj.parent_id) node_list.append(temp_dic) # 发送数据 单发/群发??? 单发 self.send(text_data=json.dumps({‘code‘:‘init‘,‘data‘:node_list}))
-
直接自己规定死只能写shell脚本或者python脚本
-
兼容各个类型的脚本(脚本表中再开设一个用来标示脚本类型的字段)
-
通过文件头来指定
判断用户是否书写的钩子脚本
# 判断是否有下载前的钩子脚本 if task_obj.before_download_script: # 再创建一个下载前的节点 start_node = models.Node.objects.create(text=‘下载前‘, task_id=task_id, parent=start_node ) node_object_list.append(start_node) """利用变量名只想的问题来实现箭头指向"""
代码优化
# 1 wesocket_recevie方法内的代码过于冗杂 根据功能的不同做优化 """ 将创建节点数据的代码 和 构造gojs所需要的代码封装成两个函数 """ def create_node(task_id,task_obj): """创建节点数据""" node_object_list = [] # 先判断当前任务单是否已经初始化过图标数据了 node_queryset = models.Node.objects.filter(task_id=task_id) if not node_queryset: # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点) start_node = models.Node.objects.create(text=‘开始‘, task_id=task_id ) node_object_list.append(start_node) # 判断是否有下载前的钩子脚本 if task_obj.before_download_script: # 再创建一个下载前的节点 start_node = models.Node.objects.create(text=‘下载前‘, task_id=task_id, parent=start_node ) node_object_list.append(start_node) download_node = models.Node.objects.create(text=‘下载‘, task_id=task_id, parent=start_node ) node_object_list.append(download_node) # 判断是否有下载后的钩子脚本 if task_obj.after_download_script: # 再创建一个下载后的节点 download_node = models.Node.objects.create(text=‘下载后‘, task_id=task_id, parent=download_node ) node_object_list.append(download_node) upload_node = models.Node.objects.create(text=‘上传‘, task_id=task_id, parent=download_node ) node_object_list.append(upload_node) # 1.1 服务器相关的节点 一个项目可以有多个服务器 task_obj = models.DeployTask.objects.filter(pk=task_id).first() for server_obj in task_obj.project.servers.all(): server_node = models.Node.objects.create(text=server_obj.hostname, task_id=task_id, parent=upload_node, server=server_obj ) node_object_list.append(server_node) # 判断是否有发布前的钩子 if task_obj.before_deploy_script: # 再创建一个下载后的节点 server_node = models.Node.objects.create(text=‘发布前‘, task_id=task_id, parent=server_node, server=server_obj ) node_object_list.append(server_node) # 先再创建一个发布节点 deploy_node = models.Node.objects.create(text=‘发布‘, task_id=task_id, parent=server_node, server=server_obj ) node_object_list.append(deploy_node) # 判断是否有发布后的钩子 if task_obj.after_deploy_script: # 再创建一个下载后的节点 after_deploy_node = models.Node.objects.create(text=‘发布后‘, task_id=task_id, parent=deploy_node, server=server_obj ) node_object_list.append(after_deploy_node) else: node_object_list = node_queryset return node_object_list def convert_data_to_gojs(node_object_list): """将数据转化成gojs所需要的类型""" # 2 再将数据构造成gojs所需的格式返回给前端 [{}] node_list = [] # [{},{},{},{}] for node_obj in node_object_list: temp_dic = { ‘key‘: str(node_obj.pk), ‘text‘: node_obj.text, ‘color‘: node_obj.status, } # 针对parant字段 需要做判断 再考虑是否创建键值对 if node_obj.parent: # 我们用数据的主键值作为key temp_dic[‘parent‘] = str(node_obj.parent_id) node_list.append(temp_dic) return node_list
强调:有点一定要是先写好,之后再去优化,不要同步进行
节点动态变化
节点背后对应的操作,无外乎就是下载代码(gitpython模块的运用)
其次就是将下载好的代码上传到远程服务器上(paramiko模块的运用)
# 给发布按钮绑定事件 点击触发节点背后对应的所有的操作
consumers.py代码示例
from channels.generic.websocket import WebsocketConsumer from channels.exceptions import StopConsumer import json from asgiref.sync import async_to_sync from app01 import models def create_node(task_id,task_obj): """创建节点数据""" node_object_list = [] # 先判断当前任务单是否已经初始化过图标数据了 node_queryset = models.Node.objects.filter(task_id=task_id) if not node_queryset: # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点) start_node = models.Node.objects.create(text=‘开始‘,task_id=task_id) node_object_list.append(start_node) # 判断是否有下载前的钩子脚本 if task_obj.before_download_script: # 再创建一个下载前的节点 start_node = models.Node.objects.create(text=‘下载前‘,task_id=task_id,parent=start_node) node_object_list.append(start_node) download_node = models.Node.objects.create(text=‘下载‘,task_id=task_id,parent=start_node) node_object_list.append(download_node) # 判断是否有下载后的钩子脚本 if task_obj.after_download_script: # 再创建一个下载后的节点 download_node = models.Node.objects.create(text=‘下载后‘,task_id=task_id,parent=download_node) node_object_list.append(download_node) upload_node = models.Node.objects.create(text=‘上传‘,task_id=task_id,parent=download_node) node_object_list.append(upload_node) # 1.1 服务器相关的节点 一个项目可以有多个服务器 task_obj = models.DeployTask.objects.filter(pk=task_id).first() for server_obj in task_obj.project.servers.all(): server_node = models.Node.objects.create(text=server_obj.hostname,task_id=task_id, parent=upload_node,server=server_obj) node_object_list.append(server_node) # 判断是否有发布前的钩子 if task_obj.before_deploy_script: # 再创建一个下载后的节点 server_node = models.Node.objects.create(text=‘发布前‘,task_id=task_id, parent=server_node,server=server_obj) node_object_list.append(server_node) # 先再创建一个发布节点 deploy_node = models.Node.objects.create(text=‘发布‘,task_id=task_id, parent=server_node,server=server_obj) node_object_list.append(deploy_node) # 判断是否有发布后的钩子 if task_obj.after_deploy_script: # 再创建一个下载后的节点 after_deploy_node = models.Node.objects.create(text=‘发布后‘,task_id=task_id, parent=deploy_node,server=server_obj) node_object_list.append(after_deploy_node) else: node_object_list = node_queryset return node_object_list def convert_data_to_gojs(node_object_list): """将数据转化成gojs所需要的类型""" # 2 再将数据构造成gojs所需的格式返回给前端 [{}] node_list = [] # [{},{},{},{}] for node_obj in node_object_list: temp_dic = { ‘key‘: str(node_obj.pk), ‘text‘: node_obj.text, ‘color‘: node_obj.status, } # 针对parant字段 需要做判断 再考虑是否创建键值对 if node_obj.parent: # 我们用数据的主键值作为key temp_dic[‘parent‘] = str(node_obj.parent_id) node_list.append(temp_dic) return node_list class PublishConsumer(WebsocketConsumer): def websocket_connect(self, message): self.accept() # 获取url中有名分组的参数 self.scope是一个大字典 里面包含了前端所有的数据 task_id = self.scope[‘url_route‘][‘kwargs‘].get(‘task_id‘) # 应该将该用户添加到对应的群聊中 固定写法 async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name) # 第一个参数是群号 必须是字符串格式 # 第二个参数类似于群成员的唯一标示 # 查询当前任务单是否已经初始化过图标 如果有直接查询出来展示到前端 减少用户操作 node_queryset = models.Node.objects.filter(task_id=task_id) if node_queryset: node_list = convert_data_to_gojs(node_queryset) # 发送数据 单发/群发??? 单发 self.send(text_data=json.dumps({‘code‘:‘init‘,‘data‘:node_list})) def websocket_receive(self, message): task_id = self.scope[‘url_route‘][‘kwargs‘].get(‘task_id‘) task_obj = models.DeployTask.objects.filter(pk=task_id).first() text = message.get(‘text‘) # 由于请求图标的数据 可以分为两部分 第一部分是初始化 第二部分是动态变化 # 我们对text做判断 来区分到底需要哪部分数据 if text == ‘init‘: # node_list = [ # {"key": "start", "text": ‘开始‘, "figure": ‘Ellipse‘, "color": "lightgreen"}, # {"key": "download", "parent": ‘start‘, "text": ‘下载代码‘, "color": "lightgreen", "link_text": ‘执行中...‘}, # {"key": "compile", "parent": ‘download‘, "text": ‘本地编译‘, "color": "lightgreen"}, # ] # 要对数据进行序列化处理 单发 # self.send(text_data=json.dumps({"code":‘init‘,"data":node_list})) # 一个操作节点模型表完成数据展示 # 群发 # 1.创建节点数据 node_object_list = create_node(task_id,task_obj) # 2.构造gojs所需要的数据类型 node_list = convert_data_to_gojs(node_object_list) async_to_sync(self.channel_layer.group_send)(task_id,{‘type‘:‘my.send‘,‘message‘:{‘code‘:‘init‘,‘data‘:node_list}}) """ 后面字典的键是固定的 就叫type和message type后面指定的值就是负责发送消息的方法(将message后面的数据交由type后面指定的方法发送给对应的群聊中) 针对type后面的方法名 有一个固定的变化格式 my.send >>> my_send xxx.ooo >>> xxx_ooo """ if text == ‘deploy‘: """ 1 先默认让所有的节点执行成功 2 再真正的执行命令操作 第一步 开始节点 开始节点无需任何操作 直接成功即可 第二步 下载前 执行本地脚本 执行成功或者失败 第三步 下载 利用gitpython做操作 第四步 下载后 执行本地脚本 执行成功或者失败 第五步 上传 paramiko模块 第六步 链接每天服务器 上传代码 发布前钩子 发布 发布后钩子 """ def my_send(self,event): message = event.get(‘message‘) # {‘code‘:‘init‘,‘data‘:node_list} # 发送数据 self.send(json.dumps(message)) """ 内部原理就类似于是循环当前群组里面所有的链接对象 然后依次执行send方法 for self in self_list: self.send """ def websocket_disconnect(self, message): task_id = self.scope[‘url_route‘][‘kwargs‘].get(‘task_id‘) async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name) raise StopConsumer()
deploy.html代码示例
{% extends ‘base.html‘ %} {% load staticfiles %} {% block content %} <h1>发布任务</h1> {# 1 操作按钮区域#} <div style="margin: 10px 0;"> <button class="btn btn-primary" onclick="initDiagram()">初始化图标</button> <button class="btn btn-primary" onclick="releaseTask()">发布任务</button> </div> {# 2 基本信息展示区#} <table class="table table-hover table-striped"> <tbody> <tr> <td>项目名称:{{ project_obj.title }}</td> <td>环境:{{ project_obj.get_env_display }}</td> </tr> <tr> <td>版本:{{ task_obj.tag }}</td> <td>状态:{{ task_obj.get_status_display }}</td> </tr> <tr> <td colspan="2">仓库地址:{{ project_obj.repo }}</td> </tr> <tr> <td colspan="2">线上路径:{{ project_obj.path }}</td> </tr> <tr> <td colspan="2"> <div>关联服务器</div> <ul> {% for server_obj in project_obj.servers.all %} <li>{{ server_obj.hostname }}</li> {% endfor %} </ul> </td> </tr> </tbody> </table> {# 3 图形动态展示区#} <div id="diagramDiv" style="width:100%; min-height:450px; background-color: #DAE4E4;"></div> {% endblock %} {% block js %} <script src="{% static ‘js/go.js‘ %}"></script> <script> var ws; var diagram; function initWebSocket() { ws = new WebSocket(‘ws://127.0.0.1:8000/publish/{{ task_obj.pk }}/‘); ws.onmessage = function (event) { {#console.log(typeof event.data) // {‘code‘:‘‘,‘data‘:‘‘}#} // 容易出错 var res = JSON.parse(event.data) if (res.code===‘init‘){ diagram.model = new go.TreeModel(res.data); } } } function initTable() { var $ = go.GraphObject.make; diagram = $(go.Diagram, "diagramDiv",{ layout: $(go.TreeLayout, { angle: 0, nodeSpacing: 20, layerSpacing: 70 }) }); // 创建一个节点模版 diagram.nodeTemplate = $(go.Node, "Auto", $(go.Shape, { figure: "RoundedRectangle", fill: ‘yellow‘, stroke: ‘yellow‘ }, new go.Binding("figure", "figure"), new go.Binding("fill", "color"), new go.Binding("stroke", "color")), $(go.TextBlock, {margin: 8}, new go.Binding("text", "text")) ); // 创建一个箭头模版 diagram.linkTemplate = $(go.Link, {routing: go.Link.Orthogonal}, $(go.Shape, {stroke: ‘yellow‘}, new go.Binding(‘stroke‘, ‘link_color‘)), $(go.Shape, {toArrow: "OpenTriangle", stroke: ‘yellow‘}, new go.Binding(‘stroke‘, ‘link_color‘)) ); // 这里的数据后期就可以通过后端来获取 {#var nodeDataArray = [#} {# {key: "start", text: ‘开始‘, figure: ‘Ellipse‘, color: "lightgreen"},#} {# {key: "download", parent: ‘start‘, text: ‘下载代码‘, color: "lightgreen", link_text: ‘执行中...‘},#} {# {key: "compile", parent: ‘download‘, text: ‘本地编译‘, color: "lightgreen"},#} {# {key: "zip", parent: ‘compile‘, text: ‘打包‘, color: "red", link_color: ‘red‘},#} {# {key: "c1", text: ‘服务器1‘, parent: "zip"},#} {# {key: "c11", text: ‘服务重启‘, parent: "c1"},#} {# {key: "c2", text: ‘服务器2‘, parent: "zip"},#} {# {key: "c21", text: ‘服务重启‘, parent: "c2"},#} {# {key: "c3", text: ‘服务器3‘, parent: "zip"},#} {# {key: "c31", text: ‘服务重启‘, parent: "c3"}#} {#];#} {#diagram.model = new go.TreeModel(nodeDataArray);#} // 动态控制节点颜色变化 //var node = diagram.model.findNodeDataForKey("zip"); // diagram.model.setDataProperty(node, "color", "lightgreen"); } $(function () { // 页面加载完毕 直接先初始化websocket对象和图标对象 initWebSocket() initTable() }) function initDiagram() { // 朝后端要初始化图标的数据 ws.send(‘init‘) } function releaseTask() { // 朝后端发送执行任务的命令 ws.send(‘deploy‘) } </script> {% endblock %}
以上是关于代码发布5 发布流程, 基于channel-layers实现群发功能, 节点图标的创建, 节点状态动态改变的主要内容,如果未能解决你的问题,请参考以下文章
毕业设计 - 题目: 基于协同过滤的电影推荐系统 - Django 在线电影推荐协同过滤
Android 5.0 Camera系统源码分析:Camera预览3A流程