U
    .e_                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	m
Z
 ddlmZ ddlmZ ddlmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$m%Z% ddl&m'Z' ddl(m)Z)m*Z* ddl+m,Z,m-Z- ddl.m/Z/m0Z0 dZ1eddZ2e,e3Z4e4j5e4j6e4j7e4j8f\Z5Z6Z7Z8dZ9G dd de:Z;G dd dZ<eG dd dZ=dd Z>d d! Z?G d"d# d#Z@G d$d% d%e@ZAG d&d' d'ZBG d(d) d)eZCz
e  W n eDk
r   dZEY nX G d*d+ d+eZEd.d,d-ZFdS )/zThe periodic task scheduler.    N)timegm)
namedtuple)total_ordering)EventThread)ensure_multiprocessing)reset_signals)Process)maybe_evaluatereprcall)cached_property   )__version__	platformssignals)reraise)crontabmaybe_schedule)is_numeric_value)load_extension_class_namessymbol_by_name)
get_loggeriter_open_logger_fds)humanize_secondsmaybe_make_aware)SchedulingErrorScheduleEntry	SchedulerPersistentSchedulerServiceEmbeddedServiceevent_t)timepriorityentryi,  c                   @   s   e Zd ZdZdS )r   z*An error occurred while scheduling a task.N)__name__
__module____qualname____doc__ r)   r)   //tmp/pip-unpacked-wheel-f4liivr4/celery/beat.pyr   ,   s   r   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	BeatLazyFuncao  A lazy function declared in 'beat_schedule' and called before sending to worker.

    Example:

        beat_schedule = {
            'test-every-5-minutes': {
                'task': 'test',
                'schedule': 300,
                'kwargs': {
                    "current": BeatCallBack(datetime.datetime.now)
                }
            }
        }

    c                 O   s   || _ ||d| _d S )N)argskwargsZ_funcZ_func_params)selffuncr,   r-   r)   r)   r*   __init__A   s    zBeatLazyFunc.__init__c                 C   s   |   S N)delayr/   r)   r)   r*   __call__H   s    zBeatLazyFunc.__call__c                 C   s   | j | jd | jd S )Nr,   r-   r.   r4   r)   r)   r*   r3   K   s    zBeatLazyFunc.delayN)r%   r&   r'   r(   r1   r5   r3   r)   r)   r)   r*   r+   0   s   r+   c                
   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
dddZdd	 ZeZdd
dZe ZZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )r   a  An entry in the scheduler.

    Arguments:
        name (str): see :attr:`name`.
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        args (Tuple): see :attr:`args`.
        kwargs (Dict): see :attr:`kwargs`.
        options (Dict): see :attr:`options`.
        last_run_at (~datetime.datetime): see :attr:`last_run_at`.
        total_run_count (int): see :attr:`total_run_count`.
        relative (bool): Is the time relative to when the server starts?
    Nr   r)   Fc                 C   sb   |
| _ || _|| _|| _|r |ni | _|r.|ni | _t||	| j d| _|pP|  | _	|pZd| _
d S )N)appr   )r6   nametaskr,   r-   optionsr   scheduledefault_nowlast_run_attotal_run_count)r/   r7   r8   r<   r=   r:   r,   r-   r9   relativer6   r)   r)   r*   r1   s   s    zScheduleEntry.__init__c                 C   s   | j r| j  S | j S r2   )r:   nowr6   r4   r)   r)   r*   r;      s    zScheduleEntry.default_nowc                 C   s$   | j f t| |p|  | jd dS )z8Return new instance, with date and count fields updated.r   )r<   r=   )	__class__dictr;   r=   )r/   r<   r)   r)   r*   _next_instance   s
    
zScheduleEntry._next_instancec              	   C   s*   | j | j| j| j| j| j| j| j| jffS r2   )	r@   r7   r8   r<   r=   r:   r,   r-   r9   r4   r)   r)   r*   
__reduce__   s          zScheduleEntry.__reduce__c                 C   s&   | j |j|j|j|j|jd dS )zUpdate values from another entry.

        Will only update "editable" fields:
            ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        )r8   r:   r,   r-   r9   N)__dict__updater8   r:   r,   r-   r9   r/   otherr)   r)   r*   rE      s      zScheduleEntry.updatec                 C   s   | j | jS )z.See :meth:`~celery.schedules.schedule.is_due`.)r:   is_duer<   r4   r)   r)   r*   rH      s    zScheduleEntry.is_duec                 C   s   t t|  S r2   )itervarsitemsr4   r)   r)   r*   __iter__   s    zScheduleEntry.__iter__c                 C   s,   dj | t| j| jpd| jpi t| jdS )Nz%<{name}: {0.name} {call} {0.schedule}r)   )callr7   )formatr   r8   r,   r-   typer%   r4   r)   r)   r*   __repr__   s
    zScheduleEntry.__repr__c                 C   s   t |trt| t|k S tS r2   )
isinstancer   idNotImplementedrF   r)   r)   r*   __lt__   s    
zScheduleEntry.__lt__c                 C   s(   dD ]}t | |t ||kr dS qdS )N)r8   r,   r-   r9   r:   FT)getattr)r/   rG   attrr)   r)   r*   editable_fields_equal   s    z#ScheduleEntry.editable_fields_equalc                 C   s
   |  |S )zTest schedule entries equality.

        Will only compare "editable" fields:
        ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        )rW   rF   r)   r)   r*   __eq__   s    zScheduleEntry.__eq__)
NNNNNr)   NNFN)N)r%   r&   r'   r(   r7   r:   r,   r-   r9   r<   r=   r1   r;   Z_default_nowrB   __next__nextrC   rE   rH   rL   rP   rT   rW   rX   r)   r)   r)   r*   r   O   s:                

r   c                 C   s   | sg S dd | D S )Nc                 S   s    g | ]}t |tr| n|qS r)   rQ   r+   ).0vr)   r)   r*   
<listcomp>   s   z(_evaluate_entry_args.<locals>.<listcomp>r)   )
entry_argsr)   r)   r*   _evaluate_entry_args   s
    r`   c                 C   s   | si S dd |   D S )Nc                 S   s&   i | ]\}}|t |tr| n|qS r)   r[   )r\   kr]   r)   r)   r*   
<dictcomp>   s    z*_evaluate_entry_kwargs.<locals>.<dictcomp>)rK   )entry_kwargsr)   r)   r*   _evaluate_entry_kwargs   s
    rd   c                   @   s@  e Zd ZdZeZdZeZdZ	dZ
dZdZeZd>ddZdd	 Zd?d
dZd@ddZdd ZefddZeejfddZeeejejfddZdd Zdd Zdd ZdAddZ d d! Z!d"d# Z"d$d% Z#d&d' Z$d(d) Z%d*d+ Z&d,d- Z'd.d/ Z(d0d1 Z)d2d3 Z*d4d5 Z+d6d7 Z,e-e+e,Ze.d8d9 Z/e.d:d; Z0e-d<d= Z1dS )Br   a  Scheduler for periodic tasks.

    The :program:`celery beat` program may instantiate this class
    multiple times for introspection purposes, but then with the
    ``lazy`` argument set.  It's important for subclasses to
    be idempotent when this argument is set.

    Arguments:
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        max_interval (int): see :attr:`max_interval`.
        lazy (bool): Don't set up the schedule.
    N   r   Fc                 K   sp   || _ t|d kri n|| _|p,|jjp,| j| _|p:|jj| _d | _d | _	|d krZ|jj
n|| _|sl|   d S r2   )r6   r
   dataconfbeat_max_loop_intervalmax_intervalZamqpProducer_heapold_schedulersZbeat_sync_everysync_every_taskssetup_schedule)r/   r6   r:   ri   rj   lazyrm   r-   r)   r)   r*   r1      s    zScheduler.__init__c                 C   sJ   i }| j jjr<| j jjs<d|kr<dtdddddid|d< | | d S )Nzcelery.backend_cleanup04*expiresi  )r8   r:   r9   )r6   rg   Zresult_expiresbackendZsupports_autoexpirer   update_from_dict)r/   rf   entriesr)   r)   r*   install_default_entries
  s    


z!Scheduler.install_default_entriesc              
   C   s   t d|j|j z| j||dd}W n6 tk
rZ } ztd|t dd W 5 d }~X Y n.X |r|t|dr|t	d|j|j
 nt	d	|j d S )
Nz#Scheduler: Sending due task %s (%s)F)produceradvancezMessage Error: %s
%sTexc_inforR   z%s sent. id->%sz%s sent.)infor7   r8   apply_async	Exceptionerror	tracebackformat_stackhasattrdebugrR   )r/   r$   rx   resultexcr)   r)   r*   apply_entry  s      zScheduler.apply_entry{Gzc                 C   s   |r|dkr|| S |S )Nr   r)   )r/   nZdriftr)   r)   r*   adjust"  s    zScheduler.adjustc                 C   s   |  S r2   )rH   )r/   r$   r)   r)   r*   rH   '  s    zScheduler.is_duec                 C   s4   | j }t| }|| |jd  ||p0d S )z9Return a utc timestamp, make sure heapq in correct order.g    .Ar   )r   r   r;   utctimetuplemicrosecond)r/   r$   next_time_to_runmktimer   Zas_nowr)   r)   r*   _when*  s    

zScheduler._whenc                 C   s\   d}g | _ | j D ]8}| \}}| j || ||r:dn|pBd|| q|| j  dS )z:Populate the heap with the data contained in the schedule.   r   N)rk   r:   valuesrH   appendr   )r/   r!   heapifyr#   r$   rH   Znext_call_delayr)   r)   r*   populate_heap4  s    
 zScheduler.populate_heapc                 C   s   | j }| j}| jdks&| | j| js<t| j| _|   | j}|sJ|S |d }|d }	| |	\}
}|
r||}||kr| 	|	}| j
|	| jd |||| |||d | dS ||| ||d |S ||}|t|r|n||S )zRun a tick - one iteration of the scheduler.

        Executes one due task per call.

        Returns:
            float: preferred delay in seconds for next call.
        Nr      )rx   r   )r   ri   rk   schedules_equalrl   r:   copyr   rH   reserver   rx   r   r   )r/   r!   minheappopheappushr   ri   Heventr$   rH   r   verifyZ
next_entryZadjusted_next_time_to_runr)   r)   r*   tickD  s:    	

 
zScheduler.tickc                 C   s   ||  krd krn ndS |d ks,|d kr0dS t | t | krLdS | D ]*\}}||}|sp dS ||krT dS qTdS )NTF)setkeysrK   get)r/   Zold_schedulesZnew_schedulesr7   Z	old_entry	new_entryr)   r)   r*   r   l  s    
zScheduler.schedules_equalc                 C   s.   | j  p,t | j  | jkp,| jo,| j| jkS r2   )
_last_syncr"   	monotonic
sync_everyrm   _tasks_since_syncr4   r)   r)   r*   should_sync{  s    
zScheduler.should_syncc                 C   s   t | }| j|j< |S r2   )rZ   r:   r7   )r/   r$   r   r)   r)   r*   r     s    zScheduler.reserveTc           	   
   K   s   |r|  |n|}| jj|j}zz`t|j	}t
|j}|r^|j||fd|i|jW W rS | j|j||fd|i|jW W NS W nD tk
r } z&tttdj||dt d  W 5 d }~X Y nX W 5 |  jd7  _|  r|   X d S )Nr   rx   z-Couldn't apply scheduled task {0.name}: {exc})r   r   )r   r6   Ztasksr   r8   r   r   _do_syncr`   r,   rd   r-   r}   r9   	send_taskr~   r   r   rN   sysr{   )	r/   r$   rx   ry   r-   r8   r_   rc   r   r)   r)   r*   r}     s8    

 
zScheduler.apply_asyncc                 O   s   | j j||S r2   )r6   r   r/   r,   r-   r)   r)   r*   r     s    zScheduler.send_taskc                 C   s    |  | j | | jjj d S r2   )rw   rf   merge_inplacer6   rg   beat_scheduler4   r)   r)   r*   rn     s    zScheduler.setup_schedulec                 C   s,   ztd |   W 5 t  | _d| _X d S )Nr   zbeat: Synchronizing schedule...)r"   r   r   r   r   syncr4   r)   r)   r*   r     s
    
zScheduler._do_syncc                 C   s   d S r2   r)   r4   r)   r)   r*   r     s    zScheduler.syncc                 C   s   |    d S r2   )r   r4   r)   r)   r*   close  s    zScheduler.closec                 K   s&   | j f d| ji|}|| j|j< |S )Nr6   )Entryr6   r:   r7   )r/   r-   r$   r)   r)   r*   add  s    zScheduler.addc                 C   s0   t || jr| j|_|S | jf t||| jdS N)r7   r6   )rQ   r   r6   rA   )r/   r7   r$   r)   r)   r*   _maybe_entry  s    zScheduler._maybe_entryc                    s"    j  fdd| D  d S )Nc                    s   i | ]\}}|  ||qS r)   )r   )r\   r7   r$   r4   r)   r*   rb     s    z.Scheduler.update_from_dict.<locals>.<dictcomp>)r:   rE   rK   )r/   Zdict_r)   r4   r*   ru     s    zScheduler.update_from_dictc                 C   s~   | j }t|t| }}||A D ]}||d  q |D ]B}| jf t|| || jd}||rp|| | q6|||< q6d S r   )r:   r   popr   rA   r6   r   rE   )r/   br:   ABkeyr$   r)   r)   r*   r     s    
zScheduler.merge_inplacec                 C   s   dd }| j || jjjS )Nc                 S   s   t d| | d S )Nz9beat: Connection error: %s. Trying again in %s seconds...)r   )r   intervalr)   r)   r*   _error_handler  s     z3Scheduler._ensure_connected.<locals>._error_handler)
connectionZensure_connectionr6   rg   Zbroker_connection_max_retries)r/   r   r)   r)   r*   _ensure_connected  s
     zScheduler._ensure_connectedc                 C   s   | j S r2   rf   r4   r)   r)   r*   get_schedule  s    zScheduler.get_schedulec                 C   s
   || _ d S r2   r   r/   r:   r)   r)   r*   set_schedule  s    zScheduler.set_schedulec                 C   s
   | j  S r2   )r6   Zconnection_for_writer4   r)   r)   r*   r     s    zScheduler.connectionc                 C   s   | j |  ddS )NF)Zauto_declare)rj   r   r4   r)   r)   r*   rx     s    zScheduler.producerc                 C   s   dS )N r)   r4   r)   r)   r*   r|     s    zScheduler.info)NNNFN)N)r   )NT)2r%   r&   r'   r(   r   r   r:   DEFAULT_MAX_INTERVALri   r   rm   r   r   loggerr1   rw   r   r   rH   r   r   r!   heapqr   r   r   r   r   r   r   r   r   r}   r   rn   r   r   r   r   r   ru   r   r   r   r   propertyr   r   rx   r|   r)   r)   r)   r*   r      s\         




(



r   c                       s   e Zd ZdZeZdZdZ fddZdd Z	dd	 Z
d
d Zdd Zdd Zdd Zdd ZeeeZdd Zdd Zedd Z  ZS )r   z+Scheduler backed by :mod:`shelve` database.)r   z.dbz.datz.bakz.dirNc                    s   | d| _t j|| d S )Nschedule_filename)r   r   superr1   r   r@   r)   r*   r1     s    zPersistentScheduler.__init__c              
   C   s8   | j D ],}ttj t| j|  W 5 Q R X qd S r2   )known_suffixesr   Zignore_errnoerrnoENOENTosremover   )r/   suffixr)   r)   r*   
_remove_db  s    
zPersistentScheduler._remove_dbc                 C   s   | j j| jddS )NT)Z	writeback)persistenceopenr   r4   r)   r)   r*   _open_schedule  s    z"PersistentScheduler._open_schedulec                 C   s"   t d| j|dd |   |  S )Nz'Removing corrupted schedule file %r: %rTrz   )r   r   r   r   )r/   r   r)   r)   r*    _destroy_open_corrupted_schedule  s      z4PersistentScheduler._destroy_open_corrupted_schedulec              
   C   sD  z|   | _| j  W n. tk
rF } z| || _W 5 d }~X Y nX |   | jjj}| j	d}|d k	r||krt
d|| | j  | jjj}| j	d}|d k	r||krddd}t
d|| ||  | j  | jdi }| | jjj | | j | jt||d	 |   td
ddd | D   d S )Ntzz%Reset: Timezone changed from %r to %rutc_enabledenableddisabled)TFz Reset: UTC changed from %s to %srv   )r   r   r   zCurrent schedule:

c                 s   s   | ]}t |V  qd S r2   )repr)r\   r$   r)   r)   r*   	<genexpr>4  s    z5PersistentScheduler.setup_schedule.<locals>.<genexpr>)r   _storer   r~   r   _create_scheduler6   rg   timezoner   warningclearZ
enable_utc
setdefaultr   r   rw   r:   rE   r   r   r   joinr   )r/   r   r   Z	stored_tzutcZ
stored_utcchoicesrv   r)   r)   r*   rn     s@    




 
z"PersistentScheduler.setup_schedulec                 C   s   dD ]}z| j d  W n\ tk
rr   zi | j d< W n8 tk
rl } z| || _ W Y Y qW 5 d }~X Y nX Y nZX d| j krtd | j   n:d| j krtd | j   nd| j krtd | j    qqd S )	N)r   r   rv   r   z+DB Reset: Account for new __version__ fieldr   z"DB Reset: Account for new tz fieldr   z+DB Reset: Account for new utc_enabled field)r   KeyErrorr   r   r   )r/   _r   r)   r)   r*   r   7  s&    "



z$PersistentScheduler._create_schedulec                 C   s
   | j d S Nrv   r   r4   r)   r)   r*   r   N  s    z PersistentScheduler.get_schedulec                 C   s   || j d< d S r   r   r   r)   r)   r*   r   Q  s    z PersistentScheduler.set_schedulec                 C   s   | j d k	r| j   d S r2   )r   r   r4   r)   r)   r*   r   U  s    
zPersistentScheduler.syncc                 C   s   |    | j  d S r2   )r   r   r   r4   r)   r)   r*   r   Y  s    zPersistentScheduler.closec                 C   s   d| j  S )Nz    . db -> )r   r4   r)   r)   r*   r|   ]  s    zPersistentScheduler.info)r%   r&   r'   r(   shelver   r   r   r1   r   r   r   rn   r   r   r   r   r:   r   r   r|   __classcell__r)   r)   r   r*   r     s"   &
r   c                   @   sX   e Zd ZdZeZdddZdd Zddd	Zd
d Z	dddZ
dddZedd ZdS )r   zCelery periodic task service.Nc                 C   sB   || _ |p|jj| _|p| j| _|p*|jj| _t | _t | _	d S r2   )
r6   rg   rh   ri   scheduler_clsZbeat_schedule_filenamer   r   _is_shutdown_is_stopped)r/   r6   ri   r   r   r)   r)   r*   r1   g  s    
zService.__init__c                 C   s   | j | j| j| j| jffS r2   )r@   ri   r   r   r6   r4   r)   r)   r*   rC   s  s     zService.__reduce__Fc              	   C   s   t d tdt| jj tjj| d |rDtjj| d t	
d z~zV| j s| j }|rH|dkrHtdt|dd t| | j rH| j  qHW n" ttfk
r   | j  Y nX W 5 |   X d S )	Nzbeat: Starting...z#beat: Ticking with max interval->%s)Zsenderzcelery beatg        zbeat: Waking up %s.zin )prefix)r|   r   r   	schedulerri   r   Z	beat_initsendZbeat_embedded_initr   Zset_process_titler   r   is_setr   r"   sleepr   r   KeyboardInterrupt
SystemExitr   )r/   embedded_processr   r)   r)   r*   startw  s*    






zService.startc                 C   s   | j   | j  d S r2   )r   r   r   r   r4   r)   r)   r*   r     s    
zService.syncc                 C   s$   t d | j  |o| j  d S )Nzbeat: Shutting down...)r|   r   r   r   wait)r/   r   r)   r)   r*   stop  s    
zService.stopcelery.beat_schedulersc                 C   s0   | j }tt|}t| j|d| j|| j|dS )N)aliases)r6   r   ri   ro   )r   rA   r   r   r   r6   ri   )r/   ro   Zextension_namespacefilenamer   r)   r)   r*   get_scheduler  s    zService.get_schedulerc                 C   s   |   S r2   )r   r4   r)   r)   r*   r     s    zService.scheduler)NNN)F)F)Fr   )r%   r&   r'   r(   r   r   r1   rC   r   r   r   r   r   r   r)   r)   r)   r*   r   b  s     


  
r   c                       s0   e Zd ZdZ fddZdd Zdd Z  ZS )	_Threadedz(Embedded task scheduler using threading.c                    s.   t    || _t|f|| _d| _d| _d S )NTBeat)r   r1   r6   r   servicedaemonr7   r/   r6   r-   r   r)   r*   r1     s
    
z_Threaded.__init__c                 C   s   | j   | j  d S r2   )r6   set_currentr   r   r4   r)   r)   r*   run  s    
z_Threaded.runc                 C   s   | j jdd d S )NT)r   )r   r   r4   r)   r)   r*   r     s    z_Threaded.stop)r%   r&   r'   r(   r1   r  r   r   r)   r)   r   r*   r     s   r   c                       s,   e Zd Z fddZdd Zdd Z  ZS )_Processc                    s(   t    || _t|f|| _d| _d S )Nr   )r   r1   r6   r   r   r7   r   r   r)   r*   r1     s    
z_Process.__init__c                 C   sP   t dd ttjtjtjgtt   | j	
  | j	  | jjdd d S )NF)fullT)r   )r   r   Zclose_open_fdsr   	__stdin__
__stdout__
__stderr__listr   r6   set_defaultr  r   r   r4   r)   r)   r*   r    s    
  

z_Process.runc                 C   s   | j   |   d S r2   )r   r   	terminater4   r)   r)   r*   r     s    
z_Process.stop)r%   r&   r'   r1   r  r   r   r)   r)   r   r*   r    s   	r  c                 K   s<   | ddstdkr(t| fddi|S t| fd|i|S )zReturn embedded clock service.

    Arguments:
        thread (bool): Run threaded instead of as a separate process.
            Uses :mod:`multiprocessing` by default, if available.
    threadFNri   r   )r   r  r   )r6   ri   r-   r)   r)   r*   r      s    r    )N)Gr(   r   r   r   r   r   r   r"   r   calendarr   collectionsr   	functoolsr   	threadingr   r   Zbilliardr   Zbilliard.commonr   Zbilliard.contextr	   Zkombu.utils.functionalr
   r   Zkombu.utils.objectsr   r   r   r   r   
exceptionsr   Z	schedulesr   r   Zutils.functionalr   Zutils.importsr   r   Z	utils.logr   r   Z
utils.timer   r   __all__r!   r%   r   r   r|   r   r   r   r~   r   r+   r   r`   rd   r   r   r   r   NotImplementedErrorr  r    r)   r)   r)   r*   <module>   sd   
 w		   kF

