flask_day05:信号 Django信号 flask-script sqlalchemy 创建操作数据表

Posted xiao-fu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flask_day05:信号 Django信号 flask-script sqlalchemy 创建操作数据表相关的知识,希望对你有一定的参考价值。

鲁棒性

链路,链路追踪,

上下游,大的单体应用,上游还是前端,后端是Django写的

回顾

1.导出项目依赖 pipreqs
2.函数和方法
3.local对象
	并发编程中的一个对象,它可以保证多线程并发访问数据安全
  本质原理是:不同的线程,操作的是自己的数据
  不支持协程
4.自己定义local,支持线程和协程
	注意点一:
	try:
    # 只要解释没有装greenlet,这句话就会报错
   	# 一旦装了,有两种情况,使用了协程和没用协程,无论使用不使用,用getcurrent都能拿到协程id号
    from greenlet import getcurrent as get_ident
  except Exception as e:
    from threading import get_ident
  # 注意点二:重写类的 __setattr__ 和 __getatte__
  对象点属性 取值 不存在会触发 __getattr__
  对象点属性 设置值 不存在时会触发 __setattr__
  
  # 注意点三:由于重写了__setattr__个__getattr__
  类内部使用 self.storage	会递归
  使用类调用对象的方法,他就是普通函数,有几个值传几个值
  object.__setattr__(self,\'storage\',)
  等同于:self.storage=
  等价于:setattr(self,\'stotrage\',)  会递归
  
5.flask是如何实现这个local类的
	def __setattr__(self, name, value):
    ident = self.__ident_func__()
    storage = self.__storage__
    try:
      storage[ident][name] = value
    except KeyError:
      storage[ident] = name:value
      def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)
            
            
           

  def __getattr__(self, k):
        ident = get_ident()
        return self.storage[ident][k]
 
   def __setattr__(self, k, v):
        ident = get_ident() #如果用协程,这就是协程号,如果是线程,这就是线程号
        if ident in self.storage:  #\'协程id号\':arg:1,\'协程id号\':arg:2,\'协程id号\':arg:3
            self.storage[ident][k] = v
        else:
            self.storage[ident] = k: v
            
6.偏函数  :提前传值,返回一个对象,后期可以调用这个对象,传入后续的值


7.请求上下文源码分析(ctx 对象),整个flask的执行流程
	-一旦请求来了----》会执行 Flask类的对象app()---》触发Flask __call__--->self.wsgi_app(environ, start_response)
    -Flask类wsgi_app 方法  大约 2417行
     def wsgi_app(self, environ, start_response):
        #1 返回了一个ctx,请求上下文对象,RequestContext 的对象,里面有session,request
        ctx = self.request_context(environ)
        try:
            try:
                # 2 ctx.push---->RequestContext的push---》382行 
                # _request_ctx_stack.push(self)--self是ctx---》是全局变量
                # 是LocalStack()的对象
                ctx.push()
                # 匹配路由执行视图函数,请求扩展
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            # 把结果返回给wsgi服务器
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # 把当前放进去的ctx剔除,当次请求结束了
            ctx.auto_pop(error)

	-是LocalStack()的对象 的push ,传入了ctx
        def push(self, obj):
            # self._local是 Flask自己定义的兼容线程和协程的Local
            #self._local中反射 stack,会根据不同线程或协程,返回不同线程的stack
            #rv是None的
            rv = getattr(self._local, "stack", None)
            if rv is None:
                # rv=[]
                # self._local.stack=rv
          		#self._local=\'协程id号1\':stack:[],\'协程id号2\':stack:[]
                self._local.stack = rv = []
            rv.append(obj)
            #self._local=\'协程id号1\':stack:[ctx,],\'协程id号2\':stack:[]
            return rv
        
        
        
        
   - 在视图函数中:request = LocalProxy(partial(_lookup_req_object, "request"))
    	-print(request.method) # 执行requets对象的 __getattr__
        -LocalProxy的__getattr__-->核心:
        -return getattr(self._get_current_object(), name)
    	-self._get_current_object() 是 ctx中的真正request对象,那method,自如就拿到当次请求的method
        -def _get_current_object(self):
            if not hasattr(self.__local, "__release_local__"):
                #object.__setattr__(self, "_LocalProxy__local", local),初始化传入的
                # local 是 partial(_lookup_req_object, "request")
                # 
                # getattr(_lookup_req_object(\'request\'), \'method\')
                # getattr(当次请求的reuqest, \'method\')
                return self.__local() # self中的 __local,隐藏属性
            try:
                return getattr(self.__local, self.__name__)
            except AttributeError:
                raise RuntimeError("no object bound to %s" % self.__name__)
                
                
   -def _lookup_req_object(name):
        # 这里把当前线程下 的ctx取出来了
        top = _request_ctx_stack.top
        if top is None:
            raise RuntimeError(_request_ctx_err_msg)
        return getattr(top, name) # 去ctx中反射request,返回的就是当次请求的requets
    
# django flask 同步框架,部署的时候,使用uwsgi部署,uwsgi是进程线程架构,并发量不高
# 可以通过uwsgi+gevent,部署成异步程序

信号

Flask框架中的信号基于blinker(安装这个模块),其主要就是让开发者可以在flask请求过程中定制一些用户行为,flask 和 Django都有

观察者模式,又叫发布 -订阅(Publish/Subscribe) 23种设计模式之一

安装:pip install blinker

信号:signal 翻译过来的,并发编程中学过,信号量Semaphore,是两个不同的概念

比如:用户表新增一条记录时,就记录一下日志

方案一:在每个增加后,都写一行代码 --->>> 后期要删除,比较麻烦

方案二:使用信号量,写一个函数,绑定内置信号,只要程序执行到这,就会执行这个函数

内置信号:flask少一些,Django多一些

request_started = _signals.signal(\'request-started\')                # 请求到来前执行
request_finished = _signals.signal(\'request-finished\')              # 请求结束后执行
 
before_render_template = _signals.signal(\'before-render-template\')  # 模板渲染前执行
template_rendered = _signals.signal(\'template-rendered\')            # 模板渲染后执行
 
got_request_exception = _signals.signal(\'got-request-exception\')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal(\'request-tearing-down\')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal(\'appcontext-tearing-down\')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal(\'appcontext-pushed\')            # 应用上下文push时执行
appcontext_popped = _signals.signal(\'appcontext-popped\')            # 应用上下文pop时执行
message_flashed = _signals.signal(\'message-flashed\')                # 调用flask在其中添加数据时,自动触发

使用内置信号量的步骤

1.写一个函数

2.绑定内置信号

3.等待被触发

自定义信号

1.定义出信号

session_set = _signals.signal(\'session_set\')

2.写一个函数

def test1(*args, **kwargs):
  print(args)
  print(kwargs)
  print(\'session设置值了\')

3.绑定自定义的信号

session_set.connect(test1)

4.触发信号的执行(咱们做)

session_set.send(\'lqz\')  # 触发信号的执行

Django信号

https://www.cnblogs.com/liuqingzheng/articles/9803403.html

Model signals
    pre_init                    # django的modal执行其构造方法前,自动触发
    post_init                   # django的modal执行其构造方法后,自动触发
    pre_save                    # django的modal对象保存前,自动触发
    post_save                   # django的modal对象保存后,自动触发
    pre_delete                  # django的modal对象删除前,自动触发
    post_delete                 # django的modal对象删除后,自动触发
    m2m_changed                 # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
    class_prepared              # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals
    pre_migrate                 # 执行migrate命令前,自动触发
    post_migrate                # 执行migrate命令后,自动触发
Request/response signals
    request_started             # 请求到来前,自动触发
    request_finished            # 请求结束后,自动触发
    got_request_exception       # 请求异常后,自动触发
Database Wrappers
    connection_created          # 创建数据库连接时,自动触发

django中使用内置信号

1.写一个函数

def callBack(*args, **kwargs):
  print(args)
  print(kwargs)

2.绑定信号

方式一:

post_save.connect(callBack)

方式二:

from django.db.models.signals import pre_save
from django.dispatch import receiver
@receiver(pre_save)
def my_callback(senderm,**kwargs):
  print(\'对象创建成功\')
  print(sender)
	print(kwargs)

3.等待触发

flask-script

django中有命令:python manage.py runserver。。。

flask启动项目,像Django一样,通过命令启动

Flask==2.2.2

Flask_Script==2.0.3

借助于:flask-script 实现

安装:pip install flask-script

修改代码:

from flask_script import Manager
manager=Manager(app)
manager.run()

用命令启动

python manage.py runserver

自定制命令

1.简单自定制命令

    @manager.command
    def custom(arg):
        # 命令的代码,比如:初始化数据库, 有个excel表格,使用命令导入到mysql中
        print(arg)

2.复杂一些的自定制命令

    @manager.option(\'-n\', \'--name\', dest=\'name\')
    @manager.option(\'-u\', \'--url\', dest=\'url\')
    def cmd(name, url):
        # python run.py cmd -n lqz -u xxx
        # python run.py cmd --name lqz --url uuu
        print(name, url)

django 中如何自定制命令?

sqlalchemy 快速使用

原生操作的快速使用

# 先不是orm,而是原生sql


# 第一步:导入
from sqlalchemy import create_engine
# 第二步:生成引擎对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/cnblogs",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第三步:使用引擎获取连接,操作数据库
conn = engine.raw_connection()
cursor=conn.cursor()
cursor.execute(\'select * from aritcle\')
print(cursor.fetchall())

创建操作数据表

# 第一步:导入
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


# 第三步:继承生成的Base类
class User(Base):
    # 第四步:写字段
    id = Column(Integer, primary_key=True)  # 生成一列,类型是Integer,主键
    name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空
    email = Column(String(32), unique=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    # extra = Column(Text, nullable=True)

    # 第五步:写表名 如果不写以类名为表名
    __tablename__ = \'users\'  # 数据库表名称

    # 第六步:建立联合索引,联合唯一
    __table_args__ = (
        UniqueConstraint(\'id\', \'name\', name=\'uix_id_name\'),  # 联合唯一
        Index(\'ix_id_name\', \'name\', \'email\'),  # 索引
    )


class Book(Base):
    __tablename__ = \'books\'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
# 第七步:把表同步到数据库中


# 不会创建库,只会创建表
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/aaa",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)


# 把所有表删除
# Base.metadata.drop_all(engine)

django的信号

django的信号

Django中提供了“信号调度”,用于在框架执行操作时解耦。通俗来讲,就是一些动作发生的时候,信号允许特定的发送者去提醒一些接受者。

Django提供一种信号机制。其实就是观察者模式,又叫发布-订阅(Publish/Subscribe) 。当发生一些动作的时候,发出信号,然后监听了这个信号的函数就会执行。

1. Django内置信号

Model signals
    pre_init                    # django的model执行其构造方法前,自动触发
    post_init                   # django的model执行其构造方法后,自动触发
    pre_save                    # django的model对象保存前,自动触发
    post_save                   # django的model对象保存后,自动触发
    pre_delete                  # django的model对象删除前,自动触发
    post_delete                 # django的model对象删除后,自动触发
    m2m_changed                 # django的model中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
    class_prepared              # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals
    pre_migrate                 # 执行migrate命令前,自动触发
    post_migrate                # 执行migrate命令后,自动触发
Request/response signals
    request_started             # 请求到来前,自动触发
    request_finished            # 请求结束后,自动触发
    got_request_exception       # 请求异常后,自动触发
Test signals
    setting_changed             # 使用test测试修改配置文件时,自动触发
    template_rendered           # 使用test测试渲染模板时,自动触发
Database Wrappers
    connection_created          # 创建数据库连接时,自动触发

对于Django内置的信号,仅需注册指定信号,当程序执行相应操作时,自动触发注册函数:

注册信号,写入与project同名的文件夹下的_init_.py文件中,也是换数据库引擎的地方。

# 导入需要的信号
from django.core.signals import request_finished
from django.core.signals import request_started
from django.core.signals import got_request_exception
from django.db.models.signals import class_prepared
from django.db.models.signals import pre_init, post_init
from django.db.models.signals import pre_save, post_save
from django.db.models.signals import pre_delete, post_delete
from django.db.models.signals import m2m_changed
from django.db.models.signals import pre_migrate, post_migrate
from django.test.signals import setting_changed
from django.test.signals import template_rendered
from django.db.backends.signals import connection_created

################################################ 方法一 ############################################
# 写一个回调函数触发信号后要执行的函数
def callback(sender, **kwargs):
    print("xxoo_callback")
    print(sender,kwargs)

# 信号连接的哪个回调函数
post_save.connect(callback)
# 模板Signal.connect(receiver, sender=None, weak=True, dispatch_uid=None)
receiver :当前信号连接的回调函数,也就是处理信号的回调函数。 
sender :指定从哪个发送方接收信号。 
weak : 是否弱引用
dispatch_uid :信号接收器的唯一标识符,以防信号多次发送。

################################################ 方法二 ############################################
from django.dispatch import receiver

@receiver(post_save)  # 通过加装饰器的方式,指定信号触发完后执行哪个回调函数
def callback(sender, **kwargs):
    print("xxoo_callback")
    print(sender,kwargs)

指定发送者

from django.db.models.signals import pre_save, post_save
from django.dispatch import receiver
from myapp.models import MyModel

@receiver(post_save,sender=MyModel)  # 指定某个model触发信号后执行该回调函数。
def callback(sender, **kwargs):
    print("xxoo_callback")
    print(sender,kwargs)

2. 自定义信号

  • 定义信号

    # 在某个py文件中写入下面代码
    import django.dispatch
    pizza_done = django.dispatch.Signal(providing_args=["toppings", "size"])
  • 注册信号

    # 在_init_.py 中注册信号
    from 路径 import pizza_done
    
    def callback(sender, **kwargs):
        print("callback")
        print(sender,kwargs)
    
    pizza_done.connect(callback)
  • 触发信号

    from 路径 import pizza_done
    
    pizza_done.send(sender='seven',toppings=123, size=456)

由于内置信号的触发者已经集成到Django中,所以其会自动调用,而对于自定义信号则需要开发者在自定义的位置触发。

例子:

from django.shortcuts import HttpResponse
import time
import django.dispatch
from django.dispatch import receiver
 
# 定义一个信号
work_done = django.dispatch.Signal(providing_args=['path', 'time'])
 
def create_signal(request):
    url_path = request.path
    print("我已经做完了工作。现在我发送一个信号出去,给那些指定的接收器。")
 
    # 发送信号,将请求的IP地址和时间一并传递过去
    work_done.send(create_signal, path=url_path, time=time.strftime("%Y-%m-%d %H:%M:%S"))
    return HttpResponse("200,ok")
 
@receiver(work_done, sender=create_signal)
def my_callback(sender, **kwargs):
    print("我在%s时间收到来自%s的信号,请求url为%s" % (kwargs['time'], sender, kwargs["path"]))

3. 防止重复信号

在某些情况下,将接收器连接到信号的代码可能会运行多次。这可能会导致您的接收器功能被多次注册,因此对于单个信号事件将被多次调用。

如果此行为有问题(例如在保存模型时使用信号发送电子邮件),请传递一个唯一标识符作为dispatch_uid参数,以识别您的接收方功能。该标识符通常是一个字符串,尽管任何可哈希对象都足够。最终结果是,对于每个唯一dispatch_uid值,您的接收器功能将仅与信号绑定一次:

from django.core.signals import request_finished

request_finished.connect(my_callback, dispatch_uid="my_unique_identifier")

以上是关于flask_day05:信号 Django信号 flask-script sqlalchemy 创建操作数据表的主要内容,如果未能解决你的问题,请参考以下文章

如何从 django 信号返回数据?

Django 信号作为 IPC

在models.py 之外接收django-allauth 信号?

没有收到 django-paypal IPN 信号

使用 statprof 分析 Django 视图 - 不能在线程中使用信号

Django 记录 user_login_failed 信号的用户 IP