U
    .e                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZdd Zdd ZejejeejeefddZ dS )z'Task execution strategy (optimization).    N)to_timestamp)signals)trace)InvalidTaskError)symbol_by_name)
get_logger)saferepr)timezone   )create_request_cls)task_reserved)defaultc                 C   s(  z$| dd| di  }}|j W n6 tk
r@   tdY n tk
rZ   tdY nX | d| d| d| d	| d
| d| d| d| d| d| dd| dd| d| d| dd}|| jpi  | d| d| ddd}|||f|d| ddfS )zECreate a fresh protocol 2 message from a hybrid protocol 1/2 message.args kwargs!Message does not have args/kwargs(Task keyword arguments must be a mappinglangtaskidroot_id	parent_idgroupmethshadowetaexpiresretriesr   	timelimit)NNargsrepr
kwargsreprorigin)r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   	callbackserrbackschordNr"   r#   r$   chainTutc)getitemsKeyErrorr   AttributeErrorupdateheaders)messagebodyr   r   r-   embedr   r   :/tmp/pip-unpacked-wheel-f4liivr4/celery/worker/strategy.pyhybrid_to_proto2   s@    



r2   c                 C   s   z$| dd| di  }}|j W n6 tk
r@   tdY n tk
rZ   tdY nX |jt|t|| jd z|d |d< W n tk
r   Y nX | d	| d
| ddd}|||f|d| ddfS )zConvert Task message protocol 1 arguments to protocol 2.

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)``
    r   r   r   r   r   )r   r    r-   Ztasksetr   r"   r#   r$   Nr%   Tr'   )r(   r)   r*   r   r+   r,   r   r-   )r.   r/   r   r   r0   r   r   r1   proto1_to_proto2B   s0    

r3   c	                    s   j jttjjo&j}	o0j|	o:j	j
jjj jj	j
jjtj}
t|
jd jjjtf 	
fdd	S )zDefault task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    )appc                    s`  |d kr0d| j kr0| j| jd f\}}}}n2d| j krPt| | j \}}}}n| |\}}}}| ||	||||d r j j j j j	d}	t
j|	d|	id  jsĈ jkrЈ  rd S tjj d r4d j j j j j j jd	d
 j	o j	  jo. j d
 d }
d } j	rz* jr^| j	}n| j	j}W nP ttfk
r } z,d j	| jdddd  jdd W 5 d }~X Y nX rЈ
j}
|r|
rj  | |
dfddS |r"j  | fdd S |
r4 |
dS   |rT fdd|D    d S )Nr   F)Zon_ackZ	on_rejectr4   hostnameeventerr   connection_errorsr/   r-   decodedr'   )r   namer   r   r   data)extra)Zsenderrequestztask-receivedr   r   )	uuidr9   r   r   r   r   r   r   r   z2Couldn't convert ETA %r to timestamp: %r. Task: %rT)safe)exc_info)Zrequeuer
      )priorityc                    s   g | ]}| qS r   r   ).0callbackreqr   r1   
<listcomp>   s     z9default.<locals>.task_message_handler.<locals>.<listcomp>)payloadr/   r-   Zuses_utc_timezoner2   r   r9   r   r    r   
_app_traceZLOG_RECEIVEDr   revokedr   Ztask_receivedsendr   r   Zrequest_dictr(   	isoformatr'   r	   OverflowError
ValueErrorinforejectZqosZincrement_eventually)r.   r/   ZackrO   r"   r   r-   r8   r'   contextZbucketr   excZReqZ
_does_infor4   apply_eta_taskcall_atr7   consumererrorr6   Z
get_buckethandler5   rN   Zlimit_post_etaZ
limit_taskr3   Zrate_limits_enabledZrevoked_tasksZ
send_eventr   task_message_handlerr   Ztask_sends_eventsto_system_tzrD   r1   rX      s       
           
  
 


z%default.<locals>.task_message_handler)r5   r7   loggerisEnabledForloggingINFOZevent_dispatcherenabledrJ   Zsend_eventsZtimerrT   rS   Zdisable_rate_limitsZtask_buckets__getitem__Zon_task_requestZ_limit_taskZ_limit_post_etar   Requestr   pool
controllerstaterI   r   )r   r4   rU   rN   rV   r   rY   bytesr3   eventsr`   r   rR   r1   r   c   s(    





<Lr   )!__doc__r\   Zkombu.asynchronous.timerr   Zceleryr   Z
celery.appr   rH   Zcelery.exceptionsr   Zcelery.utils.importsr   Zcelery.utils.logr   Zcelery.utils.safereprr   Zcelery.utils.timer	   r<   r   rc   r   __all____name__rZ   r2   r3   rN   rV   Z	to_systemrd   r   r   r   r   r1   <module>   s,   )"   