Python之contextlib库及源码分析

Posted Fate0729

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python之contextlib库及源码分析相关的知识,希望对你有一定的参考价值。

Utilities for with-statement contexts
  # Names exported by a star-import of this module. Only some of them
  # are reproduced in the excerpt shown here.
  __all__ = ["contextmanager", "closing", "AbstractContextManager",
               "ContextDecorator", "ExitStack", "redirect_stdout",
               "redirect_stderr", "suppress"]

AbstractContextManager(abc.ABC)

  上下文管理抽象基类:__enter__(self) 有默认实现(返回 self),子类必须实现抽象方法 __exit__(self, exc_type, exc_value, traceback)

class AbstractContextManager(abc.ABC):
    """Abstract base class describing the context-manager protocol.

    ``__enter__`` ships with a default implementation, while ``__exit__``
    is abstract and has to be provided by every concrete subclass.
    """

    def __enter__(self):
        """Enter the runtime context and hand back this very object."""
        return self

    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the runtime context; this default returns ``None``."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        """Accept any class whose MRO defines both protocol methods.

        Returns ``True`` when ``C`` (or one of its bases) defines both
        ``__enter__`` and ``__exit__``; otherwise ``NotImplemented`` so the
        regular subclass check takes over.
        """
        # The structural check applies to this ABC only, not to subclasses.
        if cls is not AbstractContextManager:
            return NotImplemented
        for method in ("__enter__", "__exit__"):
            if not any(method in base.__dict__ for base in C.__mro__):
                return NotImplemented
        return True

ContextDecorator(object)

  上下文管理基类或mixin类,该类可以像装饰器一样工作,提供你需要实现的任何辅助功能

class ContextDecorator(object):
    """Base class / mixin letting a context manager be used as a decorator."""

    def _recreate_cm(self):
        """Return the context manager to enter for one decorated call.

        The default simply returns ``self``. One-shot context managers such
        as _GeneratorContextManager override this to build a fresh instance
        for every call of the decorated function.

        This is a private interface just for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        """Wrap ``func`` so each call runs inside this context manager."""
        def inner(*args, **kwds):
            manager = self._recreate_cm()
            with manager:
                return func(*args, **kwds)
        # Copy func's metadata (name, docstring, ...) onto the wrapper.
        return wraps(func)(inner)

_GeneratorContextManager(ContextDecorator, AbstractContextManager)

  contextmanager装饰器的辅助类(helper),提供以下方法:

  _recreate_cm:重新生成一个新对象,即 self.__class__(self.func, self.args, self.kwds)

  __enter__上下文管理进入函数

  __exit__上下文管理退出函数

  可以根据ContextDecorator实现任何想实现的辅助功能

class _GeneratorContextManager(ContextDecorator, AbstractContextManager):
    """Helper for @contextmanager decorator."""

    # NOTE: the class docstring above is also the fallback instance __doc__
    # (see __init__), so the longer explanation lives in comments instead.
    #
    # One instance drives one run of a generator: __enter__ advances it to
    # its first ``yield`` and __exit__ resumes it (or throws the pending
    # exception into it) and requires it to finish.

    def __init__(self, func, args, kwds):
        """Create the generator and remember how to rebuild it.

        Args:
            func: generator function that implements the context.
            args: positional arguments forwarded to ``func``.
            kwds: keyword arguments forwarded to ``func``.
        """
        self.gen = func(*args, **kwds)
        logger.info("get generator by with:{}".format(self.gen))
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        # Fetch the function's docstring; getattr's third argument is the
        # default returned when the attribute is missing.
        doc = getattr(func, "__doc__", None)
        logger.info("doc:{}".format(doc))
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

    def _recreate_cm(self):
        """Return a fresh instance built from the stored func/args/kwds."""
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        """Advance the generator to its first ``yield`` and return that value.

        Raises:
            RuntimeError: if the generator finishes without yielding.
        """
        logger.info("__enter__:you can add you method")

        # Demo hook kept from the article: decorate and call a throwaway
        # function through ContextDecorator before entering the context.
        @ContextDecorator()
        def testfun(*args, **kwds):
            logger.info("@ContextDecorator():testfun test success")
        testfun("hello")

        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        """Resume the generator and report whether the exception is handled.

        Args:
            type: exception class raised in the ``with`` body, or None.
            value: exception instance (instantiated from ``type`` if None).
            traceback: traceback associated with the exception.

        Returns:
            A true value when the generator swallowed the exception; a
            false value (False or None) when it should keep propagating.

        Raises:
            RuntimeError: if the generator yields a second time.
        """
        logger.info("__exit__")
        logger.info("type:{}".format(type))
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                self.gen.throw(type, value, traceback)
                raise RuntimeError("generator didn't stop after throw()")
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27112)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if exc.__cause__ is value:
                    return False
                raise
            except BaseException as exc:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                #
                # Binding the exception here (instead of a bare ``except:``
                # plus sys.exc_info()) keeps this handler free of any
                # dependency on a ``sys`` import.
                if exc is not value:
                    raise
                # Falling through returns None: the original exception
                # keeps propagating from the ``with`` statement.

装饰器contextmanager

def contextmanager(func):
    """Turn a generator function into a context-manager factory.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    With that definition, this:

        with some_generator(<arguments>) as <variable>:
            <body>

    behaves like this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    """
    def helper(*args, **kwds):
        # Every call builds a new one-shot manager around func's generator.
        return _GeneratorContextManager(func, args, kwds)
    # Carry func's name/docstring over to the factory.
    return wraps(func)(helper)

用例:

技术分享图片
# -*- coding: utf-8 -*-

import abc
from functools import wraps

import logging
# Send INFO-and-above records to logging.txt (opened with mode "w+"),
# one "time - logger name - level - message" line per record.
logging.basicConfig(
    level=logging.INFO,
    filename="logging.txt",
    filemode="w+",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-level logger used by every class and function in this script.
logger = logging.getLogger(__name__)

class AbstractContextManager(abc.ABC):
    """Abstract base class describing the context-manager protocol.

    ``__enter__`` ships with a default implementation, while ``__exit__``
    is abstract and has to be provided by every concrete subclass.
    """

    def __enter__(self):
        """Enter the runtime context and hand back this very object."""
        return self

    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the runtime context; this default returns ``None``."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        """Accept any class whose MRO defines both protocol methods.

        Returns ``True`` when ``C`` (or one of its bases) defines both
        ``__enter__`` and ``__exit__``; otherwise ``NotImplemented`` so the
        regular subclass check takes over.
        """
        # The structural check applies to this ABC only, not to subclasses.
        if cls is not AbstractContextManager:
            return NotImplemented
        mro = C.__mro__
        has_enter = any("__enter__" in base.__dict__ for base in mro)
        if not has_enter:
            return NotImplemented
        has_exit = any("__exit__" in base.__dict__ for base in mro)
        return True if has_exit else NotImplemented


class ContextDecorator(object):
    """Base class / mixin letting a context manager be used as a decorator.

    In this demo variant the wrapper does not enter any context: it only
    logs a message and then calls the decorated function.
    """

    def _recreate_cm(self):
        """Return the context manager to use for one decorated call.

        The default simply returns ``self``. One-shot context managers such
        as _GeneratorContextManager override this to build a fresh instance
        for every call of the decorated function.

        This is a private interface just for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        """Wrap ``func`` with a logging wrapper that keeps its metadata."""
        def inner(*args, **kwds):
            # The usual `with self._recreate_cm():` is disabled here, so a
            # bare ContextDecorator() works as a decorator in this demo.
            logger.info("you can do something in decorator")
            outcome = func(*args, **kwds)
            return outcome
        return wraps(func)(inner)


class _GeneratorContextManager(ContextDecorator, AbstractContextManager):
    """Helper for @contextmanager decorator."""

    # NOTE: the class docstring above is also the fallback instance __doc__
    # (see __init__), so the longer explanation lives in comments instead.
    #
    # One instance drives one run of a generator: __enter__ advances it to
    # its first ``yield`` and __exit__ resumes it (or throws the pending
    # exception into it) and requires it to finish.

    def __init__(self, func, args, kwds):
        """Create the generator and remember how to rebuild it.

        Args:
            func: generator function that implements the context.
            args: positional arguments forwarded to ``func``.
            kwds: keyword arguments forwarded to ``func``.
        """
        self.gen = func(*args, **kwds)
        logger.info("get generator by with:{}".format(self.gen))
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        # Fetch the function's docstring; getattr's third argument is the
        # default returned when the attribute is missing.
        doc = getattr(func, "__doc__", None)
        logger.info("doc:{}".format(doc))
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

    def _recreate_cm(self):
        """Return a fresh instance built from the stored func/args/kwds."""
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        """Advance the generator to its first ``yield`` and return that value.

        Raises:
            RuntimeError: if the generator finishes without yielding.
        """
        logger.info("__enter__:you can add you method")

        # Demo hook: decorate and call a throwaway function through
        # ContextDecorator before entering the context.
        @ContextDecorator()
        def testfun(*args, **kwds):
            logger.info("@ContextDecorator():testfun test success")
        testfun("hello")

        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        """Resume the generator and report whether the exception is handled.

        Args:
            type: exception class raised in the ``with`` body, or None.
            value: exception instance (instantiated from ``type`` if None).
            traceback: traceback associated with the exception.

        Returns:
            A true value when the generator swallowed the exception; a
            false value (False or None) when it should keep propagating.

        Raises:
            RuntimeError: if the generator yields a second time.
        """
        logger.info("__exit__")
        logger.info("type:{}".format(type))
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                self.gen.throw(type, value, traceback)
                raise RuntimeError("generator didn't stop after throw()")
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27112)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if exc.__cause__ is value:
                    return False
                raise
            except BaseException as exc:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                #
                # The exception is bound directly instead of being read via
                # sys.exc_info(): this script never imports ``sys``, so the
                # old lookup raised NameError on this path.
                if exc is not value:
                    raise
                # Falling through returns None: the original exception
                # keeps propagating from the ``with`` statement.

def contextmanager(func):
    """Turn a generator function into a context-manager factory.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    With that definition, this:

        with some_generator(<arguments>) as <variable>:
            <body>

    behaves like this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    """
    def helper(*args, **kwds):
        # Every call builds a new one-shot manager around func's generator.
        manager = _GeneratorContextManager(func, args, kwds)
        return manager
    # Carry func's name/docstring over to the factory.
    return wraps(func)(helper)
 
@contextmanager
def file_open(path):
    ''' file open test'''
    # Context manager that opens ``path`` for writing, yields the file
    # object to the ``with`` body and always closes it afterwards.
    # (The docstring text is left untouched: it is logged at runtime.)
    #
    # Open *before* the try/finally that closes the file: if open() fails
    # there is nothing to close, and the OSError must reach the caller
    # instead of being replaced by an UnboundLocalError on ``f_obj``.
    try:
        f_obj = open(path, "w")
    except OSError:
        print("We had an error!")
        raise
    try:
        yield f_obj
    except OSError:
        # An OSError raised inside the ``with`` body is reported and
        # swallowed, as before.
        print("We had an error!")
    finally:
        print("Closing file")
        f_obj.close()

if __name__ == "__main__":
    # Demo run: write one line through the custom context manager and log
    # success while the file is still open.
    with file_open("contextlibtest.txt") as out_file:
        out_file.write("Testing context managers")
        logger.info("write file success")
View Code

关键输出:

2018-03-22 15:36:43,249 - __main__ - INFO - get generator by with:<generator object file_open at 0x01DE4870>
2018-03-22 15:36:43,249 - __main__ - INFO - doc: file open test
2018-03-22 15:36:43,249 - __main__ - INFO - __enter__:you can add you method
2018-03-22 15:36:43,249 - __main__ - INFO - you can do something in decorator
2018-03-22 15:36:43,249 - __main__ - INFO - @ContextDecorator():testfun test success
2018-03-22 15:36:43,249 - __main__ - INFO - write file success
2018-03-22 15:36:43,249 - __main__ - INFO - __exit__
2018-03-22 15:36:43,249 - __main__ - INFO - type:None

 




以上是关于Python之contextlib库及源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Python3之 contextlib

全链路压测之影子库及ShardingSphere实现影子库源码剖析

源码剖析@contextlib.contextmanager

源码剖析@contextlib.contextmanager

Python3标准库:contextlib上下文管理器工具

python的上下文管理(contextlib)