U
    .e                     @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZejej ZZG dd dejZdS )z)Worker <-> Worker communication Bootstep.    )defaultdict)partial)heappush)
itemgetter)Consumer)	DummyLock)ContentDisallowedDecodeError)	bootsteps)
get_logger)Bunch   )Mingle)Gossipc                       s   e Zd ZdZd ZefZedddddddZd	d
hZ	d- fdd	Z
dd Zd.ddZdd Zdd Z fddZdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Z  ZS )/r   zfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    idclockhostnamepidtopicactioncverZamqpZredisF      @       @c                    s   | o|  |j| _|j| _| |_|jjj| _|j| _d| jt|j	g| _
tt t t d| _|j| _| jr|jjj| j| jdd| _|jrt |_| jj| _|| _|| _d | _tt| _i | _| j| j d| _!|jj"| _"d| j#i| _$t% j&|f| d S )N.)	node_join
node_leave	node_lostr   )on_node_joinon_node_leaveZmax_tasks_in_memory)zworker.electzworker.elect.acktask)'compatible_transportappenabledZgossipeventsReceiverr   joinstrr   full_hostnamer   setontimerStater   r   stateZhubr   Z_mutexeventupdate_stateintervalheartbeat_interval_trefr   listconsensus_requestsconsensus_replieson_electon_elect_ackevent_handlersr   	call_taskelection_handlerssuper__init__)selfcZwithout_gossipr/   r0   kwargs	__class__ A/tmp/pip-unpacked-wheel-f4liivr4/celery/worker/consumer/gossip.pyr;   $   sD    


 zGossip.__init__c              
   C   s.   |  }|jj| jkW  5 Q R  S Q R X d S N)Zconnection_for_read	transportZdriver_typecompatible_transports)r<   r!   connrA   rA   rB   r    M   s    
zGossip.compatible_transportNc                 C   s$   g | j |< | jjd|||dd d S )Nzworker-electr   )r   r   r   r   )r4   
dispatchersend)r<   r   r   r   rA   rA   rB   electionQ   s    
   zGossip.electionc              
   C   sH   z| j |  W n. tk
rB } ztd| W 5 d }~X Y nX d S )NzCould not call task: %r)r!   	signatureZapply_async	Exceptionlogger	exception)r<   r   excrA   rA   rB   r8   X   s    zGossip.call_taskc           
   
   C   s   z|  |\}}}}}}}W n2 tk
rN }	 ztd|	 W Y S d }	~	X Y nX t| j| || d| ||f | jjd|d d S )Nz!election request missing field %sr   zworker-elect-ack)r   )_cons_stamp_fieldsKeyErrorrL   rM   r   r3   rG   rH   )
r<   r-   Zid_r   r   r   r   r   _rN   rA   rA   rB   r5   ^   s    
  "zGossip.on_electc                    s   t  | |j| _d S rC   )r:   startZevent_dispatcherrG   )r<   r=   r?   rA   rB   rR   j   s    zGossip.startc           
      C   s   |d }z| j | }W n tk
r,   Y d S X t| j }||d  t|t|kr| j| j	| \}}}}|| j
krtd| z| j| }	W n  tk
r   td| Y qX |	| ntd|| | j	|d  | j |d  d S )Nr   r   zI won the election %rzUnknown election topic %rznode %s elected for %r)r4   rP   r(   r,   alive_workersappendlenr   Z	sort_heapr3   r'   infor9   rL   rM   pop)
r<   r-   r   ZrepliesrS   rQ   Zleaderr   r   handlerrA   rA   rB   r6   n   s*    


zGossip.on_elect_ackc                 C   s    t d|j | | jj| d S )Nz%s joined the party)debugr   _call_handlersr)   r   r<   workerrA   rA   rB   r      s    zGossip.on_node_joinc                 C   s    t d|j | | jj| d S )Nz%s left)rY   r   rZ   r)   r   r[   rA   rA   rB   r      s    zGossip.on_node_leavec                 C   s    t d|j | | jj| d S )Nzmissed heartbeat from %s)rV   r   rZ   r)   r   r[   rA   rA   rB   on_node_lost   s    zGossip.on_node_lostc                 O   sN   |D ]D}z||| W q t k
rF } ztd|| W 5 d }~X Y qX qd S )Nz!Ignored error from handler %r: %r)rK   rL   rM   )r<   handlersargsr>   rX   rN   rA   rA   rB   rZ      s      zGossip._call_handlersc                 C   s,   | j d k	r| j   | j| j| j| _ d S rC   )r1   cancelr*   Zcall_repeatedlyr/   periodic)r<   rA   rA   rB   register_timer   s    

zGossip.register_timerc                 C   sR   | j j}t }| D ]}|js|| | | q|D ]}||jd  q:d S rC   )	r,   workersr(   valuesaliveaddr]   rW   r   )r<   rc   Zdirtyr\   rA   rA   rB   ra      s    
zGossip.periodicc                 C   s:   |    | j|d| jd}t||jgt| j|jddgS )Nzworker.#)routing_keyZ	queue_ttlT)Zqueues
on_messageZno_ack)rb   r$   r0   r   queuer   rh   Zevent_from_message)r<   ZchannelZevrA   rA   rB   get_consumers   s    zGossip.get_consumersc           	   
   C   s   |j d }|ddd dkr"d S z| j| }W n tk
rD   Y nX ||jS |jdpd|jd }|| jkrz||j\}}| | W q t	t
tfk
r } zt| W 5 d }~X Y qX n
| j  d S )Nrg   r   r   r   r   r   )Zdelivery_infosplitr7   rP   payloadheadersgetr   r.   r	   r   	TypeErrorrL   errorr   Zforward)	r<   preparemessage_typerX   r   rQ   r-   rN   rA   rA   rB   rh      s$    


zGossip.on_message)Fr   r   )N)__name__
__module____qualname____doc__labelr   requiresr   rO   rE   r;   r    rI   r8   r5   rR   r6   r   r   r]   rZ   rb   ra   rj   rh   __classcell__rA   rA   r?   rB   r      s>             )

r   N)rw   collectionsr   	functoolsr   heapqr   operatorr   Zkombur   Zkombu.asynchronous.semaphorer   Zkombu.exceptionsr   r	   Zceleryr
   Zcelery.utils.logr   Zcelery.utils.objectsr   Zmingler   __all__rt   rL   rY   rV   ZConsumerStepr   rA   rA   rA   rB   <module>   s   