U
    .e]j                     @   s  d Z ddlZddlZddlmZ ddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZmZmZ ddlmZ ddlmZmZmZm Z m!Z!m"Z"m#Z#m$Z$ ddl%mZ& ddl'm(Z(m)Z)m*Z* ddl+m,Z, ddl-m.Z. ddl/m0Z0 ddl1m2Z2m3Z3m4Z4 ddl5m6Z6 dZ7e8edZ9e,e:Z;e;j<e;j=e;j>e;j?f\Z<Z=Z@Z?daAdaBdd ZCeC  e4jDZDejEjFZGejHjFZIe6jJZJe6jKZKe6jLZMe6jNZNG dd dZOeeMeKdefddZPdS ) zfTask request.

This module defines the :class:`Request` class, that specifies
how tasks are executed.
    N)datetime)	monotonictime)ref)TERM_SIGNAME)ExceptionWithTraceback)	safe_reprsafe_str)cached_property)current_appsignals)Context)fast_trace_task
trace_tasktrace_task_ret)BasePool)IgnoreInvalidTaskErrorRejectRetryTaskRevokedError
TerminatedTimeLimitExceededWorkerLostError)r   )maybe
maybe_listnoop)
get_logger)gethostname)get_pickled_exception)maybe_iso8601maybe_make_awaretimezone   )state)Requestpypy_version_infoFc                   C   s   t tjat tjad S N)loggerisEnabledForloggingDEBUG_does_debugINFO
_does_info r/   r/   9/tmp/pip-unpacked-wheel-f4liivr4/celery/worker/request.py__optimize__.   s    r1   c                   @   sF  e Zd ZdZdZdZdZdZdZdZ	dZ
dZdZes8dZeddddddeddddeefddZed	d
 Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd Zedd  Zed!d" Z ed#d$ Z!ed%d& Z"ed'd( Z#ed)d* Z$e$j%d+d* Z$ed,d- Z&ed.d/ Z'ed0d1 Z(e(j%d2d1 Z(ed3d4 Z)ed5d6 Z*ed7d8 Z+ed9d: Z,e,j%d;d: Z,ed<d= Z-ed>d? Z.ed@dA Z/e/j%dBdA Z/edCdD Z0e0j%dEdD Z0edFdG Z1edHdI Z2edJdK Z3ee4dLdMdNZ5ee6dLdOdPZ7edQdR Z8e9dSdTdUZ:ddVdWZ;dXdY Z<ddZd[Z=dd\d]Z>d^d_ Z?d`da Z@dbdc ZAddde ZBdfdg ZCdhdi ZDdjdk ZEdldm ZFddndoZGdpdq ZHddrdsZIddtduZJdvdw ZKdxdy ZLdzd{ ZMeNd|d} ZOeNd~d ZPeNdd ZQeNdd ZReNdd ZSeNdd ZTdS )r%   zA request for task execution.FN)NN)_app_typenameid_root_id
_parent_id_on_ack_body	_hostname_eventer_connection_errors_task_eta_expires_request_dict
_on_reject_utc_content_type_content_encoding	_argsrepr_kwargsrepr_args_kwargs_decodedZ	__payload__weakref____dict__Tc              
   K   s  || _ |d kr|j n| | _|
d kr0|jn|
| _|| _|| _|| _|rZd  | _	| _
n|j|j | _	| _
| jrx| jn|j| _| jd | _| jd  | _| _d| jkr| jd p| j| _| jd| _| jd| _| jdd }|r|| _| jdd| _| jd	d| _|| _|	| _|p$t | _|| _|p6d
| _|pL| jj| j | _| jdd| _ | jd}|d k	rz||}W n> t!t"t#fk
r } zt$d|d| W 5 d }~X Y nX ||| j%| _&nd | _&| jd}|d k	rJz||}W n> t!t"t#fk
r8 } zt$d|d| W 5 d }~X Y nX ||| j%| _'nd | _'|j(pZi }|j)pfi }|d|d|d|ddd| _*| j+||d|d| j| j*d | j\| jd< | jd< }| jd | _,| jd | _-d S )Nr5   taskZshadowroot_id	parent_id	timelimitargsrepr 
kwargsreprr/   ignore_resultFetazinvalid ETA value z: expireszinvalid expires value exchangerouting_keypriorityredelivered)rV   rW   rX   rY   reply_tocorrelation_id)
propertiesrZ   r[   hostnamedelivery_infoargskwargs)._messageheaderscopyr@   bodyr9   r2   rB   rI   rC   rD   content_typecontent_encodingpayload_Request__payloadr5   r3   r4   getr6   r7   time_limitsrE   rF   r8   rA   r   r:   r;   r<   Ztasksr=   _ignore_resultAttributeError
ValueError	TypeErrorr   tzlocalr>   r?   r^   r\   _delivery_infoupdaterG   rH   )selfmessageon_ackr]   eventerappconnection_errorsrequest_dictrL   	on_rejectrd   rb   decodedutcr!   r    optsrO   rT   excrU   r^   r\   _r/   r/   r0   __init__Z   s     




zRequest.__init__c                 C   s   | j S r'   )rp   rr   r/   r/   r0   r^      s    zRequest.delivery_infoc                 C   s   | j S r'   )ra   r   r/   r/   r0   rs      s    zRequest.messagec                 C   s   | j S r'   r@   r   r/   r/   r0   rx      s    zRequest.request_dictc                 C   s   | j S r'   )r9   r   r/   r/   r0   rd      s    zRequest.bodyc                 C   s   | j S r'   )r2   r   r/   r/   r0   rv      s    zRequest.appc                 C   s   | j S r'   )rB   r   r/   r/   r0   r{      s    zRequest.utcc                 C   s   | j S r'   )rC   r   r/   r/   r0   re      s    zRequest.content_typec                 C   s   | j S r'   )rD   r   r/   r/   r0   rf      s    zRequest.content_encodingc                 C   s   | j S r'   )r3   r   r/   r/   r0   type   s    zRequest.typec                 C   s   | j S r'   )r6   r   r/   r/   r0   rM      s    zRequest.root_idc                 C   s   | j S r'   )r7   r   r/   r/   r0   rN      s    zRequest.parent_idc                 C   s   | j S r'   )rE   r   r/   r/   r0   rP      s    zRequest.argsreprc                 C   s   | j S r'   )rG   r   r/   r/   r0   r_      s    zRequest.argsc                 C   s   | j S r'   )rH   r   r/   r/   r0   r`      s    zRequest.kwargsc                 C   s   | j S r'   )rF   r   r/   r/   r0   rR      s    zRequest.kwargsreprc                 C   s   | j S r'   )r8   r   r/   r/   r0   rt      s    zRequest.on_ackc                 C   s   | j S r'   rA   r   r/   r/   r0   ry      s    zRequest.on_rejectc                 C   s
   || _ d S r'   r   rr   valuer/   r/   r0   ry      s    c                 C   s   | j S r'   )r:   r   r/   r/   r0   r]      s    zRequest.hostnamec                 C   s   | j S r'   )rk   r   r/   r/   r0   rS      s    zRequest.ignore_resultc                 C   s   | j S r'   r;   r   r/   r/   r0   ru      s    zRequest.eventerc                 C   s
   || _ d S r'   r   )rr   ru   r/   r/   r0   ru     s    c                 C   s   | j S r'   )r<   r   r/   r/   r0   rw     s    zRequest.connection_errorsc                 C   s   | j S r'   )r=   r   r/   r/   r0   rL   
  s    zRequest.taskc                 C   s   | j S r'   )r>   r   r/   r/   r0   rT     s    zRequest.etac                 C   s   | j S r'   r?   r   r/   r/   r0   rU     s    zRequest.expiresc                 C   s
   || _ d S r'   r   r   r/   r/   r0   rU     s    c                 C   s   | j d kr| jjj| _ | j S r'   )_tzlocalr2   confr"   r   r/   r/   r0   ro     s    
zRequest.tzlocalc                 C   s   | j j p| j jS r'   )rL   rS   Zstore_errors_even_if_ignoredr   r/   r/   r0   store_errors   s    
zRequest.store_errorsc                 C   s   | j S r'   r5   r   r/   r/   r0   task_id%  s    zRequest.task_idc                 C   s
   || _ d S r'   r   r   r/   r/   r0   r   *  s    c                 C   s   | j S r'   r4   r   r/   r/   r0   	task_name.  s    zRequest.task_namec                 C   s
   || _ d S r'   r   r   r/   r/   r0   r   3  s    c                 C   s
   | j d S )NrZ   r   r   r/   r/   r0   rZ   7  s    zRequest.reply_toc                 C   s   | j ddS )Nreplaced_task_nestingr   r@   ri   r   r/   r/   r0   r   <  s    zRequest.replaced_task_nestingc                 C   s   | j dg S )Ngroupsr   r   r/   r/   r0   r   @  s    zRequest.groups)returnc                 C   s   | j dpg S )Nstamped_headersr   r   r/   r/   r0   r   D  s    zRequest.stamped_headersc                    s$   | j dpi   fdd| jD S )Nstampsc                    s   i | ]}|  |qS r/   )ri   ).0headerr   r/   r0   
<dictcomp>K  s      z"Request.stamps.<locals>.<dictcomp>)r@   ri   r   r   r/   r   r0   r   H  s    zRequest.stampsc                 C   s
   | j d S )Nr[   r   r   r/   r/   r0   r[   M  s    zRequest.correlation_id)poolc           	      K   s   | j }| j}|  rt|| j\}}| jjr2tnt}|j	|| j
|| j| j| j| jf| j| j| j| j|pl|j|pt|j|d	}tt|| _|S )a  Used by the worker to send this task to the pool.

        Arguments:
            pool (~celery.concurrency.base.TaskPool): The execution pool
                used to execute this request.

        Raises:
            celery.exceptions.TaskRevokedError: if the task was revoked.
        r_   Zaccept_callbackZtimeout_callbackcallbackZerror_callbackZsoft_timeouttimeoutr[   )r5   r=   revokedr   rj   r2   use_fast_trace_taskr   r   apply_asyncr3   r@   r9   rC   rD   on_accepted
on_timeout
on_success
on_failuresoft_time_limit
time_limitr   r   _apply_result)	rr   r   r`   r   rL   r   r   traceresultr/   r/   r0   execute_using_poolR  s,    

 zRequest.execute_using_poolc              
   C   s   |   rdS | jjs|   | j\}}}| j}|j||ddf|pDi  t| j| j| j	| j
|| j| jj| jd\}}}}|r| jdd n|   |S )zExecute the task in a :func:`~celery.app.trace.trace_task`.

        Arguments:
            loglevel (int): The loglevel used by the task.
            logfile (str): The logfile used by the task.
        NF)loglevellogfileZis_eager)r]   loaderrv   requeue)r   rL   	acks_lateacknowledge_payloadr@   rq   r   r5   rG   rH   r:   r2   r   reject)rr   r   r   r~   embedrequestretvalIr/   r/   r0   executes  s,     zRequest.executec                 C   s2   | j r.t| j j}|| j kr.t| j dS dS )z%If expired, mark the task as revoked.TN)rU   r   nowtzinforevoked_tasksaddr5   )rr   r   r/   r/   r0   maybe_expire  s
    
zRequest.maybe_expirec                 C   sf   t |p
t}| jr4|| j| | dd|d n
||f| _| jd k	rb|  }|d k	rb|	| d S )N
terminatedTF)
_signalssignumr   
time_startterminate_job
worker_pid_announce_revoked_terminate_on_ackr   	terminaterr   r   signalobjr/   r/   r0   r     s    

zRequest.terminatec                 C   sR   t |p
t}| jr*|| j| |   | jd k	rN|  }|d k	rN|| d S r'   )	r   r   r   r   r   r   _announce_cancelledr   r   r   r/   r/   r0   cancel  s    
zRequest.cancelc                 C   sn   t |  | d d}t|d}| jjj| j|| jd | j|| j| j	| j
d  d| _t| j| jd d d S )Nztask-cancelledzcancelled by Celery)rs   )r   T)r   einfo)
task_ready
send_eventr   rL   backendZmark_as_retryr5   _contexton_retryr_   r`   _already_cancelled
send_retry)rr   reasonr}   r/   r/   r0   r     s    

zRequest._announce_cancelledc                 C   s^   t |  | jd|||d | jjj| j|| j| jd |   d| _	t
| j| j|||d d S )Nztask-revoked)r   r   expiredr   Zstore_resultT)r   r   r   r   )r   r   rL   r   Zmark_as_revokedr5   r   r   r   _already_revokedsend_revoked)rr   r   r   r   r   r/   r/   r0   r     s&        
  zRequest._announce_revokedc           
      C   s  d}| j rdS | jr|  }| jtk}d\}}|s| jr| jD ]~}|tkr>t| }| jjd | }t	|t
tfr|D ] }|t|krtd}||i} qqtn t|t|k||kg}||i} qq>t|||frd}	|r|	d| 7 }	t|	| j| j | |rdnddd	| dS dS )
z%If revoked, skip task and mark state.FT)FNr   zDiscarding revoked task: %s[%s]z (revoked by header: %s)r   r   N)r   rU   r   r5   r   r   revoked_stampsra   rb   
isinstancelisttupler   anyinfor4   r   )
rr   r   Zrevoked_by_idZrevoked_by_headerZrevoking_headerZstampZrevoked_headerZstamped_headerZstamped_valueZlog_msgr/   r/   r0   r     sJ    




   zRequest.revokedc                 K   s4   | j r0| j jr0| jjr0| j j|fd| ji| d S )Nuuid)r;   enabledrL   Zsend_eventssendr5   )rr   r   fieldsr/   r/   r0   r     s    zRequest.send_eventc                 C   sj   || _ t t |  | _t|  | jjs0|   | d t	rPt
d| j| j| | jdk	rf| j| j  dS )z4Handler called when task is accepted by worker pool.ztask-startedzTask accepted: %s[%s] pid:%rN)r   r   r   r   task_acceptedrL   r   r   r   r,   debugr4   r5   r   r   )rr   pidZtime_acceptedr/   r/   r0   r     s    

zRequest.on_acceptedc                 C   sr   |rt d|| j| j nVt|  td|| j| j t|}| jjj| j|| j	| j
d | jjrn| jjrn|   dS )z%Handler called if the task times out.z)Soft time limit (%ss) exceeded for %s[%s]z)Hard time limit (%ss) exceeded for %s[%s]r   N)warnr4   r5   r   errorr   rL   r   mark_as_failurer   r   r   acks_on_failure_or_timeoutr   )rr   Zsoftr   r}   r/   r/   r0   r   	  s*          zRequest.on_timeoutc                 K   st   |\}}}|rD|j }t|tr$|j}t|ttfr6|| j|ddS t| dd | jj	r`| 
  | jd||d dS )z6Handler called if the task was successfully processed.T	return_ok)Z
successfultask-succeededr   runtimeN)	exceptionr   r   r}   
SystemExitKeyboardInterruptr   r   rL   r   r   r   rr   Zfailed__retval__runtimer`   failedr   r   r}   r/   r/   r0   r     s    

zRequest.on_successc                 C   s2   | j jr|   | jdt|jjt|jd dS )z-Handler called if the task should be retried.ztask-retriedr   	tracebackN)	rL   r   r   r   r   r   r}   r	   r   )rr   exc_infor/   r/   r0   r   -  s    
zRequest.on_retryc           
   	   C   s  t |  |j}t|tr|j}t|t}|rP| jsL| jsL| ddt	|d dS t|t
rjt
d| n>t|tr| j|jdS t|tr|  S t|tr| |S d}t|t}| jjr
| jjo|}| jj}	|rd}| j|d d}n|	 r|   n| jdd |s\|s|s\| jjj| j|| j| jd tjj| j| j|| j| j |j!|d |r~| j"d	t#t$|j|j!d
 |st%d||j&d dS )z/Handler called if the task raised an exception.r   TFNzProcess got: r   r   )Zsenderr   r   r_   r`   r   r   ztask-failedr   zTask handler raised error: %r)r   )'r   r   r   r   r}   r   r   r   r   strMemoryErrorr   r   r   r   r   r   r   r   rL   r   Zreject_on_worker_lostr   r   r   r5   r   r   r   Ztask_failurer   r_   r`   r   r   r   r   r   r   )
rr   r   Zsend_failed_eventr   r}   Zis_terminatedr   Zis_worker_lostr   Zackr/   r/   r0   r   6  sx    


   







   zRequest.on_failurec                 C   s   | j s| t| j d| _ dS )zAcknowledge task.TN)acknowledgedr8   r(   r<   r   r/   r/   r0   r     s    zRequest.acknowledgec                 C   s.   | j s*| t| j| d| _ | jd|d d S )NTztask-rejectedr   )r   rA   r(   r<   r   )rr   r   r/   r/   r0   r     s    zRequest.rejectc                 C   sB   | j | j|s| jn| j|s | jn| j| j| j| j| j	| j
| jd
S )N)
r5   r4   r_   r`   r   r]   r   r   r^   r   )r5   r4   rG   rE   rH   rF   r3   r:   r   r   r^   r   )rr   safer/   r/   r0   r     s    zRequest.infoc                 C   s
   d | S )Nz{0.name}[{0.id}])formatr   r/   r/   r0   	humaninfo  s    zRequest.humaninfoc                 C   s@   d |  | jrd| j dnd| jr4d| j dndg S )z``str(self)``. z ETA:[]rQ   z
 expires:[)joinr   r>   r?   stripr   r/   r/   r0   __str__  s
    zRequest.__str__c                 C   s   d t| j|  | j| jS )z``repr(self)``.z<{}: {} {} {}>)r   r   __name__r   rE   rF   r   r/   r/   r0   __repr__  s      zRequest.__repr__c                 C   s   | j S r'   )rh   r   r/   r/   r0   r     s    zRequest._payloadc                 C   s   | j \}}}|dS )Nchordr   ri   rr   r~   r   r/   r/   r0   r    s    zRequest.chordc                 C   s   | j \}}}|dS )Nerrbacksr  r  r/   r/   r0   r    s    zRequest.errbacksc                 C   s   | j dS )Ngroupr   r   r/   r/   r0   r    s    zRequest.groupc                 C   s*   | j }| j\}}}|jf |pi  t|S )z9Context (:class:`~celery.app.task.Context`) of this task.)r@   r   rq   r   )rr   r   r~   r   r/   r/   r0   r     s    zRequest._contextc                 C   s   | j dS )Ngroup_indexr   r   r/   r/   r0   r    s    zRequest.group_index)NN)N)N)TF)F)F)Ur  
__module____qualname____doc__r   r   r   rj   r   r   r   r   r   IS_PYPY	__slots__r   r!   r    r   propertyr^   rs   rx   rd   rv   r{   re   rf   r   rM   rN   rP   r_   r`   rR   rt   ry   setterr]   rS   ru   rw   rL   rT   rU   ro   r   r   r   rZ   r   r   r   r   dictr   r[   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r
   r   r  r  r  r   r  r/   r/   r/   r0   r%   C   s  	       
T




































!
"

)	
P






	r%   c
              	      s^   |j |j|j|j |o |jd kr8|	jr4tntG  fddd| }
|
S )Nc                       s0   e Zd ZfddZ fddZdS )z#create_request_cls.<locals>.Requestc                    sp   | j }|  rt|| j\}} | j|| j| j| j| jf| j	| j
| j| j|pR|pX|d	}t|| _|S )Nr   )r   r   r   rj   r   rx   rd   re   rf   r   r   r   r   r   r   )rr   r   r`   r   r   r   r   )r   default_soft_time_limitdefault_time_limitr   r   r/   r0   r     s(    
 z6create_request_cls.<locals>.Request.execute_using_poolc                    sp   |\}}}|rD|j }t|tr$|j}t|ttfr6|| j|ddS |   rX|   rl| jd||d d S )NTr   r   r   )	r   r   r   r}   r   r   r   r   r   r   )r   eventsr   r/   r0   r     s"    

  z.create_request_cls.<locals>.Request.on_successN)r  r	  r
  r   r   r/   r   r   r  r  r  r   r   r   r/   r0   r%     s   r%   )r   r   r   r   r   r   r   r   )baserL   r   r]   ru   r   r   r   r   rv   r%   r/   r  r0   create_request_cls  s    
",r  )Qr  r*   sysr   r   r   weakrefr   Zbilliard.commonr   Zbilliard.einfor   Zkombu.utils.encodingr   r	   Zkombu.utils.objectsr
   Zceleryr   r   Zcelery.app.taskr   Zcelery.app.tracer   r   r   Zcelery.concurrency.baser   Zcelery.exceptionsr   r   r   r   r   r   r   r   Zcelery.platformsr   Zcelery.utils.functionalr   r   r   Zcelery.utils.logr   Zcelery.utils.nodenamesr   Zcelery.utils.serializationr   Zcelery.utils.timer    r!   r"   rQ   r$   __all__hasattrr  r  r(   r   r   warningr   r   r.   r,   r1   Ztz_or_localZtask_revokedr   r   Z
task_retryr   r   r   r   r   r   r%   r  r/   r/   r/   r0   <module>   sf   (
      !   