U
    .e                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ d
ZdZdZdZG dd dejZG dd dejejZG dd dejZG dd dejZdS )zT`librabbitmq`_ transport.

.. _`librabbitmq`: https://pypi.org/project/librabbitmq/
    )annotationsN)ChannelErrorConnectionError)get_manager)version_string_as_tuple   )base)to_rabbitmq_queue_argumentsz
    librabbitmq version too old to detect RabbitMQ version information
    so make sure you are using librabbitmq 1.5 when using rabbitmq > 3.3
i(  i'  zAssl not supported by librabbitmq, please use pyamqp:// or stunnelc                      s    e Zd ZdZ fddZ  ZS )MessagezAMQP Message (librabbitmq).c                   s8   t  j|||||d|d|d|dd d S )Ndelivery_tagcontent_typecontent_encodingheaders)channelbodyZdelivery_info
propertiesr   r   r   r   )super__init__get)selfr   propsinfor   	__class__ ?/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/librabbitmq.pyr   $   s    zMessage.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r
   !   s   r
   c                   @  s&   e Zd ZdZeZdddZdd ZdS )ChannelzAMQP Channel (librabbitmq).Nc                 C  s:   |dk	r|ni }| |||d |dk	r2||d< ||fS )z%Encapsulate data into a AMQP message.N)r   r   r   priority)update)r   r   r"   r   r   r   r   r   r   r   prepare_message5   s    zChannel.prepare_messagec                 K  s   t |f|}dd | D S )Nc                 S  s   i | ]\}}| d |qS )utf8)encode).0kvr   r   r   
<dictcomp>F   s      z3Channel.prepare_queue_arguments.<locals>.<dictcomp>)r	   items)r   	argumentskwargsr   r   r   prepare_queue_argumentsD   s    zChannel.prepare_queue_arguments)NNNNN)r   r   r   r   r
   r$   r.   r   r   r   r   r!   0   s          
r!   c                   @  s   e Zd ZdZeZeZdS )
ConnectionzAMQP Connection (librabbitmq).N)r   r   r   r   r!   r
   r   r   r   r   r/   I   s   r/   c                   @  s   e Zd ZdZeZeZeZe	j
jeejeef Ze	j
jef ZdZdZe	j
jjdddZdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z dd Z!e"dd Z#dS ) 	TransportzAMQP Transport (librabbitmq).amqplibrabbitmqTF)ZasynchronousZ
heartbeatsc                 K  s4   || _ |dp| j| _|dp&| j| _d | _d S )Ndefault_portdefault_ssl_port)clientr   r3   r4   Z_Transport__reader)r   r5   r-   r   r   r   r   g   s    
zTransport.__init__c                 C  s   t jS N)r1   __version__r   r   r   r   driver_versionn   s    zTransport.driver_versionc                 C  s   |  S r6   )r   r   
connectionr   r   r   create_channelq   s    zTransport.create_channelc                 K  s   |j f |S r6   )drain_events)r   r;   r-   r   r   r   r=   t   s    zTransport.drain_eventsc              
   C  s   | j }| j D ] \}}t||dst||| q|jr@ttt|j	|j
|j|j|j|j|j|jdf|jppi }| jf |}| j |_ |j| j _|S )z(Establish connection to the AMQP broker.N)hostuseridpasswordvirtual_hostlogin_methodinsistsslconnect_timeout)r5   default_connection_paramsr+   getattrsetattrrD   NotImplementedErrorNO_SSL_ERRORdictr>   r?   r@   rA   rB   rC   rE   Ztransport_optionsr/   r=   )r   Zconninfonamedefault_valueoptsconnr   r   r   establish_connectionw   s,    	

zTransport.establish_connectionc                 C  s   d| j _|  dS )z!Close the AMQP broker connection.N)r5   r=   closer:   r   r   r   close_connection   s    zTransport.close_connectionc              	   C  sp   |d k	r^|j  D ]
}d |_qzt|  W n ttfk
rH   Y nX |j   |j	  d | j
_d | _
d S r6   )Zchannelsvaluesr;   osrQ   filenoOSError
ValueErrorclear	callbacksr5   r=   )r   r;   r   r   r   r   _collect   s    

zTransport._collectc                 C  s   |j S r6   )	connectedr:   r   r   r   verify_connection   s    zTransport.verify_connectionc                 C  s   | | | j|| d S r6   )Z
add_readerrU   Zon_readable)r   r;   Zloopr   r   r   register_with_event_loop   s       z"Transport.register_with_event_loopc                 O  s   t | jf||S r6   )r   r5   )r   argsr-   r   r   r   r      s    zTransport.get_managerc                 C  sP   z
|j }W n" tk
r,   ttt Y n X |ddkrLt|d dk S dS )NproductZRabbitMQversion)   ra   T)Zserver_propertiesAttributeErrorwarningswarnUserWarning	W_VERSIONr   r   )r   r;   r   r   r   r   qos_semantics_matches_spec   s    
z$Transport.qos_semantics_matches_specc                 C  s    dd| j jr| jn| jdddS )NZguest	localhostZPLAIN)r?   r@   porthostnamerB   )r5   rD   r4   r3   r8   r   r   r   rF      s    z#Transport.default_connection_paramsN)$r   r   r   r   r/   DEFAULT_PORTr3   DEFAULT_SSL_PORTr4   r   r0   Zconnection_errorsr   socketerrorIOErrorrV   Zchannel_errorsr   Zdriver_typeZdriver_nameZ
implementsextendr   r9   r<   r=   rP   rR   rZ   r\   r]   r   rg   propertyrF   r   r   r   r   r0   P   s@      
r0   )r   
__future__r   rT   rm   rc   r2   r1   r   r   Zkombu.utils.amq_managerr   Zkombu.utils.textr    r   r	   rf   rk   rl   rJ   r
   r!   Z
StdChannelr/   r0   r   r   r   r   <module>   s$   