U
    .e                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZmZ dd
lmZ ertddlmZ dZG dd dZG dd deZG dd deZdS )zSimple messaging interface.    )annotationsN)deque)Empty)	monotonic)TYPE_CHECKING   )entity	messaging)maybe_channel)TracebackType)SimpleQueueSimpleBufferc                   @  s   e Zd ZeZdZdd Zdddddd	d
Zd#ddZd$ddZdd Z	d%ddZ
dd Zdd Zdd Zdd Zdd Zdd  Zd!d" ZeZdS )&
SimpleBaseFc                 C  s   | S N selfr   r   0/tmp/pip-unpacked-wheel-48hrr5dg/kombu/simple.py	__enter__   s    zSimpleBase.__enter__ztype[BaseException] | NonezBaseException | NonezTracebackType | NoneNone)exc_typeexc_valexc_tbreturnc                 C  s   |    d S r   )close)r   r   r   r   r   r   r   __exit__   s    zSimpleBase.__exit__c                 C  sD   t || _|| _|| _|| _| jjd | _t | _| j	| j
 d S )Nr   )r
   channelproducerconsumerno_ackZqueuesqueuer   bufferZregister_callback_receive)r   r   r   r   r   r   r   r   __init__#   s    
zSimpleBase.__init__TNc                 C  s   |s|   S |   t }|}| jr.| j S |d k	rF|dkrF|  z| jjjj	|d W n t
jk
rz   |  Y nX |d k	rt | }|| }qd S )Ng        )timeout)
get_nowait_consumer   r!   popleftr   r   
connectionclientZdrain_eventssocketr$   )r   blockr$   Z
time_start	remainingelapsedr   r   r   get,   s     

zSimpleBase.getc                 C  s&   | j j| j| jjd}|s"|  |S )N)r   accept)r    r.   r   r   r/   r   )r   mr   r   r   r%   N   s    zSimpleBase.get_nowaitc                 K  s"   | j j|f||||d| d S )N)
serializerrouting_keyheaderscompression)r   publish)r   messager1   r3   r4   r2   kwargsr   r   r   putT   s    
zSimpleBase.putc                 C  s
   | j  S r   )r   purger   r   r   r   clear]   s    zSimpleBase.clearc                 C  s   | j jdd\}}}|S )NT)Zpassive)r    Zqueue_declare)r   _sizer   r   r   qsize`   s    zSimpleBase.qsizec                 C  s   | j   d S r   )r   cancelr   r   r   r   r   d   s    zSimpleBase.closec                 C  s   | j | d S r   )r!   append)r   Zmessage_datar6   r   r   r   r"   g   s    zSimpleBase._receivec                 C  s    | j s| jj| jd d| _ d S )N)r   T)
_consumingr   consumer   r   r   r   r   r&   j   s    zSimpleBase._consumec                 C  s   |   S )z`len(self) -> self.qsize()`.)r=   r   r   r   r   __len__o   s    zSimpleBase.__len__c                 C  s   dS )NTr   r   r   r   r   __bool__s   s    zSimpleBase.__bool__)F)TN)NNNN)__name__
__module____qualname__r   r@   r   r   r#   r.   r%   r8   r:   r=   r   r"   r&   rB   rC   __nonzero__r   r   r   r   r      s$   
	
"  
	r   c                      s6   e Zd ZdZdZi Zi ZddiZd fdd	Z  Z	S )	r   z!Simple API for persistent queues.FtypedirectNc
                   s   |}
t | jf|pi }t | jf|p$i }t | jf|p6i }|d krH| j}t|
tjstj|f|}tj|||fd|i|}
|}n|
j	}|
j
}tj||
|	d}tj|||||d}t |||| d S )NZqueue_arguments)r/   )r1   r2   r4   )dict
queue_opts
queue_argsexchange_optsr   
isinstancer   QueueZExchangeexchanger2   r	   ZConsumerZProducersuperr#   )r   r   namer   rK   rL   rM   r1   r4   r/   r    rP   r2   r   r   	__class__r   r   r#      s4    
 zSimpleQueue.__init__)NNNNNNN)
rD   rE   rF   __doc__r   rK   rL   rM   r#   __classcell__r   r   rS   r   r   x   s            r   c                   @  s*   e Zd ZdZdZdddZddddZdS )r   z Simple API for ephemeral queues.TF)durableauto_deleteZ	transient)rW   Zdelivery_moderX   N)rD   rE   rF   rU   r   rK   rM   r   r   r   r   r      s   r   )rU   
__future__r   r*   collectionsr   r    r   timer   typingr    r   r	   r(   r
   typesr   __all__r   r   r   r   r   r   r   <module>   s   d#