U
    .e޽                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ ddlm Z m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z) zddl*Z*W n e+k
r*   dZ*Y nX zddl*m,Z, W n e+k
rV   dZ,Y nX edZ-e-j.e-j/ Z0Z/dZ1dZ2dZ3ddddgZ4eddZ5dd  Z6d!d" Z7G d#d$ d$e8Z9e
d%d& Z:d'd( Z;G d)d* d*Z<G d+d, d,e<e*j=Z>G d-d. d.e<e*j?j@ZAG d/d0 d0e*j?jBZCG d1d2 d2e)jDZDG d3d4 d4ZEG d5d6 d6e)jFZFG d7d8 d8e)jGZGe,rpG d9d: d:e,jHe*jIZJG d;d< d<eFZKG d=d> d>eGZLdS )?a  Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery,
the connection string has following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
  simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
    )annotationsN)bisect)
namedtuple)contextmanager)Empty)time)promise)InconsistencyErrorVersionMismatch)
get_logger)register_after_fork)bytes_to_str)ERRREADpoll)accepts_argument)dumpsloads)cached_property)cycle_by_name)
_parse_url   )virtual)sentinelzkombu.transport.redisi           	   error_classes_t)connection_errorschannel_errorsc               	   C  s^   ddl m}  t| dr| j}n| j}ttjjt	t
jtt| j| j| jf tjj|| j| jf S )z$Return tuple of redis error classes.r   
exceptionsInvalidData)redisr"   hasattrr#   	DataErrorr   r   	Transportr   r	   socketerrorIOErrorOSErrorConnectionErrorAuthenticationErrorTimeoutErrorr    ZInvalidResponseResponseError)r"   r&    r0   9/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/redis.pyget_redis_error_classesy   s(    
r2   c                  C  s   ddl m}  | jS )z1Return the redis ConnectionError exception class.r   r!   )r$   r"   r,   r!   r0   r0   r1   get_redis_ConnectionError   s    r3   c                   @  s   e Zd ZdZdS )	MutexHeldz)Raised when another party holds the lock.N__name__
__module____qualname____doc__r0   r0   r0   r1   r4      s   r4   c                 c  sf   | j ||d}d}z"|jdd}|r,dV  nt W 5 |r`z|  W n tjjk
r^   Y nX X dS )zTAcquire redis lock in non blocking way.

    Raise MutexHeld if not successful.
    timeoutF)blockingN)lockreleaser$   r"   ZLockNotOwnedErroracquirer4   )clientnameZexpirer=   Zlock_acquiredr0   r0   r1   Mutex   s    
rB   c                 C  s   |    d S N)_after_forkchannelr0   r0   r1   _after_fork_cleanup_channel   s    rG   c                      s   e Zd ZdZdddddddd	d
dddddddgZdddddddddddddZdd Z fddZ fddZd"d d!Z	  Z
S )#GlobalKeyPrefixMixina  Mixin to provide common logic for global key prefixing.

    Overriding all the methods used by Kombu with the same key prefixing logic
    would be cumbersome and inefficient. Hence, we override the command
    execution logic that is called by all commands.
    ZHDELZHGETZHLENZHSETZLLENZLPUSHZPUBLISHZRPUSHZRPOPZSADDZSREMZSETZSMEMBERSZZADDZZREMZZREVRANGEBYSCOREr   N)
args_startargs_end   r   )ZDELBRPOPZEVALSHAZWATCHc                   s   t |}|d}| jkr4 jt|d  |d< nx| jkr j| d } j| d }|dkrn|d | ng }g }|d k	r||d  }| fdd||| D  | }|f|S )Nr   rI   rJ   c                   s   g | ]} j t| qS r0   global_keyprefixstr.0argselfr0   r1   
<listcomp>   s   z5GlobalKeyPrefixMixin._prefix_args.<locals>.<listcomp>)listpopPREFIXED_SIMPLE_COMMANDSrO   rP   PREFIXED_COMPLEX_COMMANDS)rU   argscommandrI   rJ   Zpre_argsZ	post_argsr0   rT   r1   _prefix_args   s"    



z!GlobalKeyPrefixMixin._prefix_argsc                   sD   t  j||f|}|dkr@|r@|\}}|t| jd }||fS |S )zParse a response from the Redis server.

        Method wraps ``redis.parse_response()`` to remove prefixes of keys
        returned by redis command.
        rM   N)superparse_responselenrO   )rU   
connectioncommand_nameoptionsretkeyvalue	__class__r0   r1   r_      s    z#GlobalKeyPrefixMixin.parse_responsec                   s   t  j| ||S rC   r^   execute_commandr]   rU   r[   kwargsrg   r0   r1   rj      s    z$GlobalKeyPrefixMixin.execute_commandTc                 C  s   t | j| j||| jdS )NrO   )PrefixedRedisPipelineconnection_poolZresponse_callbacksrO   )rU   transactionZ
shard_hintr0   r0   r1   pipeline   s    zGlobalKeyPrefixMixin.pipeline)TN)r6   r7   r8   r9   rY   rZ   r]   r_   rj   rq   __classcell__r0   r0   rg   r1   rH      s6   rH   c                   @  s    e Zd ZdZdd Zdd ZdS )PrefixedStrictRedisz@Returns a ``StrictRedis`` client that prefixes the keys it uses.c                 O  s&   | dd| _tjj| f|| d S NrO    )rX   rO   r$   Redis__init__rk   r0   r0   r1   rw     s    zPrefixedStrictRedis.__init__c                 K  s   t | jfd| ji|S )NrO   )PrefixedRedisPubSubro   rO   )rU   rl   r0   r0   r1   pubsub  s    zPrefixedStrictRedis.pubsubN)r6   r7   r8   r9   rw   ry   r0   r0   r0   r1   rs   
  s   rs   c                   @  s   e Zd ZdZdd ZdS )rn   a   Custom Redis pipeline that takes global_keyprefix into consideration.

    As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
    the keys it uses, the pipeline called by the client must be able to prefix
    the keys as well.
    c                 O  s(   | dd| _tjjj| f|| d S rt   )rX   rO   r$   r@   Pipelinerw   rk   r0   r0   r1   rw   !  s    zPrefixedRedisPipeline.__init__N)r6   r7   r8   r9   rw   r0   r0   r0   r1   rn     s   rn   c                      sD   e Zd ZdZdZ fddZdd Z fddZ fd	d
Z  Z	S )rx   zCRedis pubsub client that takes global_keyprefix into consideration.)Z	SUBSCRIBEZUNSUBSCRIBEZ
PSUBSCRIBEZPUNSUBSCRIBEc                   s    | dd| _t j|| d S rt   )rX   rO   r^   rw   rk   rg   r0   r1   rw   0  s    zPrefixedRedisPubSub.__init__c                   s8   t |}|d}| jkr. fdd|D }|f|S )Nr   c                   s   g | ]} j t| qS r0   rN   rQ   rT   r0   r1   rV   9  s   z4PrefixedRedisPubSub._prefix_args.<locals>.<listcomp>)rW   rX   PUBSUB_COMMANDS)rU   r[   r\   r0   rT   r1   r]   4  s    


z PrefixedRedisPubSub._prefix_argsc                   sB   t  j||}|dkr|S |^}}}|f fdd|D |fS )zParse a response from the Redis server.

        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
        returned by redis command.
        Nc                   s   g | ]}|t  jd  qS rC   )r`   rO   )rR   rF   rT   r0   r1   rV   R  s     z6PrefixedRedisPubSub.parse_response.<locals>.<listcomp>)r^   r_   )rU   r[   rl   rd   Zmessage_typeZchannelsmessagerg   rT   r1   r_   @  s    z"PrefixedRedisPubSub.parse_responsec                   s   t  j| ||S rC   ri   rk   rg   r0   r1   rj   V  s    z#PrefixedRedisPubSub.execute_command)
r6   r7   r8   r9   r{   rw   r]   r_   rj   rr   r0   r0   rg   r1   rx   &  s   rx   c                      s   e Zd ZdZdZ fddZ fddZd#dd	Z fd
dZd$ddZ	e
d%ddZd&ddZd'ddZd(ddZedd Zedd Zedd Zedd  Zed!d" Z  ZS ))QoSzRedis Ack Emulation.Tc                   s   t  j|| d| _d S )Nr   )r^   rw   _vrestore_countrk   rg   r0   r1   rw   _  s    zQoS.__init__c              	     s   |j }|d |d  }}tjd dkr4|t ig}n
t |g}|  B}|j| jf| | j|t	|j
||g  t || W 5 Q R X d S )Nexchangerouting_keyr   r   )delivery_infor$   VERSIONr   pipe_or_acquireZzaddunacked_index_keyZhsetunacked_keyr   _rawexecuter^   append)rU   r|   delivery_tagZdeliveryEXRKZ	zadd_argspiperg   r0   r1   r   c  s    

 z
QoS.appendNc              	   C  s@   | j | }| jD ]}| j||d qW 5 Q R X | j  d S )Nr@   )rF   conn_or_acquireZ
_deliveredrestore_by_tagclear)rU   r@   tagr0   r0   r1   restore_unackedt  s    
zQoS.restore_unackedc                   s   |  |  t | d S rC   )_remove_from_indicesr   r^   ack)rU   r   rg   r0   r1   r   z  s    zQoS.ackFc                 C  s    |r| j |dd | | d S NT)leftmost)r   r   )rU   r   Zrequeuer0   r0   r1   reject~  s    z
QoS.rejectc              	   c  s2   |r|V  n"| j |}| V  W 5 Q R X d S rC   )rF   r   rq   )rU   r   r@   r0   r0   r1   r     s    zQoS.pipe_or_acquirec              
   C  s:   |  |&}|| j|| j|W  5 Q R  S Q R X d S rC   )r   Zzremr   hdelr   )rU   r   r   r0   r0   r1   r     s
     zQoS._remove_from_indicesr   
   c           	   
   C  s   |  j d7  _ | j d | r d S | j }t | j }zZt|| j| j@ |j| j	|d|o^||dd}|png D ]\}}| 
|| qpW 5 Q R X W n tk
r   Y nX W 5 Q R X d S )Nr   r   T)startnumZ
withscores)r~   rF   r   r   visibility_timeoutrB   unacked_mutex_keyunacked_mutex_expireZzrevrangebyscorer   r   r4   )	rU   r   r   intervalr@   ceilZvisibler   Zscorer0   r0   r1   restore_visible  s*        zQoS.restore_visiblec              	     s:    fdd}j |}||j W 5 Q R X d S )Nc                   sP   |  j}|   |  |rLtt|\}}}j||||   d S rC   )hgetr   multir   r   r   rF   _do_restore_message)r   pMr   r   r   rU   r   r0   r1   restore_transaction  s    z/QoS.restore_by_tag.<locals>.restore_transaction)rF   r   rp   r   )rU   r   r@   r   r   r0   r   r1   r     s    zQoS.restore_by_tagc                 C  s   | j jS rC   )rF   r   rT   r0   r0   r1   r     s    zQoS.unacked_keyc                 C  s   | j jS rC   )rF   r   rT   r0   r0   r1   r     s    zQoS.unacked_index_keyc                 C  s   | j jS rC   )rF   r   rT   r0   r0   r1   r     s    zQoS.unacked_mutex_keyc                 C  s   | j jS rC   )rF   r   rT   r0   r0   r1   r     s    zQoS.unacked_mutex_expirec                 C  s   | j jS rC   )rF   r   rT   r0   r0   r1   r     s    zQoS.visibility_timeout)N)F)NN)N)r   r   r   )NF)r6   r7   r8   r9   Zrestore_at_shutdownrw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rr   r0   r0   rg   r1   r}   Z  s,   








r}   c                   @  s   e Zd ZdZeeB ZdZdZdd Z	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd(d$d%Zed&d' ZdS ))MultiChannelPollerz%Async I/O poller for Redis transport.FNc                 C  s(   t  | _i | _i | _t | _t  | _d S rC   )set	_channels_fd_to_chan_chan_to_sockr   poller
after_readrT   r0   r0   r1   rw     s
    zMultiChannelPoller.__init__c              
   C  s\   | j  D ].}z| j| W q
 ttfk
r6   Y q
X q
| j  | j  | j   d S rC   )	r   valuesr   
unregisterKeyError
ValueErrorr   r   r   )rU   fdr0   r0   r1   close  s    

zMultiChannelPoller.closec                 C  s   | j | d S rC   )r   addrU   rF   r0   r0   r1   r     s    zMultiChannelPoller.addc                 C  s   | j | d S rC   )r   discardr   r0   r0   r1   r     s    zMultiChannelPoller.discardc              	   C  s0   z| j |j W n ttfk
r*   Y nX d S rC   )r   r   _sockAttributeError	TypeErrorrU   ra   r0   r0   r1   _on_connection_disconnect  s    z,MultiChannelPoller._on_connection_disconnectc                 C  sr   |||f| j kr| ||| |jjd kr4|j  |jj}||f| j| < || j |||f< | j|| j	 d S rC   )
r   _unregisterra   r   connectr   filenor   register
eventflags)rU   rF   r@   typesockr0   r0   r1   	_register  s    
zMultiChannelPoller._registerc                 C  s   | j | j|||f  d S rC   )r   r   r   )rU   rF   r@   r   r0   r0   r1   r     s    zMultiChannelPoller._unregisterc                 C  s:   t |dd d kr|jd|_|jjd k	o8|||f| jkS )Nra   _)getattrro   get_connectionra   r   r   )rU   rF   r@   cmdr0   r0   r1   _client_registered  s
    z%MultiChannelPoller._client_registeredc                 C  s>   ||j df}| ||j ds,d|_| j|  |js:|  dS )zEnable BRPOP mode for channel.rM   FN)r@   r   _in_pollr   _brpop_start)rU   rF   identr0   r0   r1   _register_BRPOP  s    
z"MultiChannelPoller._register_BRPOPc                 C  s8   |  ||jds&d|_| ||jd |js4|  dS )zEnable LISTEN mode for channel.LISTENFN)r   	subclient
_in_listenr   
_subscriber   r0   r0   r1   _register_LISTEN  s
    z#MultiChannelPoller._register_LISTENc                 C  s:   | j D ].}|jr$|j r$| | |jr| | qd S rC   )r   active_queuesqoscan_consumer   active_fanout_queuesr   r   r0   r0   r1   on_poll_start  s    


z MultiChannelPoller.on_poll_startc                 C  s(   || _ | jD ]}|jj|jd  S d S N)r   )r   r   r   r   unacked_restore_limit)rU   r   rF   r0   r0   r1   on_poll_init  s
    
zMultiChannelPoller.on_poll_initc                 C  s*   | j D ]}|jr|jj|jd  S qd S r   )r   r   r   r   r   r   r0   r0   r1   maybe_restore_messages#  s
    
z)MultiChannelPoller.maybe_restore_messagesc                 C  s<   | j D ]0}|jd}|d k	rtt|dd r|  qd S )Nr   check_health)r   __dict__getcallabler   r   )rU   rF   r@   r0   r0   r1   maybe_check_subclient_health+  s    
z/MultiChannelPoller.maybe_check_subclient_healthc                 C  s(   | j | \}}|j r$|j|   d S rC   )r   r   r   handlers)rU   r   chanr   r0   r0   r1   on_readable3  s    
zMultiChannelPoller.on_readablec                 C  s:   |t @ r| || fS |t@ r6| j| \}}|| d S rC   )r   r   r   r   _poll_error)rU   r   eventr   r   r0   r0   r1   handle_event8  s
    zMultiChannelPoller.handle_eventc           	      C  s   d| _ z| jD ].}|jr,|j r,| | |j	r| 
| q| j|}|rv|D ]"\}}| ||}|rR W d S qR|   t W 5 d| _ | jrz| j }W n tk
r   Y qY qX |  qX d S )NTF)_in_protected_readr   rX   r   r   r   r   r   r   r   r   r   r   r   r   r   )	rU   callbackr;   ZfunrF   eventsr   r   rd   r0   r0   r1   r   ?  s.    




zMultiChannelPoller.getc                 C  s   | j S rC   )r   rT   r0   r0   r1   fds]  s    zMultiChannelPoller.fds)N)r6   r7   r8   r9   r   r   r   r   r   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   propertyr   r0   r0   r0   r1   r     s.   

	
r   c                      sz  e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZi ZdZdZd	Zd
ZdZdZdZeZdZdZdZdZdZdZeZdZ dZ!dZ"dZ#dZ$dZ%e&j'j(d Z(e)re)j*ndZ+e)re)j,ndZ- fddZ.dd Z/dd Z0dd Z1drddZ2ds fdd	Z3dd Z4 fdd Z5d!d" Z6 fd#d$Z7d%d& Z8d'd( Z9d)d* Z:d+d, Z;d-d. Z<d/d0 Z=d1d2 Z>dtd4d5Z?d6d7 Z@d8d9 ZAd:d; ZBd<d= ZCd>d? ZDd@dA ZEdBdC ZFdDdE ZGdudFdGZHdHdI ZIdJdK ZJdLdM ZKdNdO ZLdPdQ ZM fdRdSZNdTdU ZOdVdW ZPdvdXdYZQdwdZd[ZRdxd\d]ZSdyd^d_ZTd`da ZUeVdzdbdcZWeXddde ZYeXdfdg ZZe[dhdi Z\e[djdk Z]dldm Z^dndo Z_eXdpdq Z`  ZaS ){ChannelzRedis Channel.NFTz_kombu.binding.%sz/{db}.zZunackedZunacked_indexZunacked_mutexi,  i  r   ru   Zround_robin)sepack_emulationr   r   r   r   r   r   fanout_prefixfanout_patternsrO   socket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsqueue_order_strategymax_connectionshealth_check_intervalretry_on_timeoutpriority_stepsc                   s   t  j|| | jstj| _t| j | _|  | _	| 
 | _t | _t | _i | _| j| jd| _| jrt| jtr| j| _nd| _z| j  W n tk
r   |    Y nX | jj|  | jj| _td k	rt| t  d S )N)rM   r   ru   )!r^   rw   r   r   r}   r   r   _queue_cycle_get_clientClient_get_response_errorr/   r   r   auto_delete_queues_fanout_to_queue_brpop_read_receiver   r   
isinstancerP   keyprefix_fanoutr@   Zping	Exception_disconnect_poolsra   cycler   r   r   rG   rk   rg   r0   r1   rw     s.    



zChannel.__init__c                 C  s   |    d S rC   )r  rT   r0   r0   r1   rD     s    zChannel._after_forkc                 C  s<   | j }| j}d  | _| _ |d k	r(|  |d k	r8|  d S rC   )_pool_async_pool
disconnect)rU   pool
async_poolr0   r0   r1   r    s    zChannel._disconnect_poolsc                 C  s@   | j |krd | _ | j|kr d | _| jr<| jjr<| jj| d S rC   )r   r   ra   r  r   r   r0   r0   r1   r     s    

z!Channel._on_connection_disconnectc                 C  s   zfz d|d d< d|d d d< W n t k
r6   Y nX | ||D ]}|rR|jn|j|t| qDW n" tk
r   td|dd Y nX d S )NTheadersZredeliveredZ
propertiesr   zCould not restore message: %rexc_info)r   Z_lookuplpushZrpushr   r  crit)rU   payloadr   r   r   r   queuer0   r0   r1   r   	  s     
zChannel._do_restore_messagec              	     sN   j st |S |j fdd} }||j W 5 Q R X d S )Nc                   sP   |  j}|   | j |rLtt|\}}}||||   d S rC   )r   r   r   r   r   r   r   )r   Pr   r   r   r   r0   r1   r     s    z-Channel._restore.<locals>.restore_transaction)r   r^   _restorer   r   rp   r   )rU   r|   r   r   r@   rg   r   r1   r    s    
zChannel._restorec                 C  s   | j |ddS r   )r  )rU   r|   r0   r0   r1   _restore_at_beginning(  s    zChannel._restore_at_beginningc                   sN   || j kr.| j | \}}| j| || j|< t j|f||}|   |S rC   )_fanout_queuesr   r   r   r^   basic_consume_update_queue_cycle)rU   r  r[   rl   r   r   rd   rg   r0   r1   r  +  s    

zChannel.basic_consumec                 C  s8   | j }|r4|jjr*|jjt| j|fS | |S d S rC   )ra   r  r   r   r   r   _basic_cancel)rU   consumer_tagra   r0   r0   r1   basic_cancel?  s    zChannel.basic_cancelc                   s   z| j | }W n tk
r$   Y d S X z| j| W n tk
rJ   Y nX | | z| j| \}}| j| W n tk
r   Y nX t 	|}| 
  |S rC   )Z_tag_to_queuer   r   remove_unsubscribe_fromr  r   rX   r^   r  r  )rU   r  r  r   r   rd   rg   r0   r1   r  L  s"    
zChannel._basic_cancelc                 C  s.   |r| j rd| j|d|gS d| j|gS )Nru   /)r   joinr  )rU   r   r   r0   r0   r1   _get_publish_topic`  s    
zChannel._get_publish_topicc                 C  s   | j | \}}| ||S rC   )r  r!  )rU   r  r   r   r0   r0   r1   _get_subscribe_topice  s    zChannel._get_subscribe_topicc                   sN    fdd j D }|sd S  j}|jjd kr8|j  |j _|| d S )Nc                   s   g | ]}  |qS r0   )r"  rR   r  rT   r0   r1   rV   j  s   z&Channel._subscribe.<locals>.<listcomp>)r   r   ra   r   r   r   Z
psubscribe)rU   keyscr0   rT   r1   r   i  s    

zChannel._subscribec                 C  s.   |  |}| j}|jr*|jjr*||g d S rC   )r"  r   ra   r   unsubscribe)rU   r  topicr%  r0   r0   r1   r  t  s    
zChannel._unsubscribe_fromc                 C  s   t |d dkr&|d dkr&d|_d S t |d dkr\|d |d |d |d f\}}}}n |d d |d |d f\}}}}||||dS )	Nr   r&  rL   FZpmessager   r   )r   patternrF   data)r   Z
subscribed)rU   r@   rr   r(  rF   r)  r0   r0   r1   _handle_messagez  s    & zChannel._handle_messagec                 C  sf   | j }g }z|| | W n tk
r2   Y nX |jd k	r^|jjddr^|| | q4t|S )Nr   r:   )r   r   _receive_oner   ra   Zcan_readany)rU   r%  rd   r0   r0   r1   r    s    zChannel._receivec              	   C  s  d }z|  }W n | jk
r.   d | _ Y nX t|ttfr
| ||}t|d dr
t|d }|d r
|d dkr|	d\}}}zt
t|d }W n: ttfk
r   td|t|d d	 d
d t Y nX |dd
d }| j|| j|  dS d S )Nr   r|   rF   r)  r   r  .z&Cannot process event on channel %r: %si   r   r  T)r_   r   r   r  rW   tupler+  r   endswith	partitionr   r   r   warnreprr   splitra   _deliverr   )rU   r%  responser  rF   r   r|   r   r0   r0   r1   r,    s8    
   zChannel._receive_oner   c                   sp   j tj  sd S  fddjD |p4dg }jj_d|}jr^j	|}jjj
|  d S )Nc                   s"   g | ]} D ]} ||qqS r0   )
_q_for_pri)rR   prir  ZqueuesrU   r0   r1   rV     s     z(Channel._brpop_start.<locals>.<listcomp>r   rM   )rM   )r   consumer`   r   r   r@   ra   r   rO   r]   Zsend_command)rU   r;   r$  command_argsr0   r9  r1   r     s    
zChannel._brpop_startc                 K  s   zz| jj| jjdf|}W n$ | jk
r@   | jj   Y nX |r|\}}t|| jdd }| j	
| | jtt|| W dS t W 5 d | _ X d S )NrM   r   r   T)r   r@   r_   ra   r   r
  r   rsplitr   r   rotater5  r   r   )rU   rc   Z
dest__itemdestitemr0   r0   r1   r    s$    
zChannel._brpop_readc                 K  s*   |dkr| j   n| j| jj| d S )Nr   )r   r_   r@   ra   )rU   r   rc   r0   r0   r1   r     s    zChannel._poll_errorc              
   C  s\   |   J}| jD ]6}|| ||}|rtt|  W  5 Q R  S qt W 5 Q R X d S rC   )r   r   Zrpopr7  r   r   r   )rU   r  r@   r8  r?  r0   r0   r1   _get  s    

zChannel._getc                 C  sx   |   f}| R}| jD ]}|| ||}q| }tdd |D W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S )Nc                 s  s   | ]}t |tjr|V  qd S rC   )r  numbersIntegral)rR   sizer0   r0   r1   	<genexpr>  s    z Channel._size.<locals>.<genexpr>)r   rq   r   llenr7  r   sum)rU   r  r@   r   r8  sizesr0   r0   r1   _size  s    


zChannel._sizec                 C  s$   |  |}|r | | j | S |S rC   )priorityr   )rU   r  r8  r0   r0   r1   r7    s    
zChannel._q_for_pric                 C  s   | j }|t||d  S )Nr   )r   r   )rU   nZstepsr0   r0   r1   rI    s    zChannel.priorityc              	   K  s>   | j |dd}|  }|| ||t| W 5 Q R X dS )zDeliver message.F)reverseN)Z_get_message_priorityr   r  r7  r   )rU   r  r|   rl   r8  r@   r0   r0   r1   _put  s    
zChannel._putc              	   K  s0   |   }|| ||t| W 5 Q R X dS )zDeliver fanout message.N)r   publishr!  r   )rU   r   r|   r   rl   r@   r0   r0   r1   _put_fanout  s
    

zChannel._put_fanoutc                 K  s   |r| j | d S rC   )r   r   )rU   r  Zauto_deleterl   r0   r0   r1   
_new_queue  s    zChannel._new_queuec              	   C  sl   |  |jdkr&||ddf| j|< |  4}|| j|f | j|pJd|pPd|pVdg W 5 Q R X d S )Nfanout#*ru   )	Ztypeofr   replacer  r   Zsaddkeyprefix_queuer   r   )rU   r   r   r(  r  r@   r0   r0   r1   _queue_bind  s     


zChannel._queue_bindc           
   
   O  s   | j | | j|ddn}|| j|f | j|p:d|p@d|pFdg | ,}| j	D ]}	|
| ||	}q^|  W 5 Q R X W 5 Q R X d S )Nr@   r   ru   )r   r   r   r   ZsremrT  r   r   rq   r   deleter7  r   )
rU   r  r   r   r(  r[   rl   r@   r   r8  r0   r0   r1   _delete  s    

zChannel._deletec                 K  sj   |   X}| D}| jD ]}|| ||}qt| W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S rC   )r   rq   r   existsr7  r-  r   )rU   r  rl   r@   r   r8  r0   r0   r1   
_has_queue  s
    


zChannel._has_queuec              
     sZ    j | }  >}||}|s2g W  5 Q R  S  fdd|D W  5 Q R  S Q R X d S )Nc                   s    g | ]}t t| jqS r0   )r/  r   r4  r   )rR   valrT   r0   r1   rV   -  s     z%Channel.get_table.<locals>.<listcomp>)rT  r   Zsmembers)rU   r   re   r@   r   r0   rT   r1   	get_table%  s    


zChannel.get_tablec                 C  s   |   p}| \}| jD ] }| ||}|||}q| }t|d d d W  5 Q R  W  5 Q R  S Q R X W 5 Q R X d S )NrL   )r   rq   r   r7  rE  rV  r   rF  )rU   r  r@   r   r8  ZpriqrG  r0   r0   r1   _purge/  s    


zChannel._purgec                   s   d| _ | jr.z|   W n tk
r,   Y nX | js| jj|  | j	d}|d k	rz| j
D ]}|| jkr\| j||d q\|   |   t   d S )NTr@   r   )_closingr   r  r   closedra   r  r   r   r   r  r   Zqueue_deleter  _close_clientsr^   r   )rU   r@   r  rg   r0   r1   r   8  s     

zChannel.closec                 C  sP   dD ]F}z$| j | }|jd  }|_|  W q tt| jfk
rH   Y qX qd S )N)r@   r   )r   ra   r
  r   r   r/   )rU   attrr@   ra   r0   r0   r1   r_  M  s    
zChannel._close_clientsc                 C  sh   t |tjsd|r|dkrt}n|dr4|dd  }zt|}W n" tk
rb   td|Y nX |S )Nr  r   z/Database is int between 0 and limit - 1, not {})r  rA  rB  
DEFAULT_DB
startswithintr   format)rU   Zvhostr0   r0   r1   _prepare_virtual_hostW  s    

zChannel._prepare_virtual_hostc                 K  s   |S rC   r0   )rU   r   r   paramsr0   r0   r1   _filter_tcp_connparamsf  s    zChannel._filter_tcp_connparamsc                   s  | j j}|jpd|jp| j j|j|j|j| j| j	| j
| j| j| j| jd}| j}t|dr|g}t|drx|t|j7 }|D ]}t|jdr| qq||d |jrz||j | j|d< W n tk
r   Y nX |d }d|krvt|\}}	}	}
}}}|d	krN| jf |}|jtjd
| df| |dd  |dd  |dd  |
|d< ||d< |dd  |dd  | |dd |d< |  |dp| j}|rG  fddd|}|}||d< |S )Nz	127.0.0.1)hostportvirtual_hostusernamepasswordr   r   r   r   r   r   r   rw   	__bases__r   connection_classrh  z://r(   r  )rn  pathr   r   r   rk  rl  ri  rj  dbc                      s   e Zd Z fddZ  ZS )z'Channel._connparams.<locals>.Connectionc                   s   t  j|  |  d S rC   )r^   r
  r   )rU   r[   )rh   rF   r0   r1   r
    s    z2Channel._connparams.<locals>.Connection.disconnect)r6   r7   r8   r
  rr   r0   rE   rg   r1   
Connection  s   rq  ) ra   r@   hostnameri  default_portrj  Zuseridrl  r   r   r   r   r   r   r   rn  r%   rW   rm  r   rw   rX   sslupdateconnection_class_sslr   r   rg  r$   ZUnixDomainSocketConnectionre  r   )rU   asynchronousZconninfo
connparamsZ
conn_classclassesklassrh  schemer   rk  rl  ro  queryZconnection_clsrq  r0   rE   r1   _connparamsj  sv    





zChannel._connparamsc                 C  s    |r| j | jdS | j | jdS )N)ro   )r   r  r  rU   rw  r0   r0   r1   _create_client  s    zChannel._create_clientc                 C  s,   | j |d}| jj|d d| _tjf |S )Nrw  rp  )rp  )r}  r  rd  r$   ConnectionPool)rU   rw  rf  r0   r0   r1   	_get_pool  s    zChannel._get_poolc                 C  s4   t jdk rtdt | jr.tjt| jdS t jS )N)r   rL   r   zSRedis transport requires redis-py versions 3.2.0 or later. You have {0.__version__}rm   )	r$   r   r
   rd  rO   	functoolspartialrs   ZStrictRedisrT   r0   r0   r1   r     s    
zChannel._get_clientc                 c  s   |r|V  n
|   V  d S rC   r  rU   r@   r0   r0   r1   r     s    zChannel.conn_or_acquirec                 C  s   | j d kr|  | _ | j S rC   )r  r  rT   r0   r0   r1   r    s    

zChannel.poolc                 C  s   | j d kr| jdd| _ | j S )NTr  )r	  r  rT   r0   r0   r1   r    s    
zChannel.async_poolc                 C  s   | j ddS )z+Client used to publish messages, BRPOP etc.Tr  r  rT   r0   r0   r1   r@     s    zChannel.clientc                 C  s   | j dd}| S )z1Pub/Sub connection used to consume fanout queues.Tr  )r  ry   r  r0   r0   r1   r     s    zChannel.subclientc                 C  s   | j | j d S rC   )r   ru  r   rT   r0   r0   r1   r    s    zChannel._update_queue_cyclec                 C  s   ddl m} |jS )Nr   r!   )r$   r"   r/   )rU   r"   r0   r0   r1   r     s    zChannel._get_response_errorc                   s    fdd j D S )z<Set of queues being consumed from (excluding fanout queues).c                   s   h | ]}| j kr|qS r0   )r   r#  rT   r0   r1   	<setcomp>  s    
z(Channel.active_queues.<locals>.<setcomp>)Z_active_queuesrT   r0   rT   r1   r     s    zChannel.active_queues)F)F)r   )F)NN)F)F)F)N)br6   r7   r8   r9   r}   Z_clientZ
_subclientr]  Zsupports_fanoutrT  r  r   r   r   r  r   r   r   r   r   r   r   PRIORITY_STEPSr   r   r   r   r   r   r   DEFAULT_HEALTH_CHECK_INTERVALr   r   r   rO   r   r	  r  r   r   from_transport_optionsr$   rq  rn  SSLConnectionrv  rw   rD   r  r   r   r  r  r  r  r  r!  r"  r   r  r+  r  r,  r   r  r   r@  rH  r7  rI  rL  rN  rO  rU  rW  rY  r[  r\  r   r_  re  rg  r}  r  r  r   r   r   r   r  r  r   r@   r   r  r   r   rr   r0   r0   rg   r1   r   b  s   %	 

	

	
  

N





r   c                      sv   e Zd ZdZeZdZeZdZdZ	e
jjjdedddgdZerJe \ZZ fd	d
Zdd Zdd Zdd Z  ZS )r'   zRedis Transport.Nr$   Tdirectr'  rP  )rw  Zexchange_typec                   s*   t d krtdt j|| t | _d S )Nz)Missing redis library (pip install redis))r$   ImportErrorr^   rw   r   r  rk   rg   r0   r1   rw     s    zTransport.__init__c                 C  s   t jS rC   )r$   __version__rT   r0   r0   r1   driver_version  s    zTransport.driver_versionc                   s   | j j jj | jfdd}|_ fddj 	dj
 |jjdt}	|j d S )Nc                   sB   | j r| j   jr>zj W n tk
r<   Y nX d S rC   )r   r  r   on_tickr   )ra   )r  loopr   r0   r1   _on_disconnect   s    z:Transport.register_with_event_loop.<locals>._on_disconnectc                     s       fddj D  d S )Nc                   s   g | ]} ||qS r0   r0   )rR   r   )
add_readerr   r0   r1   rV   /  s     zMTransport.register_with_event_loop.<locals>.on_poll_start.<locals>.<listcomp>)r   r0   )r  r  cycle_poll_startr   r0   r1   r   -  s    z9Transport.register_with_event_loop.<locals>.on_poll_startr   r   )r  r   r   r   r  r   r   r  r   Zcall_repeatedlyr   r@   Ztransport_optionsr   r  r   )rU   ra   r  r  r   r0   )r  r  r  r  r   r   r1   register_with_event_loop  s$    z"Transport.register_with_event_loopc                 C  s   | j | dS )z1Handle AIO event for one of our file descriptors.N)r  r   )rU   r   r0   r0   r1   r   ;  s    zTransport.on_readable)r6   r7   r8   r9   r   Zpolling_intervalDEFAULT_PORTrs  Zdriver_typeZdriver_namer   r'   Z
implementsextend	frozensetr$   r2   r   r    rw   r  r  r   rr   r0   r0   rg   r1   r'     s    
"r'   c                   @  s   e Zd ZdZdS )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        Nr5   r0   r0   r0   r1   r  A  s   r  c                   @  sH   e Zd ZdZejd Zer ejndZer,e	ndZ
d	ddZd
ddZdS )SentinelChannela  Channel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:
    -------
    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
    )master_namemin_other_sentinelssentinel_kwargsNFc           	      C  s   |  |}| }|dd  |dd  g }| jjjD ]4}t|}|jdkr8|jpZ| jj	}|
|j|f q8|s|
|d |d f tj|ft| ddt| dd d|}t| dd }|d krtd	||| jjS )
Nrh  ri  r   r  r   r  )r  r  r  z1'master_name' transport option must be specified.)r}  copyrX   ra   r@   Zaltr   r{  ri  rs  r   rr  r   ZSentinelr   r   Z
master_forr   ro   )	rU   rw  rx  Zadditional_paramsZ	sentinelsurlri  Zsentinel_instr  r0   r0   r1   _sentinel_managed_pools  s:    



z&SentinelChannel._sentinel_managed_poolc                 C  s
   |  |S rC   )r  r~  r0   r0   r1   r    s    zSentinelChannel._get_pool)F)F)r6   r7   r8   r9   r   r  r   SentinelManagedConnectionrn  r  rv  r  r  r0   r0   r0   r1   r  N  s   

%r  c                   @  s   e Zd ZdZdZeZdS )SentinelTransportzRedis Sentinel Transport.ig  N)r6   r7   r8   r9   rs  r  r   r0   r0   r0   r1   r    s   r  )Mr9   
__future__r   r  rA  r(   r   collectionsr   
contextlibr   r  r   r   Zviner   Zkombu.exceptionsr	   r
   Z	kombu.logr   Zkombu.utils.compatr   Zkombu.utils.encodingr   Zkombu.utils.eventior   r   r   Zkombu.utils.functionalr   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   Zkombu.utils.schedulingr   Zkombu.utils.urlr   ru   r   r$   r  r   loggercriticalr2  r  r  ra  r  r  r   r2   r3   r  r4   rB   rG   rH   rv   rs   r@   rz   rn   ZPubSubrx   r}   r   r   r'   r  r  r  r  r  r0   r0   r0   r1   <module>   s~   5



S4i       D
N