U
    .e                     @  s   d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ erddlmZ dZdefdefdefdefdefdZdd Zdd Zdd ZG dd dZG dd dZG dd deZede d d!d"d#gdd$Z!G d%d& d&Z"dS )'zBase transport interface.    )annotationsN)TYPE_CHECKING)RecoverableConnectionError)ChannelErrorConnectionError)Message)
dictfilter)cached_property)maybe_s_to_ms)TracebackType)r   
StdChannel
Management	Transportz	x-expireszx-message-ttlzx-max-lengthzx-max-length-byteszx-max-priority)expiresZmessage_ttl
max_lengthZmax_length_bytesZmax_priorityc                 K  s.   t tdd | D }|r*t| f|S | S )a!  Convert queue arguments to RabbitMQ queue arguments.

    This is the implementation for Channel.prepare_queue_arguments
    for AMQP-based transports.  It's used by both the pyamqp and librabbitmq
    transports.

    Arguments:
        arguments (Mapping):
            User-supplied arguments (``Queue.queue_arguments``).

    Keyword Arguments:
        expires (float): Queue expiry time in seconds.
            This will be converted to ``x-expires`` in int milliseconds.
        message_ttl (float): Message TTL in seconds.
            This will be converted to ``x-message-ttl`` in int milliseconds.
        max_length (int): Max queue length (in number of messages).
            This will be converted to ``x-max-length`` int.
        max_length_bytes (int): Max queue size in bytes.
            This will be converted to ``x-max-length-bytes`` int.
        max_priority (int): Max priority steps for queue.
            This will be converted to ``x-max-priority`` int.

    Returns
    -------
        Dict: RabbitMQ compatible queue arguments.
    c                 s  s   | ]\}}t ||V  qd S N)_to_rabbitmq_queue_argument).0keyvalue r   8/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/base.py	<genexpr>=   s   z.to_rabbitmq_queue_arguments.<locals>.<genexpr>)r   dictitems)	argumentsoptionspreparedr   r   r   to_rabbitmq_queue_arguments!   s    

r   c                 C  s$   t |  \}}||d k	r||n|fS r   )RABBITMQ_QUEUE_ARGUMENTS)r   r   opttypr   r   r   r   D   s    r   c                 C  s   t d| j|S )Nz<Transport {0.__module__}.{0.__name__} does not implement {1})NotImplementedErrorformat	__class__)objmethodr   r   r   
_LeftBlankJ   s     r'   c                   @  sX   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dddddddZdS )r   zStandard channel base class.Nc                 O  s   ddl m} || f||S )Nr   )Consumer)kombu.messagingr(   )selfargskwargsr(   r   r   r   r(   U   s    zStdChannel.Consumerc                 O  s   ddl m} || f||S )Nr   )Producer)r)   r-   )r*   r+   r,   r-   r   r   r   r-   Y   s    zStdChannel.Producerc                 C  s   t | dd S Nget_bindingsr'   r*   r   r   r   r/   ]   s    zStdChannel.get_bindingsc                 C  s   dS )zCallback called after RPC reply received.

        Notes
        -----
           Reply queue semantics: can be used to delete the queue
           after transient reply message received.
        Nr   )r*   queuer   r   r   after_reply_message_received`   s    z'StdChannel.after_reply_message_receivedc                 K  s   |S r   r   )r*   r   r,   r   r   r   prepare_queue_argumentsi   s    z"StdChannel.prepare_queue_argumentsc                 C  s   | S r   r   r1   r   r   r   	__enter__l   s    zStdChannel.__enter__ztype[BaseException] | NonezBaseException | NonezTracebackType | NoneNone)exc_typeexc_valexc_tbreturnc                 C  s   |    d S r   )close)r*   r7   r8   r9   r   r   r   __exit__o   s    zStdChannel.__exit__)__name__
__module____qualname____doc__Zno_ack_consumersr(   r-   r/   r3   r4   r5   r<   r   r   r   r   r   P   s   	r   c                   @  s    e Zd ZdZdd Zdd ZdS )r   z!AMQP Management API (incomplete).c                 C  s
   || _ d S r   )	transport)r*   rA   r   r   r   __init__{   s    zManagement.__init__c                 C  s   t | dd S r.   r0   r1   r   r   r   r/   ~   s    zManagement.get_bindingsN)r=   r>   r?   r@   rB   r/   r   r   r   r   r   x   s   r   c                   @  s(   e Zd ZdZdd Zdd Zdd ZdS )	
Implementsz/Helper class used to define transport features.c                 C  s,   z
| | W S  t k
r&   t|Y nX d S r   )KeyErrorAttributeError)r*   r   r   r   r   __getattr__   s    
zImplements.__getattr__c                 C  s   || |< d S r   r   )r*   r   r   r   r   r   __setattr__   s    zImplements.__setattr__c                 K  s   | j | f|S r   )r$   )r*   r,   r   r   r   extend   s    zImplements.extendN)r=   r>   r?   r@   rF   rG   rH   r   r   r   r   rC      s   rC   FdirectZtopicZfanoutheaders)asynchronousZexchange_type
heartbeatsc                   @  s  e Zd ZdZeZdZdZdZefZ	e
fZdZdZdZe Zdd Zdd Zd	d
 Zdd Zdd Zdd Zd3ddZdd Zdd Zdd Zdd Zdd Zejej e!j"e!j#ffddZ$d d! Z%d"d# Z&d4d%d%d&d'd(Z'e(d)d* Z)d+d, Z*e+d-d. Z,e(d/d0 Z-e(d1d2 Z.dS )5r   zBase class for transports.NFN/Ac                 K  s
   || _ d S r   )client)r*   rN   r,   r   r   r   rB      s    zTransport.__init__c                 C  s   t | dd S )Nestablish_connectionr0   r1   r   r   r   rO      s    zTransport.establish_connectionc                 C  s   t | dd S )Nclose_connectionr0   r*   
connectionr   r   r   rP      s    zTransport.close_connectionc                 C  s   t | dd S )Ncreate_channelr0   rQ   r   r   r   rS      s    zTransport.create_channelc                 C  s   t | dd S )Nclose_channelr0   rQ   r   r   r   rT      s    zTransport.close_channelc                 K  s   t | dd S )Ndrain_eventsr0   )r*   rR   r,   r   r   r   rU      s    zTransport.drain_events   c                 C  s   d S r   r   )r*   rR   Zrater   r   r   heartbeat_check   s    zTransport.heartbeat_checkc                 C  s   dS )NrM   r   r1   r   r   r   driver_version   s    zTransport.driver_versionc                 C  s   dS )Nr   r   rQ   r   r   r   get_heartbeat_interval   s    z Transport.get_heartbeat_intervalc                 C  s   d S r   r   r*   rR   loopr   r   r   register_with_event_loop   s    z"Transport.register_with_event_loopc                 C  s   d S r   r   rZ   r   r   r   unregister_from_event_loop   s    z$Transport.unregister_from_event_loopc                 C  s   dS NTr   rQ   r   r   r   verify_connection   s    zTransport.verify_connectionc                   s    j  fdd  S )Nc              
     sz   j stdzdd W nL k
r2   Y d S  k
rh } z|jkrVW Y 
d S  W 5 d }~X Y nX |  |  d S )NzSocket was disconnectedr   )timeout)	connectedr   errnoZ	call_soon)r[   exc_read_unavailrR   rU   errorr`   r   r   re      s    

z%Transport._make_reader.<locals>._read)rU   )r*   rR   r`   rg   rf   r   rd   r   _make_reader   s    zTransport._make_readerc                 C  s   dS r^   r   rQ   r   r   r   qos_semantics_matches_spec   s    z$Transport.qos_semantics_matches_specc                 C  s*   | j }|d kr| | }| _ || d S r   )_Transport__readerrh   )r*   rR   r[   readerr   r   r   on_readable   s    zTransport.on_readable**str)urir:   c                 C  s
   t  dS )z(Customise the display format of the URI.N)r"   )r*   ro   Zinclude_passwordmaskr   r   r   as_uri   s    zTransport.as_uric                 C  s   i S r   r   r1   r   r   r   default_connection_params   s    z#Transport.default_connection_paramsc                 O  s
   |  | S r   )r   )r*   r+   r,   r   r   r   get_manager  s    zTransport.get_managerc                 C  s   |   S r   )rs   r1   r   r   r   manager  s    zTransport.managerc                 C  s   | j jS r   )
implementsrL   r1   r   r   r   supports_heartbeats	  s    zTransport.supports_heartbeatsc                 C  s   | j jS r   )ru   rK   r1   r   r   r   supports_ev  s    zTransport.supports_ev)rV   )Frm   )/r=   r>   r?   r@   r   rN   Zcan_parse_urldefault_portr   Zconnection_errorsr   Zchannel_errorsZdriver_typeZdriver_namerj   default_transport_capabilitiesrH   ru   rB   rO   rP   rS   rT   rU   rW   rX   rY   r\   r]   r_   socketr`   rg   rb   EAGAINZEINTRrh   ri   rl   rq   propertyrr   rs   r	   rt   rv   rw   r   r   r   r   r      sN   
 




r   )#r@   
__future__r   rb   rz   typingr   Zamqp.exceptionsr   Zkombu.exceptionsr   r   Zkombu.messager   Zkombu.utils.functionalr   Zkombu.utils.objectsr	   Zkombu.utils.timer
   typesr   __all__intr   r   r   r'   r   r   r   rC   	frozensetry   r   r   r   r   r   <module>   s>   	#(
