U
    .e                     @   s&  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	 ddl
ZddlmZmZ ddlmZmZ ddlmZ ddlmZ ejd	d
Zedi dZedddhdZeddhdZG dd dejZeddeddddfddZeddededdfddZeddedfddZddddZ dS ) z'Embedded workers for integration tests.    N)contextmanager)AnyIterableUnion)Celeryworker)_set_task_join_will_blockallow_join_result)Signal)anon_nodenameWORKER_LOGLEVELerrortest_worker_starting)nameZproviding_argstest_worker_startedr   consumertest_worker_stoppedc                       sT   e Zd ZdZdZ fddZG dd dejjZ fddZ	d	d
 Z
dd Z  ZS )TestWorkControllerz3Worker that can synchronize on being fully started.Nc                    s   t  | _t j|| | jjdd dkrddlm	} | | _
t | _zddlm} |  W n tk
rx   Y nX tj| j
t | _| j  d S )N.Zpreforkr   )Queue)pickling_support)	threadingEvent_on_startedsuper__init__pool_cls
__module__splitZbilliardr   logger_queueosgetpidpidZtblibr   installImportErrorlogginghandlersQueueListener	getLoggerZqueue_listenerstart)selfargskwargsr   r   	__class__ A/tmp/pip-unpacked-wheel-f4liivr4/celery/contrib/testing/worker.pyr   #   s    

zTestWorkController.__init__c                   @   s   e Zd Zdd Zdd ZdS )zTestWorkController.QueueHandlerc                 C   s
   d|_ |S )NT)
from_queuer+   recordr0   r0   r1   prepare:   s    z'TestWorkController.QueueHandler.preparec                 C   s   t jr d S )N)r&   raiseExceptionsr3   r0   r0   r1   handleError?   s    z+TestWorkController.QueueHandler.handleErrorN)__name__r   __qualname__r5   r7   r0   r0   r0   r1   QueueHandler9   s   r:   c                    s@    j r6  j }| fdd t }|| t  S )Nc                    s   | j  jkot| dd S )Nr2   F)processr#   getattr)rr+   r0   r1   <lambda>F       z*TestWorkController.start.<locals>.<lambda>)r    r:   	addFilterr&   r)   
addHandlerr   r*   )r+   handlerloggerr.   r>   r1   r*   C   s    
zTestWorkController.startc                 C   s    | j   tj| j| |d dS )z=Callback called when the Consumer blueprint is fully started.)senderr   r   N)r   setr   sendapp)r+   r   r0   r0   r1   on_consumer_readyK   s    
  z$TestWorkController.on_consumer_readyc                 C   s   | j   dS )zWait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        N)r   waitr>   r0   r0   r1   ensure_startedR   s    z!TestWorkController.ensure_started)r8   r   r9   __doc__r    r   r&   r'   r:   r*   rI   rK   __classcell__r0   r0   r.   r1   r      s   
r      ZsoloTg      $@c              
   k   s   t j| d d}	znt| f||||||d|F}	|rlddlm}
 t  |
 j|ddksbt	W 5 Q R X |	V  W 5 Q R X W 5 tj| |	d X dS )	z[Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    )rE   N)rE   r   )concurrencypoolloglevellogfileperform_ping_checkshutdown_timeoutrN   )ping)timeoutZpong)
r   rG   r   _start_worker_threadtasksrU   r	   delaygetAssertionError)rH   rO   rP   rQ   rR   rS   Zping_task_timeoutrT   r-   r   rU   r0   r0   r1   start_worker]   s(    "r\   c                 k   s   t | || |rd| jkst| jtjdd}	|	jj W 5 Q R X |f | |t	 |||d|
ddddd
|}
tj|
jdd}|  |
  td	 z
|
V  W 5 d
dlm} d
|_|| | rtdd|_X dS )zaStart Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    zcelery.pingZTEST_BROKER)hostnameNwithout_heartbeatT)
rH   rO   r]   rP   rQ   rR   Zready_callbackr^   Zwithout_mingleZwithout_gossip)targetdaemonFr   )statezWorker thread failed to exit within the allocated timeout. Consider raising `shutdown_timeout` if your tasks take longer to execute.)setup_app_for_workerrX   r[   
connectionr!   environrZ   Zdefault_channelZqueue_declarer   popr   Threadr*   rK   r   Zcelery.workerra   Zshould_terminatejoinis_aliveRuntimeError)rH   rO   rP   rQ   rR   WorkControllerrS   rT   r-   connr   tra   r0   r0   r1   rW      sB    


rW   c           	      k   sH   ddl m}m} |   ||dg}|  z
dV  W 5 |  X dS )zfStart worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    r   )ClusterNodeztestworker1@%hN)Zcelery.apps.multirm   rn   set_currentr*   Zstopwait)	rH   rO   rP   rQ   rR   r-   rm   rn   Zclusterr0   r0   r1   _start_worker_process   s    
rp   )returnc                 C   s8   |    |   |   dt| j_| jj||d dS )z9Setup the app to be used for starting an embedded worker.F)rQ   rR   N)finalizero   set_defaulttypelog_setupsetup)rH   rQ   rR   r0   r0   r1   rb      s
    rb   )!rL   r&   r!   r   
contextlibr   typingr   r   r   Zcelery.worker.consumerZceleryr   r   Zcelery.resultr   r	   Zcelery.utils.dispatchr
   Zcelery.utils.nodenamesr   rd   rZ   r   r   r   r   rj   r   r\   rW   rp   rb   r0   r0   r0   r1   <module>   sb   ?'8