U
    .e                  
   @  s  d Z ddlmZ ddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZ z$ddlZdd
lmZ ddlmZ W n  ek
r   d Z ZZY nX dZdZdZe
eZG dd dejZG dd dejZedk	reddd  ejejddG dd dZ edkre!d e" bZ#e!d$ej%j&ej%j' e( .Z)e!d$ej%j& e#*e Z+e)*de+ W 5 Q R X e#,  W 5 Q R X dS )a  Pyro transport module for kombu.

Pyro transport, and Kombu Broker daemon.

Requires the :mod:`Pyro4` library to be installed.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================

To use the Pyro transport with Kombu, use an url of the form:

.. code-block::

    pyro://localhost/kombu.broker

The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``

Transport Options
=================
    )annotationsN)EmptyQueue)reraise)
get_logger)cached_property   )virtual)NamingError)SerializerBasei#  z5Unable to locate pyro nameserver on host {0.hostname}zKUnable to lookup '{0.virtual_host}' in pyro nameserver on host {0.hostname}c                      s~   e Zd ZdZ fddZdd Zdd Zdd	 ZdddZdd Z	dd Z
dd Zdd Zdd Zdd Zedd Z  ZS )ChannelzPyro Channel.c                   s   t    | jr| j  d S N)supercloseshared_queuesZ_pyroReleaseself	__class__ 8/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/pyro.pyr   C   s    
zChannel.closec                 C  s
   | j  S r   )r   get_queue_namesr   r   r   r   queuesH   s    zChannel.queuesc                 K  s   ||   kr| j| d S r   r   r   	new_queuer   queuekwargsr   r   r   
_new_queueK   s    zChannel._new_queuec                 K  s   | j |S r   )r   	has_queuer   r   r   r   
_has_queueO   s    zChannel._has_queueNc                 C  s   |  |}| j|S r   )
_queue_forr   get)r   r   timeoutr   r   r   _getR   s    
zChannel._getc                 C  s   ||   kr| j| |S r   r   r   r   r   r   r   r!   V   s    zChannel._queue_forc                 K  s   |  |}| j|| d S r   )r!   r   put)r   r   messager   r   r   r   _put[   s    
zChannel._putc                 C  s   | j |S r   )r   sizer%   r   r   r   _size_   s    zChannel._sizec                 O  s   | j | d S r   )r   delete)r   r   argsr   r   r   r   _deleteb   s    zChannel._deletec                 C  s   | j |S r   )r   purger%   r   r   r   _purgee   s    zChannel._purgec                 C  s   d S r   r   r%   r   r   r   after_reply_message_receivedh   s    z$Channel.after_reply_message_receivedc                 C  s   | j jS r   )
connectionr   r   r   r   r   r   k   s    zChannel.shared_queues)N)__name__
__module____qualname____doc__r   r   r   r    r$   r!   r(   r*   r-   r/   r0   r   r   __classcell__r   r   r   r   r   @   s   
r   c                      sT   e Zd ZdZeZe ZeZ	d Z
Z fddZdd Zdd Zed	d
 Z  ZS )	TransportzPyro Transport.pyroc                   s   t  j|f| | j| _d S r   )r   __init__global_statestate)r   clientr   r   r   r   r9   }   s    zTransport.__init__c              	   C  s   t d | j}ztj|j| jd}W n2 tk
rX   tttt	
|t d  Y nX z||j}t|W S  tk
r   tttt
|t d  Y nX d S )Nz0trying Pyro nameserver to find the broker daemon)hostport   )loggerdebugr<   r8   locateNShostnamedefault_portr
   r   E_NAMESERVERformatsysexc_infolookupZvirtual_hostZProxyE_LOOKUP)r   ZconninfoZ
nameserverurir   r   r   _open   s"    




zTransport._openc                 C  s   t jS r   )r8   __version__r   r   r   r   driver_version   s    zTransport.driver_versionc                 C  s   |   S r   )rL   r   r   r   r   r      s    zTransport.shared_queues)r2   r3   r4   r5   r   r	   ZBrokerStater:   DEFAULT_PORTrD   Zdriver_typeZdriver_namer9   rL   rN   r   r   r6   r   r   r   r   r7   p   s   r7   zqueue.Emptyc                 C  s   t  S r   )r   )clsdatar   r   r   <lambda>       rR   Zsingle)Zinstance_modec                   @  sX   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdS )KombuBrokerzmKombu Broker used by the Pyro transport.

        You have to run this as a separate (Pyro) service.
        c                 C  s
   i | _ d S r   r   r   r   r   r   r9      s    zKombuBroker.__init__c                 C  s
   t | jS r   )listr   r   r   r   r   r      s    zKombuBroker.get_queue_namesc                 C  s   || j krd S t | j |< d S r   )r   r   r%   r   r   r   r      s    
zKombuBroker.new_queuec                 C  s
   || j kS r   rU   r%   r   r   r   r      s    zKombuBroker.has_queuec                 C  s   | j | jddS )NF)block)r   r"   r%   r   r   r   r"      s    zKombuBroker.getc                 C  s   | j | | d S r   )r   r&   )r   r   r'   r   r   r   r&      s    zKombuBroker.putc                 C  s   | j |  S r   )r   qsizer%   r   r   r   r)      s    zKombuBroker.sizec                 C  s   | j |= d S r   rU   r%   r   r   r   r+      s    zKombuBroker.deletec                 C  s6   z| j | jdd W q  tk
r.   Y q2Y q X q d S )NF)blocking)r   r"   r   r%   r   r   r   r.      s    zKombuBroker.purgeN)r2   r3   r4   r5   r9   r   r   r   r"   r&   r)   r+   r.   r   r   r   r   rT      s   rT   __main__z,Launching Broker for Kombu's Pyro transport.z'(Expecting a Pyro name server at {}:{})zAYou can connect with Kombu using the url 'pyro://{}/kombu.broker'zkombu.broker)-r5   
__future__r   rG   r   r   r   Zkombu.exceptionsr   Z	kombu.logr   Zkombu.utils.objectsr    r	   ZPyro4r8   ZPyro4.errorsr
   Z
Pyro4.utilr   ImportErrorrO   rE   rJ   r2   r@   r   r7   Zregister_dict_to_classZexposeZbehaviorrT   printZDaemondaemonrF   configZNS_HOSTZNS_PORTrB   nsregisterrK   ZrequestLoopr   r   r   r   <module>   sP   "0*

*

 

