U
    .e3                     @   s  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$m%Z% ddl&m'Z' ddl(m)Z) dZ*e#e+Z,ej-dZ.e/edZ0ddddddddddddgZ1dZ2dZ3dd Z4dd Z5G d d! d!e)Z6d"d#defd$d%Z7e
d&kree7d'd(ed)Z8nee7d'd#d*Z8e.see7d&d(ed)Z9nd+d,  Z8Z9d-d. Z:e.see7d/e:ed0Z;nd1d2 Z;d3d4 Z<dAd6d7Z=dBd9d:Z>dCd=d>Z?dDd?d@Z@dS )EzWorker command-line program.

This module is the 'program-version' of :mod:`celery.worker`.

It does everything necessary to run that module
as an actual application, like installing signal handlers,
platform tweaks, and so on.
    N)datetime)partial)REMAP_SIGTERM)current_process)safe_str)VERSION_BANNER	platformssignals)trace)	AppLoader)
EX_FAILUREEX_OKcheck_privileges)staticterm)cry)qualname)
get_loggerin_sighandlerset_in_sighandler)	pluralize)WorkController)Workerjavapypy_version_infoz --------------z--- ***** -----z-- ******* ----z- *** --- * ---z- ** ----------z{hostname} v{version}

{platform} {timestamp}

[config]
.> app:         {app}
.> transport:   {conninfo}
.> results:     {results}
.> concurrency: {concurrency}
.> task events: {events}

[queues]
{queues}
z
[tasks]
{tasks}
c                  C   s    ddl m}  tdd |  D S )Nr   	enumeratec                 s   s   | ]}|j d sdV  qdS )zDummy-   N)name
startswith).0t r"   6/tmp/pip-unpacked-wheel-f4liivr4/celery/apps/worker.py	<genexpr>L   s    z&active_thread_count.<locals>.<genexpr>)	threadingr   sumr   r"   r"   r#   active_thread_countJ   s    r'   c                 C   s   t d|  tjdd d S )N
Tfileflush)printsys
__stderr__)msgr"   r"   r#   safe_sayP   s    r0   c                       s   e Zd ZdZd#ddZd$ fdd	Zdd	 Z fd
dZdd Zdd Z	d%ddZ
dd Zd&ddZdd Zd'ddZdd Zdd  Zd!d" Z  ZS )(r   zWorker as a program.Fc                 K   sB   || _ t| j| j tjj| j| | jj|d t	| jjj
 d S )N)senderinstanceconfoptions)quietr
   setup_worker_optimizationsapphostnamer	   Zceleryd_initsendr3   r   Zaccept_content)selfr5   kwargsr"   r"   r#   on_before_initW   s      zWorker.on_before_initNc                    sn   | j d|| _| j d|| _t jf | || _|| _tj	
 | _| j jj| j|d k	r`| n|d| _d S )NZworker_redirect_stdoutsZworker_redirect_stdouts_level)enabled)r7   Zeitherredirect_stdoutsredirect_stdouts_levelsuperZsetup_defaultspurgeno_colorr-   stdoutisatty_isattylogcoloredlogfile)r:   rA   rB   r>   r?   r;   	__class__r"   r#   on_after_initc   s       zWorker.on_after_initc                 C   s   |   | _t| j| j d S N)setup_logging_custom_loggingr
   r6   r7   r8   r:   r"   r"   r#   on_init_blueprints   s    
zWorker.on_init_blueprintc                    s   | j }t   tjj| j| |jd | jr4| 	  | j
sB|   | d | |  | jsp| jrp|j| j d}|j}t|tr| dk}|r|j rtd d S )N)r1   r2   r3   z-active-T)zdjango.conf:settingszPlease run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.)r7   r@   on_startr	   Zceleryd_after_setupr9   r8   r3   rA   purge_messagesr5   emit_bannerset_process_statusinstall_platform_tweaksrN   r>   rF   r?   Z_config_source
isinstancestrlowerZmaybe_warn_deprecated_settingsloggerwarning)r:   r7   Zwarn_deprecatedZconfig_sourcerI   r"   r#   rQ   y   s0    
  



zWorker.on_startc                 C   sl   t  }|rtt t  ttdt| j	
d| j| dt| j	|  pRdgtjdd d S )N z 
)artlinesTr)   )r   Zsupports_imagesr,   Zimgcatr   Zlogor   joinrW   rG   Zcyanstartup_inforeset
extra_infor-   
__stdout__)r:   Z	use_imager"   r"   r#   rS      s      zWorker.emit_bannerc                 C   s$   t jj|d tdt| j d S )N)r1   z	%s ready.)r	   Zworker_readyr9   rY   infor   r8   )r:   Zconsumerr"   r"   r#   on_consumer_ready   s    zWorker.on_consumer_readyc                 C   s8   |d kr| j d k	r| j  }| jjj| j| jd|| jdS )NF)r>   colorizer8   )rB   r7   rF   setuploglevelrH   r8   )r:   rd   r"   r"   r#   rM      s       zWorker.setup_loggingc              	   C   sN   | j  :}| j jj|d}|r@td| dt|d ddd W 5 Q R X d S )N)
connectionzpurge: Erased  messagez from the queue.
T)r+   )r7   Zconnection_for_writecontrolrA   r,   r   )r:   rg   countr"   r"   r#   rR      s    zWorker.purge_messagesTr(   celery.c                    s"   |  fddt| jjD S )Nc                 3   s,   | ]$} s| sn|rd | V  qdS )z  . N)r   )r    Ztaskinclude_builtinsint_r"   r#   r$      s
     z"Worker.tasklist.<locals>.<genexpr>)r]   sortedr7   tasks)r:   rn   sepro   r"   rm   r#   tasklist   s    
zWorker.tasklistc                 C   sB   | j d krd S | j tjkr>| j tjk}| j|d}tj|dS d S )N)rn   )rq   )rf   loggingINFODEBUGrs   EXTRA_INFO_FMTformat)r:   rn   rs   r"   r"   r#   r`      s    
zWorker.extra_infoc                 C   s  | j }t| j}d|jpdt|}t|jtsbt	|j}|
drR|dd  }|d| d7 }| jr| j\}}d| d| d	}| j}t|ts|j}|d|d
d  d7 }d}	| jsd}	tj|t| jt jddt| j   | j j |tt |	|jjjdddd
 }
|r~t|
D ]P\}}zd t!| |
| g|
|< W n& t"k
rx   d|
|  |
|< Y nX q,d |
d S )Nz{}:{:#x}__main__zcelery.loaders    ()z{min=z, max=}.ONz/OFF (enable -E to monitor tasks in this worker)r   )microsecondF)indentindent_first)
r7   r8   	timestampversionZconninforesultsconcurrencyplatformeventsqueuesrh   z                r(   )#r7   rW   r   rx   mainidrV   loaderr   r   r   Z	autoscalepool_cls
__module__splitZtask_eventsBANNERr   r8   r   nowreplacer   rg   as_uribackend	_platformr   Zamqpr   
splitlinesr   r]   ARTLINES
IndexError)r:   r\   r7   r   Zapprr   maxminpoolr   Zbanneri_r"   r"   r#   r^      sJ    







zWorker.startup_infoc                 C   sX   | j jr|   | js0| j jr(t| nt| t| t| t| t	  t
  dS )z1Install platform specific tweaks and workarounds.N)r7   ZIS_macOS macOS_proxy_detection_workaroundrE   !install_HUP_not_supported_handlerinstall_worker_restart_handlerinstall_worker_term_handler install_worker_term_hard_handlerinstall_worker_int_handlerinstall_cry_handlerinstall_rdb_handler)r:   workerr"   r"   r#   rU      s    
zWorker.install_platform_tweaksc                 C   s   t jdd dS )z6See https://github.com/celery/celery/issues#issue/161.Zcelery_dummy_proxyZset_by_celerydN)osenviron
setdefaultrO   r"   r"   r#   r     s    z'Worker.macOS_proxy_detection_workaroundc                 C   s&   t jd| dt tj d| jdS )NZcelerydr{   r|   )rb   r8   )r   Zset_mp_process_titleZstrargvr-   argvr8   )r:   rb   r"   r"   r#   rT     s
    zWorker.set_process_status)F)FNNN)N)Tr(   rl   )T)__name__r   __qualname____doc__r<   rK   rP   rQ   rS   rc   rM   rR   rs   r`   r^   rU   r   rT   __classcell__r"   r"   rI   r#   r   T   s$   
    (


*r   TERMWarmc                    s2    fdd}t d |_|tj< d S )Nc               	      sv   t  f ddlm} t jdkrR r,  td d tjjj	d t
|ddd	  W 5 Q R X d S )
Nr   stateMainProcesszworker: z shutdown (MainProcess))r1   sighowexitcodeshould_stopZshould_terminate)r   Cold)r   celery.workerr   r   _namer0   r	   Zworker_shutting_downr9   r8   setattr)argsr   callbackr   r   r   r   r"   r#   _handle_request  s&      z*_shutdown_handler.<locals>._handle_requestZworker_)rW   r   r   r	   )r   r   r   r   r   r   r"   r   r#   _shutdown_handler  s    r   SIGQUITSIGTERMr   )r   r   r   )r   r   c                  O   s   d S rL   r"   )akwr"   r"   r#   <lambda><      r   c                 C   s   t d t| dd d S )Nz>worker: Hitting Ctrl+C again will terminate all running tasks!SIGINTr   )r0   r   )r   r"   r"   r#   	on_SIGINT?  s    r   r   )r   r   r   c                  O   s   d S rL   r"   )r   r;   r"   r"   r#   r   J  s    r   c                   C   s2   t tjtjtjg ttjtjgtj	  d S rL   )
r   Zclose_open_fdsr-   	__stdin__ra   r.   r   execv
executabler   r"   r"   r"   r#   _reload_current_workerN  s      r   SIGHUPc                 C   s   dd }|t j|< d S )Nc                  W   sH   t d tddtj d ddl}|t ddlm	} t
|_dS )z5Signal handler restarting the current python program.TzRestarting celery worker (rh   r|   r   Nr   )r   r0   r]   r-   r   atexitregisterr   r   r   r   r   )r   r   r   r"   r"   r#   restart_worker_sig_handlerW  s    
zBinstall_worker_restart_handler.<locals>.restart_worker_sig_handlerr   r	   )r   r   r   r"   r"   r#   r   U  s    r   SIGUSR1c                 C   s   t rd S dd }|tj| < d S )Nc               	   W   s    t   tt  W 5 Q R X dS )z=Signal handler logging the stack-trace of all active threads.N)r   r0   r   )r   r"   r"   r#   cry_handlerg  s    z(install_cry_handler.<locals>.cry_handler)is_pypyr   r	   )r   r   r"   r"   r#   r   b  s    r   CELERY_RDBSIGSIGUSR2c                 C   s"   dd }t j| r|tj|< d S )Nc               	   W   sB   t  2 ddlm}m} | r$| d n| j}|| W 5 Q R X dS )z=Signal handler setting a rdb breakpoint at the current frame.r   )_frame	set_tracer   N)r   Zcelery.contrib.rdbr   r   f_back)r   r   r   framer"   r"   r#   rdb_handlerq  s    z(install_rdb_handler.<locals>.rdb_handler)r   r   getr   r	   )Zenvvarr   r   r"   r"   r#   r   n  s    r   c                    s    fdd}|t j < d S )Nc              	      s&   t   tdj d W 5 Q R X d S )NzH{sig} not supported: Restarting with {sig} is unstable on this platform!r   )r   r0   rx   )signumr   r   r"   r#   warn_on_HUP_handler  s    z>install_HUP_not_supported_handler.<locals>.warn_on_HUP_handlerr   )r   r   r   r"   r   r#   r   }  s    r   )r   )r   )r   r   )r   )Ar   rt   r   r   r   r-   r   	functoolsr   Zbilliard.commonr   Zbilliard.processr   Zkombu.utils.encodingr   Zceleryr   r   r	   Z
celery.appr
   Zcelery.loaders.appr   Zcelery.platformsr   r   r   Zcelery.utilsr   r   Zcelery.utils.debugr   Zcelery.utils.importsr   Zcelery.utils.logr   r   r   Zcelery.utils.textr   r   r   __all__r   rY   r   Z	is_jythonhasattrr   r   r   rw   r'   r0   r   r   r   r   r   r   r   r   r   r   r   r"   r"   r"   r#   <module>   s   
 F 

         

  
