U
    .e                     @  s  d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ dd
lmZ dZdgZg Ze ZejdZdd ZG dd deZG dd deZdd ZG dd deZeeedZG dd deZeeedZdd Z dd Z!d$d d!Z"d"d# Z#dS )%zPublic resource pools.    )annotationsN)chain   )Resource)Producer)EqualityDict)register_after_fork)lazy)ProducerPool	PoolGroupregister_groupconnections	producers	get_limit	set_limitreset
   ZKOMBU_DISABLE_LIMIT_PROTECTIONc                 C  s   |    d S N)cleargroup r   //tmp/pip-unpacked-wheel-48hrr5dg/kombu/pools.py_after_fork_cleanup_group   s    r   c                      sd   e Zd ZdZeZdZ fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Z fddZ  ZS )r
   z*Pool of :class:`kombu.Producer` instances.Tc                   s,   || _ |dd p| j| _t j|| d S )Nr   )r   popr   super__init__)selfr   argskwargs	__class__r   r   r       s    zProducerPool.__init__c                 C  s   | j jddS )NT)block)r   acquirer   r   r   r   _acquire_connection%   s    z ProducerPool._acquire_connectionc                 C  s8   |   }z| |W S  tk
r2   |   Y nX d S r   )r%   r   BaseExceptionrelease)r   connr   r   r   create_producer(   s    zProducerPool.create_producerc                 C  s
   t | jS r   )r	   r)   r$   r   r   r   new0   s    zProducerPool.newc                 C  s*   | j r&t| j D ]}| j|   qd S r   )limitrangeZ	_resource
put_nowaitr*   )r   _r   r   r   setup3   s    zProducerPool.setupc                 C  s   d S r   r   r   resourcer   r   r   close_resource8   s    zProducerPool.close_resourcec                 C  sR   t |r| }|jd krN|  }z|| W n tk
rL   |   Y nX |S r   )callableZ_channelr%   Zreviver&   r'   )r   pr(   r   r   r   prepare;   s    
zProducerPool.preparec                   s&   |j r|j   d |_t | d S r   )Z__connection__r'   Zchannelr   r0   r    r   r   r'   G   s    
zProducerPool.release)__name__
__module____qualname____doc__r   close_after_forkr   r%   r)   r*   r/   r2   r5   r'   __classcell__r   r   r    r   r
      s   r
   c                   @  s*   e Zd ZdZd
ddZdd Zdd	 ZdS )r   zCollection of resource pools.NTc                 C  s(   || _ || _| jr$td k	r$t| t d S r   )r+   r:   r   r   )r   r+   r:   r   r   r   r   Q   s    zPoolGroup.__init__c                 C  s   t dd S )Nz!PoolGroups must define ``create``)NotImplementedError)r   r1   r+   r   r   r   createW   s    zPoolGroup.createc                 C  s,   | j }|tkrt }| || }| |< |S r   )r+   use_global_limitr   r=   )r   r1   r+   kr   r   r   __missing__Z   s
    zPoolGroup.__missing__)NT)r6   r7   r8   r9   r   r=   r@   r   r   r   r   r   N   s   
r   c                 C  s   t |  | S )z*Register group (can be used as decorator).)_groupsappendr   r   r   r   r   b   s    
r   c                   @  s   e Zd ZdZdd ZdS )ConnectionszCollection of connection pools.c                 C  s   |j |dS Nr+   )ZPoolr   
connectionr+   r   r   r   r=   k   s    zConnections.createNr6   r7   r8   r9   r=   r   r   r   r   rC   h   s   rC   rE   c                   @  s   e Zd ZdZdd ZdS )	ProducerszCollection of producer pools.c                 C  s   t t| |dS rD   )r
   r   rF   r   r   r   r=   u   s    zProducers.createNrH   r   r   r   r   rI   r   s   rI   c                   C  s   t dd tD  S )Nc                 s  s"   | ]}|r|  ntg V  qd S r   )valuesiter).0gr   r   r   	<genexpr>}   s     z_all_pools.<locals>.<genexpr>)r   rA   r   r   r   r   
_all_pools|   s    rO   c                   C  s   t d S )z"Get current connection pool limit.r   )_limitr   r   r   r   r      s    r   Fc                 C  s>   | pd} t d pd}| |kr:| t d< t D ]}||  q*| S )zSet new connection pool limit.r   )rP   rO   resize)r+   forceZreset_afterignore_errorsZglimitpoolr   r   r   r      s    
r   c               	   O  sD   t  D ]&}z|  W q tk
r*   Y qX qtD ]}|  q2dS )z*Reset all pools by closing open resources.N)rO   Zforce_close_all	ExceptionrA   r   )r   r   rT   r   r   r   r   r      s    
r   )FFF)$r9   
__future__r   os	itertoolsr   rG   r   Z	messagingr   Zutils.collectionsr   Zutils.compatr   Zutils.functionalr	   __all__rP   rA   objectr>   environgetZdisable_limit_protectionr   r
   r   r   rC   r   rI   r   rO   r   r   r   r   r   r   r   <module>   s2   4
