U
    .eM                  
   @   s  d Z ddlZddlZddlmZmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dZdZee Z!edddddddddgZ"dd Z#dd Z$G dd deZ%d d! Z&d"d# Z'e' d$d% Z(e'd&d'd(efgd)dd+d,Z)d-d. Z*e'd/d0d1d2d3 Z+ej,j-fd4d5Z.ej/j0ej1j0fd6d7Z2e&d8d0d1dd9d:Z3e&d;d<d1dd=d>Z4dd?d@Z5e&d8dAe6fgdBdCdDdE Z7e&dFe6fdGe6fgdHdIdJdG Z8e&dFe6fdKe9fdLe9fgdMdIddNdOZ:e' dPdQ Z;e& ddRdSZ<e& dTdU Z=e& dVdW Z>e& dXdY Z?e'd*dZdd[d\Z@e'd]d^d_d` ZAe' dadb ZBe'dcdddedf ZCdgdh ZDe'didddjdk ZEe'dlddddmdnZFe'dodddpdq ZGe'drdsdtduddvdwZHe'dxde6fdyeIfdzeIfgd{d|dddZJe' dd ZKe'deIfgddIdddZLe&deIfgddIdddZMe&deIfgddIdddZNe& dddZOe&deIfdeIfgddIdddZPe& dddZQe&de6fde6fde6fde6fgddIdddZRe&de6fgddIdd ZSe' dd ZTdS )z.Worker remote control command implementations.    N)UserDictdefaultdict
namedtuple)TERM_SIGNAME)	safe_repr)WorkerShutdown)signals)
maybe_list)
get_logger)jsonify	strtobool)rate   state)Request)Panel)exchangerouting_key
rate_limitcontroller_info_taliastypevisibledefault_timeouthelp	signatureargsvariadicc                 C   s   d| iS )Nok valuer    r    9/tmp/pip-unpacked-wheel-f4liivr4/celery/worker/control.pyr      s    r   c                 C   s   d| iS )Nerrorr    r!   r    r    r#   nok"   s    r%   c                
   @   s2   e Zd ZdZi Zi Zedd Zed
dd	ZdS )r   z+Global registry of remote control commands.c                 O   s    |r| j f || S | j f |S N)	_register)clsr   kwargsr    r    r#   register,   s    zPanel.registerNcontrolT      ?c
              
      s"    	f
dd}
|
S )Nc              	      s^   p| j }p$| jpd dd }| j|< t 	|j|<  rZ| j < | S )N 
r   )__name____doc__stripsplitdatar   meta)ZfunZcontrol_nameZ_help
r   r   r(   r   r   namer   r   r   r   r    r#   _inner7   s     

      

zPanel._register.<locals>._innerr    )r(   r6   r   r   r   r   r   r   r   r   r7   r    r5   r#   r'   2   s    
zPanel._register)	NNr+   Tr,   NNNN)	r/   
__module____qualname__r0   r3   r4   classmethodr*   r'   r    r    r    r#   r   &   s   
           r   c                  K   s   t jf ddi| S )Nr   r+   r   r*   r)   r    r    r#   control_commandD   s    r=   c                  K   s   t jf ddi| S )Nr   inspectr;   r<   r    r    r#   inspect_commandH   s    r?   c                 C   s   t | j S )z6Information about Celery installation for bug reports.)r   appZ	bugreportr   r    r    r#   reportN   s    rA   Z	dump_confz[include_defaults=False]with_defaults)r   r   r   Fc                 K   s   t | jjj|dttdS )zList configuration.)rB   )Z	keyfilterZunknown_type_filter)r   r@   conftable_wanted_config_keyr   )r   rB   r)   r    r    r#   rC   T   s    rC   c                 C   s   t | to| d S )N__)
isinstancestr
startswith)keyr    r    r#   rE   `   s    rE   idsz[id1 [id2 [... [idN]]]])r   r   c                 K   s   dd t t|D S )z!Query for task information by id.c                 S   s    i | ]}|j t|| fqS r    )id_state_of_taskinfo).0reqr    r    r#   
<dictcomp>l   s    zquery_task.<locals>.<dictcomp>)_find_requests_by_idr	   )r   rK   r)   r    r    r#   
query_taskf   s    
rS   c              	   c   s2   | D ](}z||V  W q t k
r*   Y qX qd S r&   )KeyError)rK   get_requesttask_idr    r    r#   rR   r   s
    rR   c                 C   s   || rdS || rdS dS )Nactivereservedreadyr    )requestZ	is_activeis_reservedr    r    r#   rM   {   s
    rM   rV   c                 K   sN   t t|pg d }}t| |||f|}t|tr>d|kr>|S td| dS )zRevoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Nr   ztasks z flagged as revoked)setr	   _revokerG   dictr   )r   rV   	terminatesignalr)   task_idsr    r    r#   revoke   s
    rb   headersz/[key1=value1 [key2=value2 [... [keyN=valueN]]]]c                 K   s.  t |p
t}t|tr&dd |D }| D ]2\}}ttj	|pFg tt| }|tj|< q.|svt
d| dS ttj}	tt}
|	D ]z}t|dr|jr| D ]\\}}||jkrt|}t|j| }t|t|@ }|r|
| | |j| jj|d qq|
st
d| dS t
d|
 dS )	a  Revoke task by header (or list of headers).

    Keyword Arguments:
        headers(dictionary): Dictionary that contains stamping scheme name as keys and stamps as values.
                             If headers is a list, it will be converted to a dictionary.
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Sample headers input:
        {'mtask_id': [id1, id2, id3]}
    c                 S   s&   i | ]}| d d | d d qS )=r   r   )r2   )rO   hr    r    r#   rQ      s      z-revoke_by_stamped_headers.<locals>.<dictcomp>zheaders z' flagged as revoked, but not terminatedstampsr`   z were not terminatedz revoked)_signalssignumr   rG   listitemsr	   worker_stateZrevoked_stampsgetr   active_requestsr   r\   hasattrrf   updater_   consumerpool)r   rc   r_   r`   r)   ri   headerrf   Zupdated_stampsrn   Z#terminated_scheme_to_stamps_mappingrP   Zexpected_header_keyZexpected_header_valueZactual_headerZmatching_stamps_for_requestr    r    r#   revoke_by_stamped_headers   s.    
 

rt   c           
      K   s   t |}t }tj| |rt|p(t}t|D ]L}|j	|kr4|
|j	 td|j	| |j| jj|d t ||kr4 qq4|stdS tdd|S d|}	td|	 |S )NzTerminating %s (%s)rg   zterminate: tasks unknownzterminate: {}z, zTasks flagged as revoked: %s)lenr\   rl   revokedrp   rh   ri   r   rR   rL   addloggerrN   r_   rq   rr   r   formatjoin)
r   ra   r_   r`   r)   sizeZ
terminatedri   rZ   Zidstrr    r    r#   r]      s$    

r]   r`   z <signal> [id1 [id2 [... [idN]]]])r   r   r   c                 K   s   t | |d|dS )z+Terminate task by task id (or list of ids).T)r_   r`   )rb   )r   r`   rV   r)   r    r    r#   r_      s    r_   	task_namer   z0<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>)r   r   c              
   K   s   zt | W n4 tk
r@ } ztd| W Y S d}~X Y nX z|| jj| _W n, tk
r   tjd|dd td Y S X | j	
  |std| tdS td	|| td
S )zTell worker(s) to modify the rate limit for a task by type.

    See Also:
        :attr:`celery.app.task.Task.rate_limit`.

    Arguments:
        task_name (str): Type of task to set rate limit for.
        rate_limit (int, str): New rate limit.
    zInvalid rate limit string: Nz&Rate limit attempt for unknown task %sTexc_infounknown taskz)Rate limits disabled for tasks of type %sz rate limit disabled successfullyz(New rate limit for tasks of type %s: %s.znew rate limit set successfully)r   
ValueErrorr%   r@   tasksr   rT   rx   r$   rq   Zreset_rate_limitsrN   r   )r   r|   r   r)   excr    r    r#   r      s*    $ 
 softhardz#<task_name> <soft_secs> [hard_secs]c                 K   sb   z| j j| }W n, tk
r<   tjd|dd td Y S X ||_||_td||| t	dS )zTell worker(s) to modify the time limit for task by type.

    Arguments:
        task_name (str): Name of task to change.
        hard (float): Hard time limit.
        soft (float): Soft time limit.
    z-Change time limit attempt for unknown task %sTr}   r   z5New time limits for tasks of type %s: soft=%s hard=%sztime limits set successfully)
r@   r   rT   rx   r$   r%   Zsoft_time_limit
time_limitrN   r   )r   r|   r   r   r)   taskr    r    r#   r     s        r   c                 K   s   d| j jjiS )z Get current logical clock value.clock)r@   r   r"   r   r)   r    r    r#   r   =  s    r   c                 K   s   | j jr| j j||| dS )zHold election.

    Arguments:
        id (str): Unique election id.
        topic (str): Election topic.
        action (str): Action to take for elected actor.
    N)rq   Zgossipelection)r   rL   Ztopicactionr)   r    r    r#   r   C  s    	r   c                 C   s>   | j j}|jr6d|jkr6|jd td tdS tdS )z+Tell worker(s) to send task-related events.r   z)Events of group {task} enabled by remote.ztask events enabledztask events already enabled)rq   event_dispatchergroupsrw   rx   rN   r   r   
dispatcherr    r    r#   enable_eventsP  s    
r   c                 C   s8   | j j}d|jkr0|jd td tdS tdS )z3Tell worker(s) to stop sending task-related events.r   z*Events of group {task} disabled by remote.ztask events disabledztask events already disabled)rq   r   r   discardrx   rN   r   r   r    r    r#   disable_events[  s    

r   c                 C   s,   t d | jj}|jddditj dS )z3Tell worker(s) to send event heartbeat immediately.zHeartbeat requested by remote.worker-heartbeatfreq   N)r   )rx   debugrq   r   sendrl   ZSOFTWARE_INFOr   r    r    r#   	heartbeatf  s    
r   )r   c                 K   sJ   || j krFtd| |r&tj| tj  tjj| jj	
 dS dS )zRequest mingle sync-data.zsync with %s)rv   r   N)hostnamerx   rN   rl   rv   rp   purge_datar@   r   Zforward)r   Z	from_noderv   r)   r    r    r#   hellop  s    


r   g?)r   c                 K   s   t dS )zPing worker(s).Zpong)r   r   r    r    r#   ping  s    r   c                 K   s   | j j S )z&Request worker statistics/information.)rq   
controllerstatsr   r    r    r#   r     s    r   Zdump_schedule)r   c                 K   s   t t| jjS )z0List of currently scheduled ETA/countdown tasks.)rj   _iter_schedule_requestsrq   timerr   r    r    r#   	scheduled  s    r   c              
   c   sn   | j jD ]`}z|jjd }W n ttfk
r8   Y qY qX t|tr|jrT|j	 nd |j
| dV  qd S )Nr   )etapriorityrZ   )schedulequeueentryr   
IndexError	TypeErrorrG   r   r   	isoformatr   rN   )r   ZwaitingZarg0r    r    r#   r     s    

r   Zdump_reservedc                 K   s.   |  tj|  tj }|s g S dd |D S )zAList of currently reserved tasks, not including scheduled/active.c                 S   s   g | ]}|  qS r    rN   rO   rZ   r    r    r#   
<listcomp>  s     zreserved.<locals>.<listcomp>)tsetrl   reserved_requestsrn   )r   r)   Zreserved_tasksr    r    r#   rX     s    

rX   Zdump_activec                    s    fdd|  tjD S )z'List of tasks currently being executed.c                    s   g | ]}|j  d qS )safer   r   r   r    r#   r     s   zactive.<locals>.<listcomp>)r   rl   rn   )r   r   r)   r    r   r#   rW     s    

rW   Zdump_revokedc                 K   s
   t tjS )zList of revoked task-ids.)rj   rl   rv   r   r    r    r#   rv     s    rv   Z
dump_taskstaskinfoitemsz[attr1 [attr2 [... [attrN]]]])r   r   r   c                    sJ   | j jpt|rndd D }fdd  fddt|D S )zList of registered tasks.

    Arguments:
        taskinfoitems (Sequence[str]): List of task attributes to include.
            Defaults to ``exchange,routing_key,rate_limit``.
        builtins (bool): Also include built-in tasks.
    c                 s   s   | ]}| d s|V  qdS )zcelery.N)rI   rO   r   r    r    r#   	<genexpr>  s    
 zregistered.<locals>.<genexpr>c                    sB    fddD }|r<dd |  D }d jd|S  jS )Nc                    s.   i | ]&}t  |d d k	r|tt  |d qS r&   )getattrrH   )rO   fieldr   r    r#   rQ     s    z5registered.<locals>._extract_info.<locals>.<dictcomp>c                 S   s   g | ]}d  |qS )rd   )rz   )rO   fr    r    r#   r     s     z5registered.<locals>._extract_info.<locals>.<listcomp>z{} [{}] )rk   ry   r6   rz   )r   fieldsrN   )r   r   r#   _extract_info  s    
z!registered.<locals>._extract_infoc                    s   g | ]} | qS r    r    r   )r   regr    r#   r     s     zregistered.<locals>.<listcomp>)r@   r   DEFAULT_TASK_INFO_ITEMSsorted)r   r   builtinsr)   r   r    )r   r   r   r#   
registered  s    
r   g      N@num	max_depthz.[object_type=Request] [num=200 [max_depth=10]])r   r   r      
   r   c              
      s   zddl }W n tk
r(   tdY nX td| tjddddF}||d|  |j | fd	d
|jd d|jiW  5 Q R  S Q R X dS )a  Create graph of uncollected objects (memory-leak debugging).

    Arguments:
        num (int): Max number of objects to graph.
        max_depth (int): Traverse at most n levels deep.
        type (str): Name of object to graph.  Default is ``"Request"``.
    r   NzRequires the objgraph libraryzDumping graph for type %rZcobjgz.pngF)prefixsuffixdeletec                    s   |  kS r&   r    )vZobjectsr    r#   <lambda>      zobjgraph.<locals>.<lambda>)r   Z	highlightfilenamer   )	objgraphImportErrorrx   rN   tempfileNamedTemporaryFileZby_typeZshow_backrefsr6   )r   r   r   r   Z	_objgraphfhr    r   r#   r     s$      
r   c                 K   s   ddl m} | S )z Sample current RSS memory usage.r   )
sample_mem)Zcelery.utils.debugr   )r   r)   r   r    r    r#   	memsample  s    r   samplesz[n_samples=10]c                 K   s(   ddl m} t }|j|d | S )z/Dump statistics of previous memsample requests.r   )r   )file)Zcelery.utilsr   ioStringIOmemdumpgetvalue)r   r   r)   r   outr    r    r#   r     s    r   nz[N=1]c                 K   s4   | j jjrtdS | j j| | j | tdS )z!Grow pool by n processes/threads.zJpool_grow is not supported with autoscale. Adjust autoscale range instead.zpool will grow)rq   r   
autoscalerr%   rr   Zgrow_update_prefetch_countr   r   r   r)   r    r    r#   	pool_grow  s
    
r   c                 K   s6   | j jjrtdS | j j| | j |  tdS )z#Shrink pool by n processes/threads.zLpool_shrink is not supported with autoscale. Adjust autoscale range instead.zpool will shrink)rq   r   r   r%   rr   shrinkr   r   r   r    r    r#   pool_shrink  s
    
r   c                 K   s2   | j jjr&| jjj|||d tdS tddS )zRestart execution pool.)reloaderzreload startedzPool restarts not enabledN)r@   rC   Zworker_pool_restartsrq   r   reloadr   r   )r   modulesr   r   r)   r    r    r#   pool_restart,  s    
r   maxminz[max [min]]c                 C   s>   | j jj}|r2|||\}}td| d| S tddS )zModify autoscale settings.zautoscale now max=z min=zAutoscale not enabledN)rq   r   r   rp   r   r   )r   r   r   r   Zmax_Zmin_r    r    r#   	autoscale6  s
    
r   Got shutdown from remotec                 K   s   t | t|dS )zShutdown worker(s).N)rx   warningr   )r   msgr)   r    r    r#   shutdownC  s    
r   r   r   exchange_typer   z'<queue> [exchange [type [routing_key]]]c                 K   s.   | j j| j j|||pd|f| td| S )z2Tell worker(s) to consume from task queue by name.directzadd consumer )rq   	call_soonZadd_task_queuer   )r   r   r   r   r   optionsr    r    r#   add_consumerL  s       r   z<queue>c                 K   s    | j | j j| td| S )z9Tell worker(s) to stop consuming from task queue by name.zno longer consuming from )rq   r   Zcancel_task_queuer   )r   r   _r    r    r#   cancel_consumer^  s
     r   c                 C   s    | j jrdd | j jjD S g S )z:List the task queues a worker is currently consuming from.c                 S   s   g | ]}t |jd dqS )T)Zrecurse)r^   Zas_dict)rO   r   r    r    r#   r   n  s   z!active_queues.<locals>.<listcomp>)rq   Ztask_consumerZqueuesr   r    r    r#   active_queuesj  s
    r   )F)FN)FN)FN)NNN)N)N)F)NF)r   r   r   )r   )r   )r   )NFN)NN)r   )NNN)Ur0   r   r   collectionsr   r   r   Zbilliard.commonr   Zkombu.utils.encodingr   Zcelery.exceptionsr   Zcelery.platformsr   rh   Zcelery.utils.functionalr	   Zcelery.utils.logr
   Zcelery.utils.serializationr   r   Zcelery.utils.timer   r-   r   rl   rZ   r   __all__r   r/   rx   r   r   r%   r   r=   r?   rA   rC   rE   rS   requests__getitem__rR   rn   __contains__r   rM   rb   rt   r]   rH   r_   r   floatr   r   r   r   r   r   r   r   r   r   r   rX   rW   rv   r   intr   r   r   r   r   r   r   r   r   r   r   r    r    r    r#   <module>   s<        

	



6


$





	








			  	
