U
    .e!                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
mZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ dZ eeddZG dd de!Z"G dd dZ#G dd de#Z$dS )zIntegration testing utilities.    N)defaultdict)partial)count)AnyCallableDictSequenceTextIOTuple)ContentDisallowedretry_over_timestates)TimeoutError)AsyncResult	ResultSet)truncate)humanize_secondsz4Still waiting for {0}.  Trying again {when}: {exc!r}T)microsecondsc                   @   s   e Zd ZdZdS )SentinelzSignifies the end of something.N)__name__
__module____qualname____doc__ r   r   B/tmp/pip-unpacked-wheel-f4liivr4/celery/contrib/testing/manager.pyr      s   r   c                	   @   s   e Zd ZdZd=ddZd>dd	Zd
d Zd?ddZd@ddZdd Z	dAddZ
dBddZdCd d!ZdDd"d#ZdEd%d&ZdFd(d)ZdGd+d,ZdHd-d.Zed/d0 ZdId1d2Zd3d4 Zd5d6 ZdJd7d8Zd9d: Zd;d< ZdS )KManagerMixinz.Mixin that adds :class:`Manager` capabilities.      @FNc                 C   sF   |d krt jn|| _|d kr"t jn|| _| j j| _|| _|| _d S N)	sysstdoutstderrapp
connectionZrecoverable_connection_errors
connerrorsblock_timeoutno_join)selfr&   r'   r!   r"   r   r   r   _init_manager   s
    zManagerMixin._init_manager-c                 C   s   t | | | jd d S )N)file)printr!   )r(   ssepr   r   r   remark(   s    zManagerMixin.remarkc                 C   s   dd |D S )Nc                 S   s    g | ]}|j |jjkr|j qS r   )idbackend_cache).0resr   r   r   
<listcomp>.   s      z0ManagerMixin.missing_results.<locals>.<listcomp>r   )r(   rr   r   r   missing_results,   s    zManagerMixin.missing_resultsthingr   
   皙?      ?      @c              	      s@   |si n|} fdd}j ||f||||||	d|S )zWait for event to happen.

        The `catch` argument specifies the exception that means the event
        has not happened yet.
        c                    s>   t |}r*tj t|dd| d r:| || |S )Nin )whenexc)nextwarnE_STILL_WAITINGformatr   )r@   Z	intervalsretriesintervaldescemit_warningerrbackr(   r   r   on_errorG   s     
 z'ManagerMixin.wait_for.<locals>.on_error)argskwargsrJ   max_retriesinterval_startinterval_stepr   )r(   funcatchrH   rL   rM   rJ   rN   rO   rP   interval_maxrI   optionsrK   r   rG   r   wait_for0   s    
    zManagerMixin.wait_for   {Gz?      ?c	           
   
   K   sD   z| j ||||||||dW S  |k
r0   Y nX td| dS )z;Make sure something does not happen (at least for a while).)rH   rN   rO   rP   rS   rI   zShould not have happened: N)rU   AssertionError)
r(   rQ   rR   rH   rN   rO   rP   rS   rI   rT   r   r   r   ensure_not_for_a_whileY   s         z#ManagerMixin.ensure_not_for_a_whilec                 O   s
   t ||S r   r   )r(   rL   rM   r   r   r   r   j   s    zManagerMixin.retry_over_timec           	         s  | j r
d S t|ts"| j|g}g   fdd}|r>t|ntdD ]}g  d d < z|jf ||d|W   S  tjt	fk
r } z@| 
|}| dt|t  t|td||d W 5 d }~X Y qF | jk
r } z| d|d W 5 d }~X Y qFX qFtd	d S )
Nc                    s     |  d S r   )append)task_idvalueZreceivedr   r   	on_resultt   s    z$ManagerMixin.join.<locals>.on_resultr   )callback	propagatez#Still waiting for {}/{}: [{}]: {!r}z, !zjoin: connection lost: z!Test failed: Missing task results)r'   
isinstancer   r#   ranger   getsockettimeoutr   r7   r/   rD   lenr   joinr%   rY   )	r(   r6   ra   rN   rM   r_   ir@   Zwaiting_forr   r^   r   ri   m   s0    

  &zManagerMixin.join      @c                 C   s   | j jj|dS Nrg   )r#   controlinspect)r(   rg   r   r   r   ro      s    zManagerMixin.inspectc                 c   s&   |  |j| pi }| E d H  d S r   )ro   Z
query_taskitems)r(   idsrg   Ztasksr   r   r   query_tasks   s    zManagerMixin.query_tasksc           	      C   sH   t t}| j||dD ],\}}| D ]\}\}}|| | q&q|S rl   )r   setrr   rp   add)	r(   rq   rg   r   hostnameZreplyr\   state_r   r   r   query_task_states   s
    zManagerMixin.query_task_states waiting for tasks to be acceptedc                 K   s   | j | j|f||d|S N)rF   rH   )assert_task_worker_stateis_acceptedr(   rq   rF   rH   policyr   r   r   assert_accepted   s      zManagerMixin.assert_accepted waiting for tasks to be receivedc                 K   s   | j | j|f||d|S rz   )r{   is_receivedr}   r   r   r   assert_received   s      zManagerMixin.assert_received,waiting for tasks to be started or completedc                 K   s   | j | j|f||d|S rz   )assert_task_state_from_resultis_result_task_in_progress)r(   Zasync_resultsrF   rH   r~   r   r   r   ,assert_result_tasks_in_progress_or_completed   s     z9ManagerMixin.assert_result_tasks_in_progress_or_completedc                 K   s    | j t| j|||dtff|S rl   rU   r   true_or_raiser   )r(   rQ   resultsrF   r~   r   r   r   r      s    z*ManagerMixin.assert_task_state_from_resultc                    s"   t jt jf t fdd| D S )Nc                 3   s   | ]}|j  kV  qd S r   )rv   )r3   resultZpossible_statesr   r   	<genexpr>   s     z:ManagerMixin.is_result_task_in_progress.<locals>.<genexpr>)r   ZSTARTEDSUCCESSall)r   rM   r   r   r   r      s    z'ManagerMixin.is_result_task_in_progressc                 K   s    | j t| j|||dtff|S rl   r   )r(   rQ   rq   rF   r~   r   r   r   r{      s    z%ManagerMixin.assert_task_worker_statec                 K   s   | j dddg|f|S )Nreservedactiveready_ids_matches_stater(   rq   rM   r   r   r   r      s     zManagerMixin.is_receivedc                 K   s   | j ddg|f|S )Nr   r   r   r   r   r   r   r|      s    zManagerMixin.is_acceptedc                    s&   | j ||dt fdd|D S )Nrm   c                 3   s2   | ]* t  fd dfddD D V  qdS )c                 3   s   | ]} |kV  qd S r   r   )r3   r-   tr   r   r      s     z<ManagerMixin._ids_matches_state.<locals>.<genexpr>.<genexpr>c                    s   g | ]} | qS r   r   )r3   kr   r   r   r5      s     z=ManagerMixin._ids_matches_state.<locals>.<genexpr>.<listcomp>N)any)r3   expected_statesr   r   r   r      s   z2ManagerMixin._ids_matches_state.<locals>.<genexpr>)rx   r   )r(   r   rq   rg   r   r   r   r      s    zManagerMixin._ids_matches_statec                 O   s   |||}|st  |S r   )r   )r(   rQ   rL   rM   r4   r   r   r   r      s    
zManagerMixin.true_or_raisec              	   C   s   | j j}| j  p}|j|d}|dkrq,q| }||_ztdd |  D }W n tk
rp   Y q~Y nX |dkr:q~q:W 5 Q R X d S )N)r$   r   c                 s   s   | ]}t |V  qd S r   )rh   )r3   r   r   r   r   r      s     z/ManagerMixin.wait_until_idle.<locals>.<genexpr>)	r#   rn   r$   purgero   sumr   valuesr   )r(   rn   r$   r   ro   r   r   r   wait_until_idle   s    
zManagerMixin.wait_until_idle)r   FNN)r*   )	r8   r   NNr9   r:   r;   r<   F)r8   rV   r:   rW   rX   F)Fr9   )rk   )r;   )r;   )r;   ry   )r;   r   )r;   r   )r;   )r;   )r;   )r   r   r   r   r)   r/   r7   rU   rZ   r   ri   ro   rr   rx   r   r   r   r   staticmethodr   r{   r   r|   r   r   r   r   r   r   r   r      sb         


         
*         




  
  
	  
 



r   c                   @   s   e Zd ZdZdd ZdS )Managerz(Test helpers for task integration tests.c                 K   s   || _ | jf | d S r   )r#   r)   )r(   r#   rM   r   r   r   __init__   s    zManager.__init__N)r   r   r   r   r   r   r   r   r   r      s   r   )%r   rf   r    collectionsr   	functoolsr   	itertoolsr   typingr   r   r   r   r	   r
   Zkombu.exceptionsr   Zkombu.utils.functionalr   Zceleryr   Zcelery.exceptionsr   Zcelery.resultr   r   Zcelery.utils.textr   Zcelery.utils.timer   Z_humanize_secondsrC   	Exceptionr   r   r   r   r   r   r   <module>   s&     P