U
    .e                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z* erddl+m,Z, dZ-dZ.dZ/dZ0dZ1dZ2ee3Z4eddZ5eddZ6G d d! d!Z7G d"d# d#e8Z9G d$d% d%e:Z;G d&d' d'Z<G d(d) d)Z=G d*d+ d+ej>Z>G d,d- d-Z?G d.d/ d/e?ej@ZAG d0d1 d1ejBZBG d2d3 d3ejCZCdS )4zPVirtual transport implementation.

Emulates the AMQ API for non-AMQ transports.
    )annotationsN)array)OrderedDictdefaultdict
namedtuple)count)Finalize)Empty)	monotonicsleep)TYPE_CHECKING)queue_declare_ok_t)ChannelErrorResourceError)
get_logger)base)emergency_dump_state)bytes_to_strstr_to_bytes)	FairCycleuuid   )STANDARD_EXCHANGE_TYPES)TracebackTypeHzlMessage could not be delivered: No queues bound to exchange {exchange!r} using binding key {routing_key!r}.
zkCannot redeclare exchange {0!r} in vhost {1!r} with different type, durable, autodelete or arguments value.z;Requeuing undeliverable message for queue %r: No consumers.z)Restoring {0!r} unacknowledged message(s)z#UNABLE TO RESTORE {0} MESSAGES: {1}binding_key_t)queueexchangerouting_keyqueue_binding_t)r   r   	argumentsc                   @  s    e Zd ZdZdd Zdd ZdS )Base64zBase64 codec.c                 C  s   t tt|S N)r   base64	b64encoder   selfs r)   @/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/virtual/base.pyencodeF   s    zBase64.encodec                 C  s   t t|S r#   )r$   	b64decoder   r&   r)   r)   r*   decodeI   s    zBase64.decodeN)__name__
__module____qualname____doc__r+   r-   r)   r)   r)   r*   r"   C   s   r"   c                   @  s   e Zd ZdZdS )NotEquivalentErrorzAEntity declaration is not equivalent to the previous declaration.Nr.   r/   r0   r1   r)   r)   r)   r*   r2   M   s   r2   c                   @  s   e Zd ZdZdS )UndeliverableWarningz.The message could not be delivered to a queue.Nr3   r)   r)   r)   r*   r4   Q   s   r4   c                   @  sV   e Zd ZdZdZdZdZdddZdd Zdd Z	d	d
 Z
dd Zdd Zdd ZdS )BrokerStatez2Broker state holds exchanges, queues and bindings.Nc                 C  s&   |d kri n|| _ i | _tt| _d S r#   )	exchangesbindingsr   setqueue_index)r'   r6   r)   r)   r*   __init__r   s    zBrokerState.__init__c                 C  s"   | j   | j  | j  d S r#   )r6   clearr7   r9   r'   r)   r)   r*   r;   w   s    

zBrokerState.clearc                 C  s   |||f| j kS r#   )r7   )r'   r   r   r   r)   r)   r*   has_binding|   s    zBrokerState.has_bindingc                 C  s.   t |||}| j|| | j| | d S r#   )r   r7   
setdefaultr9   add)r'   r   r   r   r!   keyr)   r)   r*   binding_declare   s    zBrokerState.binding_declarec                 C  sB   t |||}z| j|= W n tk
r,   Y nX | j| | d S r#   )r   r7   KeyErrorr9   remove)r'   r   r   r   r@   r)   r)   r*   binding_delete   s    zBrokerState.binding_deletec                   s<   z j |}W n tk
r$   Y nX  fdd|D  d S )Nc                   s   g | ]} j |d qS r#   )r7   pop).0Zbindingr<   r)   r*   
<listcomp>   s     z5BrokerState.queue_bindings_delete.<locals>.<listcomp>)r9   rE   rB   )r'   r   r7   r)   r<   r*   queue_bindings_delete   s
    z!BrokerState.queue_bindings_deletec                   s    fdd j | D S )Nc                 3  s$   | ]}t |j|j j| V  qd S r#   )r    r   r   r7   )rF   r@   r<   r)   r*   	<genexpr>   s   z-BrokerState.queue_bindings.<locals>.<genexpr>)r9   r'   r   r)   r<   r*   queue_bindings   s    
zBrokerState.queue_bindings)N)r.   r/   r0   r1   r6   r7   r9   r:   r;   r=   rA   rD   rH   rK   r)   r)   r)   r*   r5   U   s   

	r5   c                   @  s~   e Zd ZdZdZdZdZdZdddZdd Z	d	d
 Z
dd Zdd Zdd Zdd ZdddZdd ZdddZdd ZdS )QoSzQuality of Service guarantees.

    Only supports `prefetch_count` at this point.

    Arguments:
    ---------
        channel (ChannelT): Connection channel.
        prefetch_count (int): Initial prefetch count (defaults to 0).
    r   NTc                 C  sR   || _ |pd| _t | _d| j_t | _| jj| _| jj	| _
t| | jdd| _d S )Nr   Fr   )Zexitpriority)channelprefetch_countr   
_deliveredrestoredr8   _dirtyr?   
_quick_ack__setitem___quick_appendr   restore_unacked_once_on_collect)r'   rM   rN   r)   r)   r*   r:      s    


  zQoS.__init__c                 C  s$   | j }| p"t| jt| j |k S )zReturn true if the channel can be consumed from.

        Used to ensure the client adhers to currently active
        prefetch limits.
        )rN   lenrO   rQ   r'   Zpcountr)   r)   r*   can_consume   s    zQoS.can_consumec                 C  s,   | j }|r(t|t| jt| j  dS dS )a  Return the maximum number of messages allowed to be returned.

        Returns an estimated number of messages that a consumer may be allowed
        to consume at once from the broker.  This is used for services where
        bulk 'get message' calls are preferred to many individual 'get message'
        calls - like SQS.

        Returns
        -------
            int: greater than zero.
        r   N)rN   maxrW   rO   rQ   rX   r)   r)   r*   can_consume_max_estimate   s    zQoS.can_consume_max_estimatec                 C  s   | j r|   | || dS )z&Append message to transactional state.N)rQ   _flushrT   )r'   messagedelivery_tagr)   r)   r*   append   s    z
QoS.appendc                 C  s
   | j | S r#   )rO   r'   r^   r)   r)   r*   get   s    zQoS.getc                 C  sD   | j }| j}z| }W n tk
r0   Y q@Y nX ||d qdS )z'Flush dirty (acked/rejected) tags from.N)rQ   rO   rE   rB   )r'   Zdirty	deliveredZ	dirty_tagr)   r)   r*   r\      s    
z
QoS._flushc                 C  s   |  | dS )z8Acknowledge message and remove from transactional state.N)rR   r`   r)   r)   r*   ack   s    zQoS.ackFc                 C  s$   |r| j | j|  | | dS )z4Remove from transactional state and requeue message.N)rM   _restore_at_beginningrO   rR   r'   r^   requeuer)   r)   r*   reject   s    z
QoS.rejectc              
   C  s   |    | j}g }| jj}|j}|rz| \}}W n tk
rJ   Y qY nX z|| W q  tk
r } z|||f W 5 d}~X Y q X q |  |S )z$Restore all unacknowledged messages.N)	r\   rO   rM   _restorepopitemrB   BaseExceptionr_   r;   )r'   rb   errorsrestoreZpop_message_r]   excr)   r)   r*   restore_unacked   s     
"zQoS.restore_unackedc                 C  s   | j   |   |dkr tjn|}| j}| jr8| jjs<dS t	|ddrT|rPt
dS z`|rttt| j|d |  }|rtt| \}}ttt|||d t||d W 5 d|_X dS )zRestore all unacknowledged messages at shutdown/gc collect.

        Note:
        ----
            Can only be called once for each instance, subsequent
            calls will be ignored.
        NrP   T)file)stderr)rV   cancelr\   sysrq   rO   restore_at_shutdownrM   
do_restoregetattrAssertionErrorrP   printRESTORING_FMTformatrW   ro   listzipRESTORE_PANIC_FMTr   )r'   rq   stateZ
unrestoredrk   messagesr)   r)   r*   rU     s,    
zQoS.restore_unacked_oncec                 O  s   dS )a  Restore any pending unackwnowledged messages.

        To be filled in for visibility_timeout style implementations.

        Note:
        ----
            This is implementation optional, and currently only
            used by the Redis transport.
        Nr)   )r'   argskwargsr)   r)   r*   restore_visible2  s    zQoS.restore_visible)r   )F)N)r.   r/   r0   r1   rN   rO   rQ   rt   r:   rY   r[   r_   ra   r\   rc   rg   ro   rU   r   r)   r)   r)   r*   rL      s    
	

 rL   c                      s*   e Zd ZdZd fdd	Zdd Z  ZS )MessagezMessage object.Nc                   st   || _ |d }|d}|r.|||d}t jf |||d |d|d|d||dd	d
	| d S )N
propertiesbodybody_encodingr^   content-typecontent-encodingheadersdelivery_infozutf-8)	r   rM   r^   content_typecontent_encodingr   r   r   Z
postencode)_rawra   decode_bodysuperr:   )r'   payloadrM   r   r   r   	__class__r)   r*   r:   A  s$    

zMessage.__init__c                 C  sJ   | j }| j| j|d\}}t| j}|dd  ||| j| j	|dS )Nr   compression)r   r   r   r   r   )
r   rM   encode_bodyr   ra   dictr   rE   r   r   )r'   propsr   rm   r   r)   r)   r*   serializableS  s    

zMessage.serializable)N)r.   r/   r0   r1   r:   r   __classcell__r)   r)   r   r*   r   >  s   r   c                   @  s\   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dddZdd ZdS )AbstractChannelzAbstract channel interface.

    This is an abstract class defining the channel methods
    you'd usually want to implement in a virtual channel.

    Note:
    ----
        Do not subclass directly, but rather inherit
        from :class:`Channel`.
    Nc                 C  s   t ddS )zGet next message from `queue`.z$Virtual channels must implement _getNNotImplementedError)r'   r   timeoutr)   r)   r*   _geto  s    zAbstractChannel._getc                 C  s   t ddS )zPut `message` onto `queue`.z$Virtual channels must implement _putNr   )r'   r   r]   r)   r)   r*   _puts  s    zAbstractChannel._putc                 C  s   t ddS )z!Remove all messages from `queue`.z&Virtual channels must implement _purgeNr   rJ   r)   r)   r*   _purgew  s    zAbstractChannel._purgec                 C  s   dS )z<Return the number of messages in `queue` as an :class:`int`.r   r)   rJ   r)   r)   r*   _size{  s    zAbstractChannel._sizec                 O  s   |  | dS )zDelete `queue`.

        Note:
        ----
            This just purges the queue, if you need to do more you can
            override this method.
        Nr   )r'   r   r   r   r)   r)   r*   _delete  s    zAbstractChannel._deletec                 K  s   dS )zCreate new queue.

        Note:
        ----
            Your transport can override this method if it needs
            to do something whenever a new queue is declared.
        Nr)   r'   r   r   r)   r)   r*   
_new_queue  s    zAbstractChannel._new_queuec                 K  s   dS )zVerify that queue exists.

        Returns
        -------
            bool: Should return :const:`True` if the queue exists
                or :const:`False` otherwise.
        Tr)   r   r)   r)   r*   
_has_queue  s    zAbstractChannel._has_queuec                 C  s
   | |S )z-Poll a list of queues for available messages.)ra   )r'   cyclecallbackr   r)   r)   r*   _poll  s    zAbstractChannel._pollc                 C  s   |  |}||| d S r#   )r   )r'   r   r   r]   r)   r)   r*   _get_and_deliver  s    
z AbstractChannel._get_and_deliver)N)N)r.   r/   r0   r1   r   r   r   r   r   r   r   r   r   r)   r)   r)   r*   r   c  s   

	

r   c                   @  s  e Zd ZdZeZeZdZeeZ	dZ
de iZdZedZdZdZdZdZd	Zd
d ZdgddZdhddZdiddZdjddZdd ZdkddZdlddZdmddZdnddZd d! Zd"d# Z d$d% Z!d&d' Z"d(d) Z#d*d+ Z$d,d- Z%dod.d/Z&dpd0d1Z'dqd2d3Z(drd4d5Z)dsd6d7Z*d8d9 Z+d:d; Z,dtd<d=Z-dud>d?Z.d@dA Z/dBdC Z0dvdDdEZ1dFdG Z2dwdHdIZ3dxdJdKZ4dLdM Z5dydNdOZ6dzdPdQZ7dRdS Z8dTdU Z9dVdWdXdYdZd[d\Z:e;d]d^ Z<e;d_d` Z=e;dadb Z>d{dcddZ?dedf Z@dS )|ChannelzVirtual channel.

    Arguments:
    ---------
        connection (ConnectionT): The transport instance this
            channel is part of.
    TFr$   r   N)r   deadletter_queuer   	   c              	     s   | _ t  _d  _i  _g  _d  _d _ fdd j	 D  _ 
  _ j jj} jD ].}zt |||  W q` tk
r   Y q`X q`d S )NFc                   s   i | ]\}}|| qS r)   r)   )rF   typclsr<   r)   r*   
<dictcomp>  s     z$Channel.__init__.<locals>.<dictcomp>)
connectionr8   
_consumers_cycle_tag_to_queue_active_queues_qosclosedexchange_typesitems_get_free_channel_id
channel_idclienttransport_optionsfrom_transport_optionssetattrrB   )r'   r   r   Ztoptsopt_namer)   r<   r*   r:     s"    



zChannel.__init__directc           	   	   C  s   |pd}|pd| }|rH|| j jkrDtd|| jjjp8dddddS zD| j j| }| |||||||st	t
|| jjjpdW n0 tk
r   ||||pi g d	| j j|< Y nX dS )
zDeclare exchange.r   zamq.%sz*NOT_FOUND - no exchange {!r} in vhost {!r}/2   
   Channel.exchange_declare404N)typedurableauto_deleter!   table)r~   r6   r   rz   r   r   virtual_hosttypeofZ
equivalentr2   NOT_EQUIVALENT_FMTrB   )	r'   r   r   r   r   r!   nowaitpassiveprevr)   r)   r*   exchange_declare  s@         
r   c                 C  s:   |  |D ]\}}}| j|ddd q
| jj|d dS )z'Delete `exchange` and all its bindings.T)	if_unusedif_emptyN)	get_tablequeue_deleter~   r6   rE   )r'   r   r   r   rkeyrm   r   r)   r)   r*   exchange_delete	  s    zChannel.exchange_deletec                 K  sb   |pdt   }|rB| j|f|sBtd|| jjjp4ddddn| j|f| t|| 	|dS )zDeclare queue.z
amq.gen-%sz'NOT_FOUND - no queue {!r} in vhost {!r}r   r   Channel.queue_declarer   r   )
r   r   r   rz   r   r   r   r   r   r   )r'   r   r   r   r)   r)   r*   queue_declare  s       r   c           	      K  sd   |r|  |rdS | j|D ]4\}}}| |||||}| j||f|| q| j| dS )zDelete queue.N)r   r~   rK   r   prepare_bindr   rH   )	r'   r   r   r   r   r   r   r   metar)   r)   r*   r     s    
   zChannel.queue_deletec                 C  s   |  | d S r#   )r   rJ   r)   r)   r*   after_reply_message_received'  s    z$Channel.after_reply_message_received c                 C  s   t dd S )Nz(transport does not support exchange_bindr   r'   destinationsourcer   r   r!   r)   r)   r*   exchange_bind*  s    zChannel.exchange_bindc                 C  s   t dd S )Nz*transport does not support exchange_unbindr   r   r)   r)   r*   exchange_unbind.  s    zChannel.exchange_unbindc                 K  s|   |pd}| j |||rdS | j |||| | j j| dg }| |||||}|| | jrx| j	|f|  dS )z.Bind `queue` to `exchange` with `routing key`.z
amq.directNr   )
r~   r=   rA   r6   r>   r   r   r_   supports_fanoutZ_queue_bind)r'   r   r   r   r!   r   r   r   r)   r)   r*   
queue_bind2  s    
   
zChannel.queue_bindc                   sj   | j ||| z| |}W n tk
r4   Y d S X | |||||  fdd|D |d d < d S )Nc                   s   g | ]}| kr|qS r)   r)   )rF   r   Zbinding_metar)   r*   rG   P  s      z(Channel.queue_unbind.<locals>.<listcomp>)r~   rD   r   rB   r   r   )r'   r   r   r   r!   r   r   r)   r   r*   queue_unbindC  s    
   zChannel.queue_unbindc                   s    fdd j jD S )Nc                 3  s.   | ]&}  |D ]\}}}|||fV  qqd S r#   )r   )rF   r   r   patternr   r<   r)   r*   rI   S  s    z(Channel.list_bindings.<locals>.<genexpr>r~   r6   r<   r)   r<   r*   list_bindingsR  s    
zChannel.list_bindingsc                 K  s
   |  |S )z%Remove all ready messages from queue.r   r   r)   r)   r*   queue_purgeW  s    zChannel.queue_purgec                 C  s   t  S r#   r   r<   r)   r)   r*   _next_delivery_tag[  s    zChannel._next_delivery_tagc                 K  s:   |  ||| |r*| |j|||f|S | j||f|S )zPublish message.)_inplace_augment_messager   Zdeliverr   )r'   r]   r   r   r   r)   r)   r*   basic_publish^  s    
  zChannel.basic_publishc                 C  sJ   |  |d | j\|d< }|d }|j||  d |d j||d d S )Nr   r   )r   r^   r   r   r   )r   r   updater   )r'   r]   r   r   r   r   r)   r)   r*   r   h  s     z Channel._inplace_augment_messagec                   sJ   |j |< j|  fdd}|jj|< j|   dS )zConsume from `queue`.c                   s*   j | d}s"j||j  |S )NrM   )r   qosr_   r^   )raw_messager]   r   no_ackr'   r)   r*   	_callback{  s    z(Channel.basic_consume.<locals>._callbackN)r   r   r_   r   
_callbacksr   r?   _reset_cycle)r'   r   r   r   consumer_tagr   r   r)   r   r*   basic_consumev  s    
zChannel.basic_consumec                 C  sf   || j krb| j | |   | j|d}z| j| W n tk
rP   Y nX | jj|d dS )z Cancel consumer by consumer tag.N)	r   rC   r   r   rE   r   
ValueErrorr   r   )r'   r   r   r)   r)   r*   basic_cancel  s    
zChannel.basic_cancelc                 K  sH   z.| j | || d}|s*| j||j |W S  tk
rB   Y nX dS )z+Get message by direct access (synchronous).r   N)r   r   r   r_   r^   r	   )r'   r   r   r   r]   r)   r)   r*   	basic_get  s    zChannel.basic_getc                 C  s   | j | dS )zAcknowledge message.N)r   rc   )r'   r^   multipler)   r)   r*   	basic_ack  s    zChannel.basic_ackc                 C  s   |r| j  S tddS )zRecover unacked messages.z'Does not support recover(requeue=False)N)r   ro   r   )r'   rf   r)   r)   r*   basic_recover  s    
zChannel.basic_recoverc                 C  s   | j j||d dS )zReject message.rf   N)r   rg   re   r)   r)   r*   basic_reject  s    zChannel.basic_rejectc                 C  s   || j _dS )zzChange QoS settings for this channel.

        Note:
        ----
            Only `prefetch_count` is supported.
        N)r   rN   )r'   Zprefetch_sizerN   Zapply_globalr)   r)   r*   	basic_qos  s    zChannel.basic_qosc                 C  s   t | jjS r#   )r{   r~   r6   r<   r)   r)   r*   get_exchanges  s    zChannel.get_exchangesc                 C  s   | j j| d S )z%Get table of bindings for `exchange`.r   r   )r'   r   r)   r)   r*   r     s    zChannel.get_tablec                 C  s8   z| j j| d }W n tk
r,   |}Y nX | j| S )z.Get the exchange type instance for `exchange`.r   )r~   r6   rB   r   )r'   r   defaultr   r)   r)   r*   r     s
    
zChannel.typeofc                 C  s   |dkr| j }|s|p|gS z | || ||||}W n tk
rT   g }Y nX |s|dk	rtttj	||d | 
| |g}|S )zFind all queues matching `routing_key` for the given `exchange`.

        Returns
        -------
            list[str]: queue names -- must return `[default]`
                if default is set and no queues matched.
        Nr   )r   r   lookupr   rB   warningswarnr4   UNDELIVERABLE_FMTrz   r   )r'   r   r   r  Rr)   r)   r*   _lookup  s*    

  

 

zChannel._lookupc                 C  s@   |j }| }d|d< | |d |d D ]}| || q*dS )z.Redeliver message to its original destination.TZredeliveredr   r   N)r   r   r
  r   )r'   r]   r   r   r)   r)   r*   rh     s    zChannel._restorec                 C  s
   |  |S r#   )rh   )r'   r]   r)   r)   r*   rd     s    zChannel._restore_at_beginningc                 C  sR   |p
| j j}| jrH| j rHt| dr6| j| j|dS | j| j	||dS t
 d S )N	_get_manyr   )r   _deliverr   r   rY   hasattrr  r   r   r   r	   )r'   r   r   r)   r)   r*   drain_events  s    
zChannel.drain_eventsc                 C  s   t || js| j|| dS |S )z1Convert raw message to :class:`Message` instance.)r   rM   )
isinstancer   )r'   r   r)   r)   r*   message_to_python  s    zChannel.message_to_pythonc                 C  s>   |pi }| di  | d|p"| j ||||p2i |p8i dS )zPrepare message data.r   priority)r   r   r   r   r   )r>   default_priority)r'   r   r  r   r   r   r   r)   r)   r*   prepare_message  s    zChannel.prepare_messagec                 C  s   t ddS )zEnable/disable message flow.

        Raises
        ------
            NotImplementedError: as flow
                is not implemented by the base virtual implementation.
        z%virtual channels do not support flow.Nr   )r'   activer)   r)   r*   flow  s    zChannel.flowc                 C  sp   | j sfd| _ t| jD ]}| | q| jr6| j  | jdk	rP| j  d| _| jdk	rf| j	|  d| _
dS )zTClose channel.

        Cancel all consumers, and requeue unacked messages.
        TN)r   r{   r   r   r   rU   r   closer   close_channelr   )r'   Zconsumerr)   r)   r*   r    s    



zChannel.closec                 C  s"   |r| j |||fS ||fS r#   )codecsra   r+   r'   r   encodingr)   r)   r*   r   $  s    zChannel.encode_bodyc                 C  s   |r| j ||S |S r#   )r  ra   r-   r  r)   r)   r*   r   )  s    zChannel.decode_bodyc                 C  s   t | j| jt| _d S r#   )r   r   r   r	   r   r<   r)   r)   r*   r   .  s
      zChannel._reset_cyclec                 C  s   | S r#   r)   r<   r)   r)   r*   	__enter__2  s    zChannel.__enter__ztype[BaseException] | NonezBaseException | NonezTracebackType | NoneNone)exc_typeexc_valexc_tbreturnc                 C  s   |    d S r#   )r  )r'   r  r  r   r)   r)   r*   __exit__5  s    zChannel.__exit__c                 C  s   | j jS )z/Broker state containing exchanges and bindings.)r   r~   r<   r)   r)   r*   r~   =  s    zChannel.statec                 C  s   | j dkr| | | _ | j S )z&:class:`QoS` manager for this channel.N)r   rL   r<   r)   r)   r*   r   B  s    
zChannel.qosc                 C  s   | j d kr|   | j S r#   )r   r   r<   r)   r)   r*   r   I  s    
zChannel.cyclec              
   C  sX   z$t tt|d d | j| j}W n  tttfk
rD   | j}Y nX |rT| j| S |S )zGet priority from message.

        The value is limited to within a boundary of 0 to 9.

        Note:
        ----
            Higher value has more priority.
        r   r  )	rZ   minintmax_prioritymin_priority	TypeErrorr   rB   r  )r'   r]   reverser  r)   r)   r*   _get_message_priorityO  s    	zChannel._get_message_priorityc                 C  sd   t | jj}td| jjd D ]"}||kr| jj| |  S qtdt| jj	| jjdd S )Nr   z/No free channel ids, current={}, channel_max={})   r   )
r8   r   _used_channel_idsrangechannel_maxr_   r   rz   rW   channels)r'   Zused_channel_idsr   r)   r)   r*   r   c  s    

zChannel._get_free_channel_id)Nr   FFNFF)FF)NF)FF)r   r   FN)r   r   FN)Nr   N)Nr   N)F)F)F)F)r   r   F)r   )N)NN)NNNNN)T)N)N)F)Ar.   r/   r0   r1   r   rL   ru   r   r   r   r   r"   r  r   r   Z_delivery_tagsr   r   r  r&  r%  r:   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r   r   r
  rh   rd   r  r  r  r  r  r   r   r   r  r"  propertyr~   r   r   r)  r   r)   r)   r)   r*   r     s   	
       



    
    
  
  






  





      








r   c                      s0   e Zd ZdZ fddZdd Zdd Z  ZS )
Managementz'Base class for the AMQP management API.c                   s   t  | |j | _d S r#   )r   r:   r   rM   )r'   	transportr   r)   r*   r:   w  s    zManagement.__init__c                 C  s   dd | j  D S )Nc                 S  s   g | ]\}}}|||d qS ))r   r   r   r)   )rF   qerr)   r)   r*   rG   |  s   z+Management.get_bindings.<locals>.<listcomp>)rM   r   r<   r)   r)   r*   get_bindings{  s    zManagement.get_bindingsc                 C  s   | j   d S r#   )rM   r  r<   r)   r)   r*   r    s    zManagement.close)r.   r/   r0   r1   r:   r5  r  r   r)   r)   r   r*   r0  t  s   r0  c                   @  s   e Zd ZdZeZeZeZdZdZ	dZ
dZdZdZejjjdeddgddZd	d
 Zdd Zdd Zdd Zdd ZdddZdd Zdd Zdd Zd ddZedd ZdS )!	Transportz|Virtual transport.

    Arguments:
    ---------
        client (kombu.Connection): The client this is a transport for.
    Ng      ?i  Fr   Ztopic)ZasynchronousZexchange_typeZ
heartbeatsc                 K  s\   || _ t | _g | _g | _i | _| | j| jt| _	|j
d}|d k	rN|| _tt| _d S )Npolling_interval)r   r5   r~   r.  _avail_channelsr   Cycle_drain_channelr	   r   r   ra   r7  r   ARRAY_TYPE_Hr+  )r'   r   r   r7  r)   r)   r*   r:     s    zTransport.__init__c                 C  s@   z| j  W S  tk
r:   | |}| j| | Y S X d S r#   )r8  rE   
IndexErrorr   r.  r_   )r'   r   rM   r)   r)   r*   create_channel  s    
zTransport.create_channelc                 C  s`   zRz| j|j W n tk
r(   Y nX z| j| W n tk
rN   Y nX W 5 d |_ X d S r#   )r   r+  rC   r   r   r.  )r'   rM   r)   r)   r*   r    s    
zTransport.close_channelc                 C  s   | j | |  | S r#   )r8  r_   r=  r<   r)   r)   r*   establish_connection  s    zTransport.establish_connectionc              	   C  sP   | j   | j| jfD ]4}|rz| }W n tk
r>   Y qX |  qqd S r#   )r   r  r8  r.  rE   LookupError)r'   r   Z	chan_listrM   r)   r)   r*   close_connection  s    
zTransport.close_connectionc                 C  s   t  }| jj}| j}|r(|r(||kr(|}z|| j|d W q tk
r|   |d k	rht  | |krht |d k	rxt| Y q(X qq(d S )Nr  )	r
   r   ra   r7  r  r	   socketr   r   )r'   r   r   Z
time_startra   r7  r)   r)   r*   r    s    zTransport.drain_eventsc                 C  sX   |st d|z| j| }W n* t k
rJ   tt| | | Y n
X || d S )Nz.Received message without destination queue: {})rB   rz   r   loggerwarningW_NO_CONSUMERS_reject_inbound_message)r'   r]   r   r   r)   r)   r*   r    s    zTransport._deliverc                 C  sF   | j D ]:}|r|j||d}|j||j |j|jdd  qBqd S )Nr   Tr   )r.  r   r   r_   r^   r  )r'   r   rM   r]   r)   r)   r*   rE    s    
z!Transport._reject_inbound_messagec                 C  s0   |r|| j krtd||| j | | d S )Nz,Message for queue {!r} without consumers: {})r   rB   rz   )r'   rM   r]   r   r)   r)   r*   on_message_ready  s     zTransport.on_message_readyc                 C  s   |j ||dS )N)r   r   )r  )r'   rM   r   r   r)   r)   r*   r:  
  s    zTransport._drain_channelc                 C  s   | j ddS )N	localhost)porthostname)default_portr<   r)   r)   r*   default_connection_params  s    z#Transport.default_connection_params)N)N)r.   r/   r0   r1   r   r   r9  r0  r   rJ  r.  r   r7  r-  r   r6  Z
implementsextend	frozensetr:   r=  r  r>  r@  r  r  rE  rF  r:  r/  rK  r)   r)   r)   r*   r6    s6   


r6  )Dr1   
__future__r   r$   rA  rs   r  r   collectionsr   r   r   	itertoolsr   Zmultiprocessing.utilr   r   r	   timer
   r   typingr   Zamqp.protocolr   Zkombu.exceptionsr   r   Z	kombu.logr   Zkombu.transportr   Zkombu.utils.divr   Zkombu.utils.encodingr   r   Zkombu.utils.schedulingr   Zkombu.utils.uuidr   r   r   typesr   r;  r  r   rD  ry   r}   r.   rB  r   r    r"   	Exceptionr2   UserWarningr4   r5   rL   r   r   Z
StdChannelr   r0  r6  r)   r)   r)   r*   <module>   s\   


G #%B   R