U
    .eH                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl	m
Z
 d	Zd
ZG dd de	jZG dd deje	jZG dd dejZG dd de	jZG dd deZdS )a  pyamqp transport module for Kombu.

Pure-Python amqp transport using py-amqp library.

Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
Connection string can have the following formats:

.. code-block::

    amqp://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    [USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    amqp://

For TLS encryption use:

.. code-block::

    amqps://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]

Transport Options
=================
Transport Options are passed to constructor of underlying py-amqp
:class:`~kombu.connection.Connection` class.

Using TLS
=========
Transport over TLS can be enabled by ``ssl`` parameter of
:class:`~kombu.Connection` class. By setting ``ssl=True``, TLS transport is
used::

    conn = Connect('amqp://', ssl=True)

This is equivalent to ``amqps://`` transport URI::

    conn = Connect('amqps://')

For adding additional parameters to underlying TLS, ``ssl`` parameter should
be set with dict instead of True::

    conn = Connect('amqp://broker.example.com', ssl={
            'keyfile': '/path/to/keyfile'
            'certfile': '/path/to/certfile',
            'ca_certs': '/path/to/ca_certfile'
        }
    )

All parameters are passed to ``ssl`` parameter of
:class:`amqp.connection.Connection` class.

SSL option ``server_hostname`` can be set to ``None`` which is causing using
hostname from broker URL. This is usefull when failover is used to fill
``server_hostname`` with currently used broker::

    conn = Connect('amqp://broker1.example.com;broker2.example.com', ssl={
            'server_hostname': None
        }
    )
    )annotationsN)get_manager)version_string_as_tuple   )baseto_rabbitmq_queue_argumentsi(  i'  c                      s"   e Zd ZdZd fdd	Z  ZS )MessagezAMQP Message.Nc                   sL   |j }t jf |j||j|d|d|j|j |dp<i d| d S )Ncontent_typecontent_encodingapplication_headers)bodychanneldelivery_tagr
   r   delivery_info
propertiesheaders)r   super__init__r   r   getr   )selfmsgr   kwargsprops	__class__ :/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/pyamqp.pyr   X   s    	zMessage.__init__)N__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r	   U   s   r	   c                   @  s<   e Zd ZdZeZdddddejfddZdd Zdd ZdS )	ChannelzAMQP Channel.Nc                 C  s   ||f||||d|pi S )z<Prepare message so that it can be sent using this transport.)priorityr
   r   r   r   )r   r   r%   r
   r   r   r   Z_Messager   r   r   prepare_messagek   s    zChannel.prepare_messagec                 K  s   t |f|S Nr   )r   	argumentsr   r   r   r   prepare_queue_argumentsx   s    zChannel.prepare_queue_argumentsc                 C  s   | j || dS )z4Convert encoded message body back to a Python value.r   )r	   )r   Zraw_messager   r   r   message_to_python{   s    zChannel.message_to_python)	r   r    r!   r"   r	   amqpr&   r)   r+   r   r   r   r   r$   f   s      
r$   c                   @  s   e Zd ZdZeZdS )
ConnectionzAMQP Connection.N)r   r    r!   r"   r$   r   r   r   r   r-      s   r-   c                   @  s   e Zd ZdZeZeZeZe	jj
Z
e	jjZe	jjZe	jjZdZdZejjjdddZd$ddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd%ddZdd Ze d d! Z!d"d# Z"dS )&	TransportzAMQP Transport.zpy-amqpr,   T)ZasynchronousZ
heartbeatsNc                 K  s"   || _ |p| j| _|p| j| _d S r'   )clientdefault_portdefault_ssl_port)r   r/   r0   r1   r   r   r   r   r      s    zTransport.__init__c                 C  s   t jS r'   )r,   __version__r   r   r   r   driver_version   s    zTransport.driver_versionc                 C  s   |  S r'   r*   r   
connectionr   r   r   create_channel   s    zTransport.create_channelc                 K  s   |j f |S r'   )drain_events)r   r6   r   r   r   r   r8      s    zTransport.drain_eventsc                 C  s   |d k	r|   d S r'   )Zcollectr5   r   r   r   _collect   s    zTransport._collectc                 C  s   | j }| j D ] \}}t||dst||| q|jdkrBd|_t|jtrrd|jkrr|jd dkrr|j|jd< t|j	|j
|j|j|j|j|j|j|jd	f|jpi }| jf |}| j |_ |  |S )z(Establish connection to the AMQP broker.N	localhostz	127.0.0.1server_hostname)	hostuseridpasswordlogin_methodvirtual_hostinsistsslconnect_timeout	heartbeat)r/   default_connection_paramsitemsgetattrsetattrhostname
isinstancerB   dictr<   r=   r>   r?   r@   rA   rC   rD   Ztransport_optionsr-   connect)r   Zconninfonamedefault_valueoptsconnr   r   r   establish_connection   s:    

zTransport.establish_connectionc                 C  s   |j S r'   )	connectedr5   r   r   r   verify_connection   s    zTransport.verify_connectionc                 C  s   d|_ |  dS )z!Close the AMQP broker connection.N)r/   closer5   r   r   r   close_connection   s    zTransport.close_connectionc                 C  s   |j S r'   )rD   r5   r   r   r   get_heartbeat_interval   s    z Transport.get_heartbeat_intervalc                 C  s    d|j _||j| j|| d S NT)	transportZraise_on_initial_eintrZ
add_readersockZon_readable)r   r6   Zloopr   r   r   register_with_event_loop   s    z"Transport.register_with_event_loop   c                 C  s   |j |dS )N)rate)Zheartbeat_tick)r   r6   r\   r   r   r   heartbeat_check   s    zTransport.heartbeat_checkc                 C  s(   |j }|ddkr$t|d dk S dS )NproductZRabbitMQversion)   r`   T)Zserver_propertiesr   r   )r   r6   r   r   r   r   qos_semantics_matches_spec   s    z$Transport.qos_semantics_matches_specc                 C  s    dd| j jr| jn| jdddS )NZguestr:   ZPLAIN)r=   r>   portrI   r?   )r/   rB   r1   r0   r3   r   r   r   rE      s    z#Transport.default_connection_paramsc                 O  s   t | jf||S r'   )r   r/   r   argsr   r   r   r   r      s    zTransport.get_manager)NN)r[   )#r   r    r!   r"   r-   DEFAULT_PORTr0   DEFAULT_SSL_PORTr1   r,   Zconnection_errorsZchannel_errorsZrecoverable_connection_errorsZrecoverable_channel_errorsZdriver_nameZdriver_typer   r.   Z
implementsextendr   r4   r7   r8   r9   rQ   rS   rU   rV   rZ   r]   ra   propertyrE   r   r   r   r   r   r.      s@      



r.   c                      s    e Zd ZdZ fddZ  ZS )SSLTransportzAMQP SSL Transport.c                   s"   t  j|| | jjsd| j_d S rW   )r   r   r/   rB   rc   r   r   r   r      s    zSSLTransport.__init__r   r   r   r   r   ri      s   ri   )r"   
__future__r   r,   Zkombu.utils.amq_managerr   Zkombu.utils.textr    r   r   re   rf   r	   r$   Z
StdChannelr-   r.   ri   r   r   r   r   <module>   s   Fo