U
    .e0d                  	   @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddl m!Z! dZ"e#edZ$dZ%dZ&dZ'e!e(Z)e)j*Z+dZ,dZ-dZ.ej/ej0ej1ej2ej3ej4ej5ej6dZ7G dd deZ8e9e8 eddd dd d! Z:d"e%e
e;e<fd#d$Z=d%d& Z>d'd( Z?e?d)G d*d+ d+Z@e?d,G d-d. d.ZAG d/d0 d0ZBd1d2 ZCd3d4 ZDdS )5a  In-memory representation of cluster state.

This module implements a data-structure used to keep
track of the state of a cluster of workers and the tasks
it is working on (by consuming events).

For every event consumed the state is updated,
so the state represents the state of the cluster
at the time of the last event.

Snapshots (:mod:`celery.events.snapshot`) can be used to
take "pictures" of this state at regular intervals
to for example, store that in a database.
    N)defaultdict)Callable)datetime)Decimal)islice)
itemgetter)time)MappingOptional)WeakSetref	timetuple)cached_property)states)LRUCachememoizepass1)
get_logger)WorkerTaskStateheartbeat_expirespypy_version_info      zmSubstantial drift from %s may mean clocks are out of sync.  Current drift is %s seconds.  [orig: %s recv: %s]z4<State: events={0.event_count} tasks={0.task_count}>z9<Worker: {0.hostname} ({0.status_string} clock:{0.clock})z4<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>)sentreceivedstartedfailedretried	succeededrevokedrejectedc                       s(   e Zd ZdZ fddZdd Z  ZS )CallableDefaultdicta  :class:`~collections.defaultdict` with configurable __call__.

    We use this for backwards compatibility in State.tasks_by_type
    etc, which used to be a method but is now an index instead.

    So you can do::

        >>> add_tasks = state.tasks_by_type['proj.tasks.add']

    while still supporting the method call::

        >>> add_tasks = list(state.tasks_by_type(
        ...     'proj.tasks.add', reverse=True))
    c                    s   || _ t j|| d S N)funsuper__init__)selfr&   argskwargs	__class__ 7/tmp/pip-unpacked-wheel-f4liivr4/celery/events/state.pyr(   _   s    zCallableDefaultdict.__init__c                 O   s   | j ||S r%   )r&   )r)   r*   r+   r.   r.   r/   __call__c   s    zCallableDefaultdict.__call__)__name__
__module____qualname____doc__r(   r0   __classcell__r.   r.   r,   r/   r$   O   s   r$   i  c                 C   s   | d S Nr   r.   )a_r.   r.   r/   <lambda>j       r9   )maxsizeZkeyfunc                 C   s    t t| |t|t| d S r%   )warnDRIFT_WARNINGr   fromtimestamp)hostnamedriftlocal_received	timestampr.   r.   r/   _warn_driftj   s    rC   <   c                 C   s8   |||r||n|}|| |r(|| } | ||d   S )z#Return time when heartbeat expires.g      Y@r.   )rB   freqexpire_windowr   float
isinstancer.   r.   r/   r   r   s    
r   c                 C   s
   | f |S r%   r.   )clsfieldsr.   r.   r/   _depickle_task~   s    rK   c                    s    fdd}|S )Nc                    s(    fdd}|| _  fdd}|| _| S )Nc                    s$   t || jr t|  t| kS tS r%   )rH   r-   getattrNotImplemented)thisotherattrr.   r/   __eq__   s    z8with_unique_field.<locals>._decorate_cls.<locals>.__eq__c                    s   t t|  S r%   )hashrL   )rN   rP   r.   r/   __hash__   s    z:with_unique_field.<locals>._decorate_cls.<locals>.__hash__)rR   rT   )rI   rR   rT   rP   r.   r/   _decorate_cls   s
    z(with_unique_field.<locals>._decorate_clsr.   )rQ   rU   r.   rP   r/   with_unique_field   s    rV   r?   c                   @   s   e Zd ZdZdZeZdZes$ed Z	ddd	Z
d
d Zdd Zdd Zdd Zedd Zedd ZeefddZedd ZdS )r   zWorker State.   )r?   pidrE   
heartbeatsclockactive	processedloadavgsw_identsw_versw_sys)event__dict____weakref__NrD   r   c                 C   s`   || _ || _|| _|d krg n|| _|p*d| _|| _|| _|| _|	| _|
| _	|| _
|  | _d S r6   )r?   rX   rE   rY   rZ   r[   r\   r]   r^   r_   r`   _create_event_handlerra   )r)   r?   rX   rE   rY   rZ   r[   r\   r]   r^   r_   r`   r.   r.   r/   r(      s    
zWorker.__init__c                 C   s6   | j | j| j| j| j| j| j| j| j| j	| j
| jffS r%   )r-   r?   rX   rE   rY   rZ   r[   r\   r]   r^   r_   r`   r)   r.   r.   r/   
__reduce__   s         zWorker.__reduce__c                    sP   t j jjjjjjd d d tttt	j
tf fdd	}|S )Nc	                    s   |pi }|  D ]\}	}
 |	|
 q| dkr<g d d < n||rD|sHd S ||||| }||krttj||| |r|}|d krd |r|d kr| n
|| d S )Noffline   r   )itemsrC   r?   )type_rB   rA   rJ   Z	max_driftabsintinsortlenkvr@   heartsZ_setZ	hb_appendZhb_popZhbmaxrY   r)   r.   r/   ra      s(     
z+Worker._create_event_handler.<locals>.event)object__setattr__heartbeat_maxrY   popappendHEARTBEAT_DRIFT_MAXrl   rm   bisectrn   ro   r)   ra   r.   rs   r/   rd      s        zWorker._create_event_handlerc                 K   s6   |rt |f|n|}| D ]\}}t| || qd S r%   )dictrj   setattr)r)   fkwdrp   rq   r.   r.   r/   update   s    zWorker.updatec                 C   s
   t | S r%   )R_WORKERformatre   r.   r.   r/   __repr__   s    zWorker.__repr__c                 C   s   | j r
dS dS )NZONLINEZOFFLINEalivere   r.   r.   r/   status_string   s    zWorker.status_stringc                 C   s   t | jd | j| jS )Nri   )r   rY   rE   rF   re   r.   r.   r/   r      s    
 zWorker.heartbeat_expiresc                 C   s   t | jo| | jk S r%   )boolrY   r   )r)   Znowfunr.   r.   r/   r      s    zWorker.alivec                 C   s
   d | S )Nz{0.hostname}.{0.pid})r   re   r.   r.   r/   id   s    z	Worker.id)NNrD   Nr   NNNNNN)r1   r2   r3   r4   rv   HEARTBEAT_EXPIRE_WINDOWrF   _fieldsPYPY	__slots__r(   rf   rd   r   r   propertyr   r   r   r   r   r.   r.   r.   r/   r      s8                  
!

r   uuidc                   @   s6  e Zd ZdZd Z Z Z Z Z Z	 Z
 Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z ZZejZdZ dZ!e"sdZ#ej$diZ%dZ&d$dd	Z'dddej(e)e*j+ej,fd
dZ-d%ddZ.dd Z/dd Z0dd Z1dd Z2dd Z3dd Z4e5dd Z6e5dd Z7e5dd Z8e9d d! Z:e9d"d# Z;dS )&r   zTask State.Nr   )r   namestater   r   r   r#   r!   r   r    r"   r*   r+   etaexpiresretriesworkerresult	exceptionrB   runtime	tracebackexchangerouting_keyrZ   clientrootroot_idparent	parent_idchildren)rb   rc   )r   r*   r+   r   r   r   r   r   )r*   r+   r   r   r   r   r   r   r   r   r   r   c                    sd   | _ | _ jd k	r4t fdd|p(dD  _nt  _ j j jd _|r` j	| d S )Nc                 3   s(   | ] }| j jkr j j|V  qd S r%   )cluster_statetasksget).0Ztask_idre   r.   r/   	<genexpr>"  s   z Task.__init__.<locals>.<genexpr>r.   )r   r   r   )
r   r   r   r   _serializable_children_serializable_root_serializable_parent_serializer_handlersrb   r   )r)   r   r   r   r+   r.   re   r/   r(     s    
zTask.__init__c	           
         s   |pi }||}	|	d k	r&|| || n|  }	|	|kr~| j|kr~||	|| jkr~| j|	  d k	r fdd| D }n|j|	|d | j| d S )Nc                    s   i | ]\}}| kr||qS r.   r.   )r   rp   rq   Zkeepr.   r/   
<dictcomp>E  s      zTask.event.<locals>.<dictcomp>)r   rB   )upperr   merge_rulesr   rj   r   rb   )
r)   rk   rB   rA   rJ   
precedencer}   Ztask_event_to_stateRETRYr   r.   r   r/   ra   1  s    
z
Task.eventc                    s8    sg n  dkrj n fdd}t| S )z;Information about this task suitable for on-screen display.Nc                  3   s8   t t   D ]"} t| d }|d k	r| |fV  qd S r%   )listrL   )keyvalueextrarJ   r)   r.   r/   _keysS  s    zTask.info.<locals>._keys)_info_fieldsr|   )r)   rJ   r   r   r.   r   r/   infoN  s    z	Task.infoc                 C   s
   t | S r%   )R_TASKr   re   r.   r.   r/   r   [  s    zTask.__repr__c                    s&   t j jj fddjD S )Nc                    s"   i | ]}||t  |qS r.   )r   )r   rp   r   handlerr)   r.   r/   r   a  s     z Task.as_dict.<locals>.<dictcomp>)rt   __getattribute__r   r   r   re   r.   r   r/   as_dict^  s
    zTask.as_dictc                 C   s   dd | j D S )Nc                 S   s   g | ]
}|j qS r.   r   )r   taskr.   r.   r/   
<listcomp>f  s     z/Task._serializable_children.<locals>.<listcomp>)r   r)   r   r.   r.   r/   r   e  s    zTask._serializable_childrenc                 C   s   | j S r%   )r   r   r.   r.   r/   r   h  s    zTask._serializable_rootc                 C   s   | j S r%   )r   r   r.   r.   r/   r   k  s    zTask._serializable_parentc                 C   s   t | j|  ffS r%   )rK   r-   r   re   r.   r.   r/   rf   n  s    zTask.__reduce__c                 C   s   | j S r%   )r   re   r.   r.   r/   r   q  s    zTask.idc                 C   s   | j d kr| jS | j jS r%   )r   r   r   re   r.   r.   r/   originu  s    zTask.originc                 C   s   | j tjkS r%   r   r   ZREADY_STATESre   r.   r.   r/   readyy  s    z
Task.readyc                 C   s4   z| j o| jjj| j  W S  tk
r.   Y d S X d S r%   )r   r   r   dataKeyErrorre   r.   r.   r/   r   }  s    zTask.parentc                 C   s4   z| j o| jjj| j  W S  tk
r.   Y d S X d S r%   )r   r   r   r   r   re   r.   r.   r/   r     s    z	Task.root)NNN)NN)<r1   r2   r3   r4   r   r   r   r   r!   r   r    r"   r#   r*   r+   r   r   r   r   r   r   rB   r   r   r   r   r   r   r   r   PENDINGr   rZ   r   r   r   RECEIVEDr   r   r(   r   r}   TASK_EVENT_TO_STATEr   r   ra   r   r   r   r   r   r   rf   r   r   r   r   r   r   r   r.   r.   r.   r/   r      s     
  





r   c                
   @   s  e Zd ZdZeZeZdZdZdZd9ddZ	e
d	d
 Zdd Zd:ddZd;edddZd<ddZd=edddZdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zefd%d&Zd>ee d'd(d)Zd?ed*d+d,ZeZd@d-d.Z dAd/d0Z!d1d2 Z"d3d4 Z#d5d6 Z$d7d8 Z%dS )Br   zRecords clusters state.r   rW   N  '  c                 C   s   || _ |d krt|n|| _|d kr,t|n|| _|d kr>g n|| _|| _|| _|| _|| _t	
 | _i | _t | _i | _|   t| jt| _| jt|	| j t| jt| _| jt|
| j d S r%   )event_callbackr   workersr   	_taskheapmax_workers_in_memorymax_tasks_in_memoryon_node_joinon_node_leave	threadingLock_mutexhandlersset_seen_types_tasks_to_resolverebuild_taskheapr$   _tasks_by_typer   tasks_by_typer   !_deserialize_Task_WeakSet_Mapping_tasks_by_workertasks_by_worker)r)   callbackr   r   taskheapr   r   r   r   r   r   r.   r.   r/   r(     sB    


 
 
zState.__init__c                 C   s   |   S r%   )_create_dispatcherre   r.   r.   r/   _event  s    zState._eventc              
   O   sL   | dd}| j0 z|||W W  5 Q R  S |r<|   X W 5 Q R X d S )Nclear_afterF)rw   r   _clear)r)   r&   r*   r+   r   r.   r.   r/   freeze_while  s    zState.freeze_whileTc              
   C   s(   | j  | |W  5 Q R  S Q R X d S r%   )r   _clear_tasksr)   r   r.   r.   r/   clear_tasks  s    zState.clear_tasks)r   c                 C   sJ   |r.dd |   D }| j  | j| n
| j  g | jd d < d S )Nc                 S   s"   i | ]\}}|j tjkr||qS r.   r   r   r   r   r.   r.   r/   r     s    z&State._clear_tasks.<locals>.<dictcomp>)	itertasksr   clearr   r   )r)   r   Zin_progressr.   r.   r/   r     s    

zState._clear_tasksc                 C   s$   | j   | | d| _d| _d S r6   )r   r   r   event_count
task_countr   r.   r.   r/   r     s    

zState._clearc              
   C   s(   | j  | |W  5 Q R  S Q R X d S r%   )r   r   r   r.   r.   r/   r     s    zState.clearc                 K   s\   z"| j | }|r|| |dfW S  tk
rV   | j|f| }| j |< |df Y S X dS )zsGet or create worker by hostname.

        Returns:
            Tuple: of ``(worker, was_created)`` pairs.
        FTN)r   r   r   r   )r)   r?   r+   r   r.   r.   r/   get_or_create_worker  s    


zState.get_or_create_workerc                 C   sJ   z| j | dfW S  tk
rD   | j|| d }| j |< |df Y S X dS )zGet or create task by uuid.Fr   TN)r   r   r   )r)   r   r   r.   r.   r/   get_or_create_task  s
    zState.get_or_create_taskc              
   C   s(   | j  | |W  5 Q R  S Q R X d S r%   )r   r   r{   r.   r.   r/   ra     s    zState.eventc                 C   s    |  t|dd|gdd S )Deprecated, use :meth:`event`.-r   typer   r   r|   joinr)   rk   rJ   r.   r.   r/   
task_event  s    zState.task_eventc                 C   s    |  t|dd|gdd S )r   r   r   r   r   r   r   r.   r.   r/   worker_event  s    zState.worker_eventc                    s   j jjtdddtdddddjjjjj 	j	j
jj 
jj  jj jjjj jjjjtttjdf 	
fdd	}|S )	Nr?   rB   rA   r   rZ   Tc                    sd   j d7  _ r|  | d d\}}}z|}W n |k
rP   Y nX ||| |fS |dkrDz| \}	}
}W n |k
r   Y nX |dk}z|	d }}W n8 |k
r   |rЈ|	d }}n|	 }|	< Y nX |||
||  
r|s|dkr
| r4|r4| |	d  ||f|fS n|dkr`| \}}	}
}}|d	k}z|d }}W n. |k
r    |d
 }|< d}Y nX |r|	|_nXz|	}W n& |k
r   |	 }|	< Y nX ||_|d k	r|r|d ||
 |r|	n|j}t}|d 	kr>d |||
|t|}|rn|d krn| n
|| |dkr j	d7  _	|||
||  |j
}|d k	rڈ| |rڈ|| |	| |jr zj|j }W n  |k
r   | Y nX |j| zj|}W n |k
rF   Y nX |j| ||f|fS d S )Nrh   r   r   r   rg   FZonliner   r   r   Tr   ri   r   )r   	partitionra   rw   r   r   r   ro   r   r   r   addr   r   _add_pending_task_childr   r   r   )ra   r   r   rn   createdgroupr8   subjectr   r?   rB   rA   Z
is_offliner   r   rZ   Zis_client_eventr   Ztask_createdr   ZheapsZtimetupZ	task_nameZparent_task	_childrenr   r   add_typer   Zget_handlerZget_taskZget_task_by_type_setZget_task_by_worker_setZ
get_workerZmax_events_in_heapr   r   r)   r   r   ZtfieldsZ	th_appendZth_popZwfieldsr   r.   r/   r     s    


 




z(State._create_dispatcher.<locals>._event)r   __getitem__r   r   r   rx   rw   r   heap_multiplierr   r   r   r   r   r   r   r   r   r   r   r   r   rz   rn   )r)   r   r.   r  r/   r     s0       4^zState._create_dispatcherc                 C   sF   z| j |j }W n& tk
r6   t  }| j |j< Y nX || d S r%   )r   r   r   r   r   )r)   r   chr.   r.   r/   r   |  s
    zState._add_pending_task_childc                    s2    fdd| j  D  }| jd d < |  d S )Nc                    s$   g | ]} |j |j|jt|qS r.   )rZ   rB   r   r   r   tr   r.   r/   r     s   z*State.rebuild_taskheap.<locals>.<listcomp>)r   valuesr   sort)r)   r   heapr.   r   r/   r     s    
zState.rebuild_taskheap)limitc                 c   s6   t | j D ]"\}}|V  |r|d |kr q2qd S )Nrh   )	enumerater   rj   )r)   r  indexrowr.   r.   r/   r     s    zState.itertasksreversec                 c   sb   | j }|rt|}t }t|d|D ]8}|d  }|dk	r$|j}||kr$||fV  || q$dS )zkGenerator yielding tasks ordered by time.

        Yields:
            Tuples of ``(uuid, Task)``.
        r      N)r   reversedr   r   r   r   )r)   r  r  Z_heapseenZevtupr   r   r.   r.   r/   tasks_by_time  s    

zState.tasks_by_timec                    s"   t  fdd| j|dD d|S )zGet all tasks by type.

        This is slower than accessing :attr:`tasks_by_type`,
        but will be ordered by time.

        Returns:
            Generator: giving ``(uuid, Task)`` pairs.
        c                 3   s$   | ]\}}|j  kr||fV  qd S r%   r   r   r  r.   r/   r     s    
z'State._tasks_by_type.<locals>.<genexpr>r  r   r   r  )r)   r   r  r  r.   r  r/   r     s
    	 zState._tasks_by_typec                    s"   t  fdd| j|dD d|S )znGet all tasks by worker.

        Slower than accessing :attr:`tasks_by_worker`, but ordered by time.
        c                 3   s&   | ]\}}|j j kr||fV  qd S r%   )r   r?   r   r?   r.   r/   r     s    z)State._tasks_by_worker.<locals>.<genexpr>r  r   r  )r)   r?   r  r  r.   r  r/   r     s
     zState._tasks_by_workerc                 C   s
   t | jS )z%Return a list of all seen task types.)sortedr   re   r.   r.   r/   
task_types  s    zState.task_typesc                 C   s   dd | j  D S )z+Return a list of (seemingly) alive workers.c                 s   s   | ]}|j r|V  qd S r%   r   )r   wr.   r.   r/   r     s      z&State.alive_workers.<locals>.<genexpr>)r   r	  re   r.   r.   r/   alive_workers  s    zState.alive_workersc                 C   s
   t | S r%   )R_STATEr   re   r.   r.   r/   r     s    zState.__repr__c                 C   s8   | j | j| j| jd | j| j| j| jt| j	t| j
f
fS r%   )r-   r   r   r   r   r   r   r   _serialize_Task_WeakSet_Mappingr   r   re   r.   r.   r/   rf     s         zState.__reduce__)
NNNNr   r   NNNN)T)T)T)T)N)NT)NT)NT)&r1   r2   r3   r4   r   r   r   r   r  r(   r   r   r   r   r   r   r   r   r   r   ra   r   r   r   r   r   r   r
   rm   r   r  Ztasks_by_timestampr   r   r  r  r   rf   r.   r.   r.   r/   r     sR                  

	

{

r   c                 C   s   dd |   D S )Nc                 S   s    i | ]\}}|d d |D qS )c                 S   s   g | ]
}|j qS r.   r   r  r.   r.   r/   r     s     z>_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<listcomp>r.   )r   r   r   r.   r.   r/   r     s      z3_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>rj   )mappingr.   r.   r/   r    s    r  c                    s   | pi }  fdd|   D S )Nc                    s(   i | ] \}}|t  fd d|D qS )c                 3   s   | ]}| kr | V  qd S r%   r.   )r   ir   r.   r/   r     s      z?_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<genexpr>)r   )r   r   idsr"  r.   r/   r     s    z5_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>r  )r   r   r.   r"  r/   r     s    
r   )Er4   rz   sysr   collectionsr   collections.abcr   r   decimalr   	itertoolsr   operatorr   r   typingr	   r
   weakrefr   r   Zkombu.clocksr   Zkombu.utils.objectsr   Zceleryr   Zcelery.utils.functionalr   r   r   Zcelery.utils.logr   __all__hasattrr   r   ry   r=   r1   loggerwarningr<   r  r   r   r   r   ZSTARTEDFAILUREr   SUCCESSZREVOKEDZREJECTEDr   r$   registerrC   rG   rH   r   rK   rV   r   r   r   r  r   r.   r.   r.   r/   <module>   sv   


  
]   G