U
    .e                     @  sf  d Z ddlmZ ddlZddlZddlmZ ddlmZm	Z	 ddl
mZmZ ddlmZ zddlZdd	lmZ dd
lmZ ejjejjejjejjejjejjejjejjejjf	Zejjejj ejj!ejjejjejjejj"ejj#ejjejj$ejj%ejj&ejjejj'ej(fZ)W n" e*k
r4   dZd ZZ)Y nX dZ+dZ,G dd dej-Z-G dd dej.Z.dS )a  Zookeeper transport module for kombu.

Zookeeper based transport. This transport uses the built-in kazoo Zookeeper
based queue implementation.

**References**

- https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connects to a zookeeper node as:

.. code-block::

    zookeeper://SERVER:PORT/VHOST

The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.


Transport Options
=================

    )annotationsN)Empty)bytes_to_strensure_bytes)dumpsloads   )virtual)KazooClient)Queue i  z!Mahendra M <mahendra.m@gmail.com>c                      s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zedd Z  ZS )ChannelzZookeeper Channel.Nc                   s0   t  j|f| | jjj}d|d| _d S )Nz/{}/)super__init__
connectionclientZvirtual_hostformatstrip_vhost)selfr   kwargsZvhost	__class__r   =/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/zookeeper.pyr   i   s    
zChannel.__init__c                 C  s   t j| j|S N)ospathjoinr   )r   
queue_namer   r   r   	_get_pathn   s    zChannel._get_pathc                 C  s>   | j |d }|d kr:t| j| |}|| j |< t| |S r   )_queuesgetr   r   r    len)r   r   queuer   r   r   
_get_queueq   s    
zChannel._get_queuec                 K  s&   |  |jtt|| j|dddS )NT)reverse)priority)r%   putr   r   Z_get_message_priority)r   r$   messager   r   r   r   _put}   s    

zChannel._putc                 C  s,   |  |}| }|d kr t tt|S r   )r%   r"   r   r   r   )r   r$   msgr   r   r   _get   s
    
zChannel._getc                 C  s.   d}|  |}| }|d kr q*|d7 }q|S )Nr   r   )r%   r"   )r   r$   countr+   r   r   r   _purge   s    

zChannel._purgec                 O  s*   |  |r&| | | j| | d S r   )
_has_queuer.   r   deleter    )r   r$   argsr   r   r   r   _delete   s    

zChannel._deletec                 C  s   |  |}t|S r   )r%   r#   r   r$   r   r   r   _size   s    
zChannel._sizec                 K  s   |  |s| |}d S r   )r/   r%   )r   r$   r   r   r   r   
_new_queue   s    
zChannel._new_queuec                 C  s   | j | |d k	S r   )r   existsr    r3   r   r   r   r/      s    zChannel._has_queuec              	   C  s   | j j}g }|jr|jD ]}|dr6|tdd  }|s<qz |dd\}}|t|f}W n6 tk
r   ||jkr||j	pt
f}n|t
f}Y nX || q|j|j	pt
f}||kr|d| ddd |D }t|}|  |S )Nzzookeeper://:r   r   ,c                 S  s   g | ]\}}| d | qS )r7   r   ).0hpr   r   r   
<listcomp>   s     z!Channel._open.<locals>.<listcomp>)r   r   Zalt
startswithr#   splitint
ValueErrorhostnameportDEFAULT_PORTappendinsertr   r
   start)r   Zconninfohosts	host_porthostrB   Zconn_strconnr   r   r   _open   s.    


zChannel._openc                 C  s   | j d kr|  | _ | j S r   )_clientrK   r   r   r   r   r      s    

zChannel.client)__name__
__module____qualname____doc__rL   r!   r   r    r%   r*   r,   r.   r2   r4   r5   r/   rK   propertyr   __classcell__r   r   r   r   r   c   s    	r   c                      sT   e Zd ZdZeZdZeZej	j
e Z
ej	je ZdZdZ fddZdd Z  ZS )		TransportzZookeeper Transport.r   Z	zookeeperkazooc                   s"   t d krtdt j|| d S )Nz"The kazoo library is not installed)rU   ImportErrorr   r   )r   r1   r   r   r   r   r      s    zTransport.__init__c                 C  s   t jS r   )rU   __version__rM   r   r   r   driver_version   s    zTransport.driver_version)rN   rO   rP   rQ   r   Zpolling_intervalrC   default_portr	   rT   Zconnection_errorsKZ_CONNECTION_ERRORSZchannel_errorsKZ_CHANNEL_ERRORSZdriver_typeZdriver_namer   rX   rS   r   r   r   r   rT      s   

rT   )/rQ   
__future__r   r   socketr$   r   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr   r    r	   rU   Zkazoo.clientr
   Zkazoo.recipe.queuer   
exceptionsZSystemErrorExceptionZConnectionLossExceptionZMarshallingErrorExceptionZUnimplementedExceptionZOperationTimeoutExceptionZNoAuthExceptionZInvalidACLExceptionZAuthFailedExceptionZSessionExpiredExceptionrZ   ZRuntimeInconsistencyExceptionZDataInconsistencyExceptionZBadArgumentsExceptionZApiErrorExceptionZNoNodeExceptionZNodeExistsExceptionZ NoChildrenForEphemeralsExceptionZNotEmptyExceptionZInvalidCallbackExceptionerrorr[   rV   rC   
__author__r   rT   r   r   r   r   <module>   sX   )f