U
    .e7                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlmZ	 ddl
mZ dd	lmZ dd
lmZ ddlmZ er|ddlmZ dd ZG dd de	ZG dd dZdS )z%Generic resource pool implementation.    )annotationsN)deque)Empty)	LifoQueue)TYPE_CHECKING   )
exceptions)register_after_fork)lazy)TracebackTypec                 C  s&   z|    W n tk
r    Y nX d S N)force_close_all	Exception)resource r   2/tmp/pip-unpacked-wheel-48hrr5dg/kombu/resource.py_after_fork_cleanup_resource   s    r   c                   @  s   e Zd ZdZdd ZdS )r   z#Last in first out version of Queue.c                 C  s   t  | _d S r   )r   queue)selfmaxsizer   r   r   _init   s    zLifoQueue._initN)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   c                   @  s   e Zd ZdZejZdZd&ddZdd Zdd	 Z	d'd
dZ
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd(ddZd)ddZedd  Zejd!d  Zejd"re
ZeZd#Zd$d Z
d%d ZdS )*ResourcezPool of resources.FNc                 C  s^   || _ |pd| _d| _|d k	r"|n| j| _t | _t | _| jrRtd k	rRt| t	 | 
  d S )Nr   F)_limitpreload_closedclose_after_forkr   	_resourceset_dirtyr	   r   setup)r   limitr   r   r   r   r   __init__(   s    

zResource.__init__c                 C  s   t dd S )Nzsubclass responsibility)NotImplementedErrorr   r   r   r   r#   7   s    zResource.setupc                 C  s6   | j r"t| j| j kr"| | j | j|   d S r   )r$   lenr"   LimitExceededr    
put_nowaitnewr'   r   r   r   _add_when_empty:   s    zResource._add_when_emptyc                   s   j rtdjrzjj||d W n tk
rD     Y qX z  W n8 tk
r   t	 t
r|j  n
   Y nX j  qqn   fdd}| _ S )a  Acquire resource.

        Arguments:
        ---------
            block (bool): If the limit is exceeded,
                then block until there is an available item.
            timeout (float): Timeout to wait
                if ``block`` is true.  Default is :const:`None` (forever).

        Raises
        ------
            LimitExceeded: if block is false and the limit has been exceeded.
        zAcquire on closed pool)blocktimeoutc                     s      dS )a'  Release resource so it can be used by another thread.

            Warnings:
            --------
                The caller is responsible for discarding the object,
                and to never use the resource again.  A new resource must
                be acquired if so needed.
            N)releaser   Rr   r   r   r/   h   s    	z!Resource.acquire.<locals>.release)r   RuntimeErrorr$   r    getr   r,   prepareBaseException
isinstancer
   r*   r/   r"   addr+   )r   r-   r.   r/   r   r0   r   acquireB   s(    


zResource.acquirec                 C  s   |S r   r   r   r   r   r   r   r4   v   s    zResource.preparec                 C  s   |   d S r   )closer9   r   r   r   close_resourcey   s    zResource.close_resourcec                 C  s   d S r   r   r9   r   r   r   release_resource|   s    zResource.release_resourcec                 C  s    | j r| j| | | dS )zqReplace existing resource with a new instance.

        This can be used in case of defective resources.
        N)r$   r"   discardr;   r9   r   r   r   replace   s    zResource.replacec                 C  s8   | j r*| j| | j| | | n
| | d S r   )r$   r"   r=   r    r*   r<   r;   r9   r   r   r   r/      s
    zResource.releasec                 C  s   d S r   r   r9   r   r   r   collect_resource   s    zResource.collect_resourcec                 C  s   | j r
dS d| _ | j}| j}z| }W n tk
r@   Y qhY nX z| | W q tk
rd   Y qX qz|j }W n tk
r   Y qY nX z| | W qh tk
r   Y qhX qhdS )zClose and remove all resources in the pool (also those in use).

        Used to close resources from parent processes after fork
        (e.g. sockets/connections).
        NT)	r   r"   r    popKeyErrorr?   AttributeErrorr   
IndexError)r   Zdirtyr   Zdresresr   r   r   r      s*    

zResource.force_close_allc                 C  s   | j }| jrDd|  k r"| j k rDn n|sD|s@td| j |d}|| _ |rpz|   W n tk
rn   Y nX |   ||k r| j|dkd d S )Nr   z,Can't shrink pool when in use: was={} now={}T)collect)r   r"   r2   formatr   r   r#   _shrink_down)r   r$   forceignore_errorsresetZ
prev_limitr   r   r   resize   s&    $ zResource.resizeTc              	   C  s\   G dd d}| j }t|d| 0 t|j| jkrN|j }|r$| | q$W 5 Q R X d S )Nc                   @  s(   e Zd Zdd Zddddddd	Zd
S )z#Resource._shrink_down.<locals>.Noopc                 S  s   d S r   r   r'   r   r   r   	__enter__   s    z-Resource._shrink_down.<locals>.Noop.__enter__typer   r   None)exc_typeexc_valexc_tbreturnc                 S  s   d S r   r   )r   rO   rP   rQ   r   r   r   __exit__   s    z,Resource._shrink_down.<locals>.Noop.__exit__N)r   r   r   rL   rS   r   r   r   r   Noop   s   rT   mutex)r    getattrr(   r   r$   popleftr?   )r   rE   rT   r   r1   r   r   r   rG      s    
zResource._shrink_downc                 C  s   | j S r   )r   r'   r   r   r   r$      s    zResource.limitc                 C  s   |  | d S r   )rK   )r   r$   r   r   r   r$      s    ZKOMBU_DEBUG_POOLr   c                 O  s~   dd l }| jd  }| _td| d| jj  | j||}||_td| d| jj  t|dsjg |_|j	|
  |S )Nr   r   +z	 ACQUIRE -acquired_by)	traceback_next_resource_idprint	__class__r   _orig_acquire_resource_idhasattrrZ   appendformat_stack)r   argskwargsr[   idrr   r   r   r8      s    
c                 C  sR   |j }td| d| jj  | |}td| d| jj  |  jd8  _|S )NrX   z	 RELEASE rY   r   )r`   r]   r^   r   _orig_releaser\   )r   r   rf   rg   r   r   r   r/      s    
)NNN)FN)FFF)T)r   r   r   r   r   r)   r   r%   r#   r,   r8   r4   r;   r<   r>   r/   r?   r   rK   rG   propertyr$   setterosenvironr3   r_   rh   r\   r   r   r   r   r   !   s4   

4	!



r   )r   
__future__r   rk   collectionsr   r   r   r   Z
_LifoQueuetypingr    r   Zutils.compatr	   Zutils.functionalr
   typesr   r   r   r   r   r   r   <module>   s   