U
    .eɊ                     @   sj  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZmZmZ ddlmZmZ zddlZW n ek
r   dZY nX dZ dZ!dd Z"edd Z#edd Z$G dd dZ%ej&G dd de%Z'ej&G dd de%Z(ej&G dd de(Z)ej&G dd de'Z*d"d d!Z+dS )#z3Task results/state and results for groups of tasks.    N)deque)contextmanager)proxy)cached_property)Thenablebarrierpromise   )current_appstates)_set_task_join_will_blocktask_join_will_block)app_or_default)ImproperlyConfiguredIncompleteStreamTimeoutError)DependencyGraphGraphFormatter)
ResultBaseAsyncResult	ResultSetGroupResultEagerResultresult_from_tuplezNever call result.get() within a task!
See https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
c                   C   s   t  rttd S N)r   RuntimeErrorE_WOULDBLOCK r   r   1/tmp/pip-unpacked-wheel-f4liivr4/celery/result.pyassert_will_not_block#   s    r   c                  c   s(   t  } td z
d V  W 5 t|  X d S NFr   r   Zreset_valuer   r   r   allow_join_result(   s
    
r#   c                  c   s(   t  } td z
d V  W 5 t|  X d S NTr!   r"   r   r   r   denied_join_result2   s
    
r%   c                   @   s   e Zd ZdZdZdS )r   zBase class for results.N)__name__
__module____qualname____doc__parentr   r   r   r   r   <   s   r   c                   @   s4  e Zd ZdZdZeZdZdZdhddZe	dd Z
e
jdd Z
did	d
Zdd Zdd Zdd Zdd ZdjddZdkddZdddddddddejejfddZeZdd Zdd Zdldd Zd!d" Zdmd#d$Zd%d& Zd'd( Zd)d* Zd+d, Z dnd-d.Z!e!Z"d/d0 Z#dod1d2Z$d3d4 Z%d5d6 Z&d7d8 Z'd9d: Z(d;d< Z)d=d> Z*d?d@ Z+dAdB Z,e-dCdD Z.e	dEdF Z/e	dGdH Z0dIdJ Z1dKdL Z2dMdN Z3dOdP Z4e	dQdR Z5e5Z6e	dSdT Z7e	dUdV Z8e8Z9e	dWdX Z:e:jdYdX Z:e	dZd[ Z;e	d\d] Z<e	d^d_ Z=e	d`da Z>e	dbdc Z?e	ddde Z@e	dfdg ZAdS )pr   zxQuery task state.

    Arguments:
        id (str): See :attr:`id`.
        backend (Backend): See :attr:`backend`.
    Nc                 C   sd   |d krt dt| t|p$| j| _|| _|p:| jj| _|| _t| jdd| _	d | _
d| _d S )Nz#AsyncResult requires valid id, not TweakF)
ValueErrortyper   appidbackendr*   r   _on_fulfilledon_ready_cache_ignored)selfr0   r1   Z	task_namer/   r*   r   r   r   __init__W   s    zAsyncResult.__init__c                 C   s   t | dr| jS dS )z+If True, task result retrieval is disabled.r5   F)hasattrr5   r6   r   r   r   ignorede   s    
zAsyncResult.ignoredc                 C   s
   || _ dS )z%Enable/disable task result retrieval.N)r5   )r6   valuer   r   r   r:   l   s    Fc                 C   s   | j j| |d | j||S )Nr+   )r1   add_pending_resultr3   thenr6   callbackZon_errorr,   r   r   r   r=   q   s    zAsyncResult.thenc                 C   s   | j |  |S r   r1   remove_pending_resultr6   resultr   r   r   r2   u   s    zAsyncResult._on_fulfilledc                 C   s   | j }| j|o| fd fS r   )r*   r0   as_tuple)r6   r*   r   r   r   rD   y   s    zAsyncResult.as_tuplec                 C   s0   g }| j }|| j |dk	r,||  |S )zReturn as a list of task IDs.N)r*   appendr0   extendas_list)r6   resultsr*   r   r   r   rG   }   s    zAsyncResult.as_listc                 C   s(   d| _ | jr| j  | j| j dS )z/Forget the result of this task and its parents.N)r4   r*   forgetr1   r0   r9   r   r   r   rI      s    
zAsyncResult.forgetc                 C   s    | j jj| j|||||d dS )a  Send revoke signal to all workers.

        Any worker receiving the task, or having reserved the
        task, *must* ignore it.

        Arguments:
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from workers.
                The ``timeout`` argument specifies the seconds to wait.
                Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                ``wait`` is enabled.
        
connection	terminatesignalreplytimeoutN)r/   controlrevoker0   r6   rK   rL   rM   waitrO   r   r   r   rQ      s      zAsyncResult.revokec                 C   s   | j jj||||||d dS )a7  Send revoke signal to all workers only for tasks with matching headers values.

        Any worker receiving the task, or having reserved the
        task, *must* ignore it.
        All header fields *must* match.

        Arguments:
            headers (dict[str, Union(str, list)]): Headers to match when revoking tasks.
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from workers.
                The ``timeout`` argument specifies the seconds to wait.
                Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                ``wait`` is enabled.
        rJ   N)r/   rP   revoke_by_stamped_headers)r6   headersrK   rL   rM   rS   rO   r   r   r   rT      s      z%AsyncResult.revoke_by_stamped_headersT      ?c              
   C   s   | j r
dS |	rt  t }|r>|r>| jr>t| jdd}|   |rL|| | jrh|rb| j|d | jS | j	
|  | j	j| |||||||dS )a  Wait until task is ready, and return its result.

        Warning:
           Waiting for tasks within a task may lead to deadlocks.
           Please read :ref:`task-synchronous-subtasks`.

        Warning:
           Backends use resources to store and transmit results. To ensure
           that resources are released, you must eventually call
           :meth:`~@AsyncResult.get` or :meth:`~@AsyncResult.forget` on
           EVERY :class:`~@AsyncResult` instance returned after calling
           a task.

        Arguments:
            timeout (float): How long to wait, in seconds, before the
                operation times out. This is the setting for the publisher
                (celery client) and is different from `timeout` parameter of
                `@app.task`, which is the setting for the worker. The task
                isn't terminated even if timeout occurs.
            propagate (bool): Re-raise exception if the task failed.
            interval (float): Time to wait (in seconds) before retrying to
                retrieve the result.  Note that this does not have any effect
                when using the RPC/redis result store backends, as they don't
                use polling.
            no_ack (bool): Enable amqp no ack (automatically acknowledge
                message).  If this is :const:`False` then the message will
                **not be acked**.
            follow_parents (bool): Re-raise any exception raised by
                parent tasks.
            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
                this is the default configuration. CAUTION do not enable this
                unless you must.

        Raises:
            celery.exceptions.TimeoutError: if `timeout` isn't
                :const:`None` and the result does not arrive within
                `timeout` seconds.
            Exception: If the remote call raised an exception then that
                exception will be re-raised in the caller process.
        NTr+   )r?   )rO   intervalon_intervalno_ack	propagater?   
on_message)r:   r   r   r*   _maybe_reraise_parent_errorr=   r4   maybe_throwrC   r1   r<   Zwait_for_pending)r6   rO   rZ   rW   rY   Zfollow_parentsr?   r[   rX   disable_sync_subtasksEXCEPTION_STATESPROPAGATE_STATESZ_on_intervalr   r   r   get   s2    -
 zAsyncResult.getc                 C   s"   t t|  D ]}|  qd S r   )reversedlist_parentsr]   r6   noder   r   r   r\     s    z'AsyncResult._maybe_reraise_parent_errorc                 c   s   | j }|r|V  |j }qd S r   r*   re   r   r   r   rd   	  s    zAsyncResult._parentsc                 k   s,   | j |dD ]\}}||jf |fV  qdS )a  Collect results as they return.

        Iterator, like :meth:`get` will wait for the task to complete,
        but will also follow :class:`AsyncResult` and :class:`ResultSet`
        returned by the task, yielding ``(result, value)`` tuples for each
        result in the tree.

        An example would be having the following tasks:

        .. code-block:: python

            from celery import group
            from proj.celery import app

            @app.task(trail=True)
            def A(how_many):
                return group(B.s(i) for i in range(how_many))()

            @app.task(trail=True)
            def B(i):
                return pow2.delay(i)

            @app.task(trail=True)
            def pow2(i):
                return i ** 2

        .. code-block:: pycon

            >>> from celery.result import ResultBase
            >>> from proj.tasks import A

            >>> result = A.delay(10)
            >>> [v for v in result.collect()
            ...  if not isinstance(v, (ResultBase, tuple))]
            [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

        Note:
            The ``Task.trail`` option must be enabled
            so that the list of children is stored in ``result.children``.
            This is the default but enabled explicitly for illustration.

        Yields:
            Tuple[AsyncResult, Any]: tuples containing the result instance
            of the child task, and the return value of that task.
        intermediateNiterdepsra   )r6   ri   kwargs_Rr   r   r   collect  s    .zAsyncResult.collectc                 C   s"   d }|   D ]\}}| }q|S r   rj   )r6   r;   rm   rn   r   r   r   get_leaf@  s    
zAsyncResult.get_leafc                 #   sf   t d | fg}| }|rb| \} | fV    rV| fdd jpLg D  q|rt qd S )Nc                 3   s   | ]} |fV  qd S r   r   .0childrf   r   r   	<genexpr>O  s     z'AsyncResult.iterdeps.<locals>.<genexpr>)r   popleftreadyrF   childrenr   )r6   ri   stackZis_incomplete_streamr*   r   rt   r   rk   F  s    
 zAsyncResult.iterdepsc                 C   s   | j | jjkS )zReturn :const:`True` if the task has executed.

        If the task is still running, pending, or is waiting
        for retry then :const:`False` is returned.
        )stater1   READY_STATESr9   r   r   r   rw   T  s    zAsyncResult.readyc                 C   s   | j tjkS )z7Return :const:`True` if the task executed successfully.)rz   r   SUCCESSr9   r   r   r   
successful\  s    zAsyncResult.successfulc                 C   s   | j tjkS )z(Return :const:`True` if the task failed.)rz   r   FAILUREr9   r   r   r   failed`  s    zAsyncResult.failedc                 O   s   | j j|| d S r   )r3   throwr6   argsrl   r   r   r   r   d  s    zAsyncResult.throwc                 C   sn   | j d kr|  n| j }|d |d |d  }}}|tjkrV|rV| || | |d k	rj|| j| |S )NstatusrC   	traceback)r4   _get_task_metara   r   r`   r   _to_remote_tracebackr0   )r6   rZ   r?   cacherz   r;   tbr   r   r   r]   g  s      
zAsyncResult.maybe_throwc                 C   s*   |r&t d k	r&| jjjr&t j| S d S r   )tblibr/   confZtask_remote_tracebacks	TracebackZfrom_stringZas_traceback)r6   r   r   r   r   r   r  s    z AsyncResult._to_remote_tracebackc                 C   sL   t |pt| jddd}| j|dD ]"\}}|| |r$||| q$|S )NZoval)rootshape)	formatterrh   )r   r   r0   rk   Zadd_arcZadd_edge)r6   ri   r   graphr*   rf   r   r   r   build_graphv  s    
zAsyncResult.build_graphc                 C   s
   t | jS z`str(self) -> self.id`.strr0   r9   r   r   r   __str__  s    zAsyncResult.__str__c                 C   s
   t | jS z`hash(self) -> hash(self.id)`.hashr0   r9   r   r   r   __hash__  s    zAsyncResult.__hash__c                 C   s   dt | j d| j dS )N<: >)r.   r&   r0   r9   r   r   r   __repr__  s    zAsyncResult.__repr__c                 C   s.   t |tr|j| jkS t |tr*|| jkS tS r   )
isinstancer   r0   r   NotImplementedr6   otherr   r   r   __eq__  s
    


zAsyncResult.__eq__c                 C   s   |  | j| jd | j| jS r   )	__class__r0   r1   r/   r*   r9   r   r   r   __copy__  s        zAsyncResult.__copy__c                 C   s   | j |  fS r   r   __reduce_args__r9   r   r   r   
__reduce__  s    zAsyncResult.__reduce__c                 C   s   | j | jd d | jfS r   )r0   r1   r*   r9   r   r   r   r     s    zAsyncResult.__reduce_args__c                 C   s   | j dk	r| j |  dS )z9Cancel pending operations when the instance is destroyed.Nr@   r9   r   r   r   __del__  s    
zAsyncResult.__del__c                 C   s   |   S r   )r   r9   r   r   r   r     s    zAsyncResult.graphc                 C   s   | j jS r   )r1   supports_native_joinr9   r   r   r   r     s    z AsyncResult.supports_native_joinc                 C   s   |   dS )Nrx   r   ra   r9   r   r   r   rx     s    zAsyncResult.childrenc                 C   s:   |r6|d }|t jkr6| | j|}| |  |S |S )Nr   )r   r{   
_set_cacher1   Zmeta_from_decodedr3   )r6   metarz   dr   r   r   _maybe_set_cache  s    

zAsyncResult._maybe_set_cachec                 C   s$   | j d kr| | j| jS | j S r   )r4   r   r1   Zget_task_metar0   r9   r   r   r   r     s    
zAsyncResult._get_task_metac                 K   s   t |  gS r   )iterr   r6   rl   r   r   r   
_iter_meta  s    zAsyncResult._iter_metac                    s.   | d}|r$ fdd|D |d< | _|S )Nrx   c                    s   g | ]}t | jqS r   )r   r/   rq   r9   r   r   
<listcomp>  s    z*AsyncResult._set_cache.<locals>.<listcomp>)ra   r4   )r6   r   rx   r   r9   r   r     s    


zAsyncResult._set_cachec                 C   s   |   d S )zTask return value.

        Note:
            When the task has been executed, this contains the return value.
            If the task raised an exception, this will be the exception
            instance.
        rC   r   r9   r   r   r   rC     s    	zAsyncResult.resultc                 C   s   |   dS )z#Get the traceback of a failed task.r   r   r9   r   r   r   r     s    zAsyncResult.tracebackc                 C   s   |   d S )a  The tasks current state.

        Possible values includes:

            *PENDING*

                The task is waiting for execution.

            *STARTED*

                The task has been started.

            *RETRY*

                The task is to be retried, possibly because of failure.

            *FAILURE*

                The task raised an exception, or has exceeded the retry limit.
                The :attr:`result` attribute then contains the
                exception raised by the task.

            *SUCCESS*

                The task executed successfully.  The :attr:`result` attribute
                then contains the tasks return value.
        r   r   r9   r   r   r   rz     s    zAsyncResult.statec                 C   s   | j S )zCompat. alias to :attr:`id`.r0   r9   r   r   r   task_id  s    zAsyncResult.task_idc                 C   s
   || _ d S r   r   )r6   r0   r   r   r   r     s    c                 C   s   |   dS )Nnamer   r9   r   r   r   r     s    zAsyncResult.namec                 C   s   |   dS )Nr   r   r9   r   r   r   r     s    zAsyncResult.argsc                 C   s   |   dS )Nrl   r   r9   r   r   r   rl   
  s    zAsyncResult.kwargsc                 C   s   |   dS )Nworkerr   r9   r   r   r   r     s    zAsyncResult.workerc                 C   s.   |   d}|r*t|tjs*tj|S |S )zUTC date and time.	date_done)r   ra   r   datetimefromisoformat)r6   r   r   r   r   r     s    zAsyncResult.date_donec                 C   s   |   dS )Nretriesr   r9   r   r   r   r     s    zAsyncResult.retriesc                 C   s   |   dS )Nqueuer   r9   r   r   r   r     s    zAsyncResult.queue)NNNN)NF)NFNFN)NFNFN)F)F)TN)FN)Br&   r'   r(   r)   r/   r   r0   r1   r7   propertyr:   setterr=   r2   rD   rG   rI   rQ   rT   r   r_   r`   ra   rS   r\   rd   ro   rp   rk   rw   r}   r   r   r]   maybe_reraiser   r   r   r   r   r   r   r   r   r   r   r   r   rx   r   r   r   r   rC   infor   rz   r   r   r   r   rl   r   r   r   r   r   r   r   r   r   C   s        



	    
    
    
H
1

	




		
	









r   c                   @   s6  e Zd ZdZdZdZdCddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd ZdDddZeZdd Zdd Zdd Zdd ZdEd!d"Zd#d$ Zd%d& ZdFd(d)ZdGd*d+ZdHd,d-ZdId.d/ZdJd0d1Zd2d3 Zd4d5 Zd6d7 Zd8d9 Z d:d; Z!e"d<d= Z#e"d>d? Z$e$j%d@d? Z$e"dAdB Z&dS )Kr   zpA collection of results.

    Arguments:
        results (Sequence[AsyncResult]): List of result instances.
    Nc                 K   sL   || _ || _tt| fd| _|p(t|| _| jrH| jt| jdd d S )N)r   Tr+   )	_apprH   r   r   r3   r   _on_fullr=   	_on_ready)r6   rH   r/   Zready_barrierrl   r   r   r   r7   0  s    zResultSet.__init__c                 C   s,   || j kr(| j | | jr(| j| dS )zvAdd :class:`AsyncResult` as a new member of the set.

        Does nothing if the result is already a member.
        N)rH   rE   r   addrB   r   r   r   r   8  s    
zResultSet.addc                 C   s   | j jr|   d S r   )r1   Zis_asyncr3   r9   r   r   r   r   B  s    zResultSet._on_readyc                 C   sH   t |tr| j|}z| j| W n tk
rB   t|Y nX dS )z~Remove result from the set; it must be a member.

        Raises:
            KeyError: if the result isn't a member.
        N)r   r   r/   r   rH   remover-   KeyErrorrB   r   r   r   r   F  s    
zResultSet.removec                 C   s(   z|  | W n tk
r"   Y nX dS )zbRemove result from the set if it is a member.

        Does nothing if it's not a member.
        N)r   r   rB   r   r   r   discardS  s    zResultSet.discardc                    s    j  fdd|D  dS )z Extend from iterable of results.c                 3   s   | ]}| j kr|V  qd S r   rH   rr   rr9   r   r   ru   _  s     
 z#ResultSet.update.<locals>.<genexpr>N)rH   rF   )r6   rH   r   r9   r   update]  s    zResultSet.updatec                 C   s   g | j dd< dS )z!Remove all results from this set.Nr   r9   r   r   r   cleara  s    zResultSet.clearc                 C   s   t dd | jD S )zReturn true if all tasks successful.

        Returns:
            bool: true if all of the tasks finished
                successfully (i.e. didn't raise an exception).
        c                 s   s   | ]}|  V  qd S r   )r}   rr   rC   r   r   r   ru   l  s     z'ResultSet.successful.<locals>.<genexpr>allrH   r9   r   r   r   r}   e  s    zResultSet.successfulc                 C   s   t dd | jD S )zReturn true if any of the tasks failed.

        Returns:
            bool: true if one of the tasks failed.
                (i.e., raised an exception)
        c                 s   s   | ]}|  V  qd S r   )r   r   r   r   r   ru   u  s     z#ResultSet.failed.<locals>.<genexpr>anyrH   r9   r   r   r   r   n  s    zResultSet.failedTc                 C   s   | j D ]}|j||d qd S )N)r?   rZ   )rH   r]   )r6   r?   rZ   rC   r   r   r   r]   w  s    
zResultSet.maybe_throwc                 C   s   t dd | jD S )zReturn true if any of the tasks are incomplete.

        Returns:
            bool: true if one of the tasks are still
                waiting for execution.
        c                 s   s   | ]}|   V  qd S r   rw   r   r   r   r   ru     s     z$ResultSet.waiting.<locals>.<genexpr>r   r9   r   r   r   waiting|  s    zResultSet.waitingc                 C   s   t dd | jD S )zDid all of the tasks complete? (either by success of failure).

        Returns:
            bool: true if all of the tasks have been executed.
        c                 s   s   | ]}|  V  qd S r   r   r   r   r   r   ru     s     z"ResultSet.ready.<locals>.<genexpr>r   r9   r   r   r   rw     s    zResultSet.readyc                 C   s   t dd | jD S )a  Task completion count.

        Note that `complete` means `successful` in this context. In other words, the
        return value of this method is the number of ``successful`` tasks.

        Returns:
            int: the number of complete (i.e. successful) tasks.
        c                 s   s   | ]}t | V  qd S r   )intr}   r   r   r   r   ru     s     z,ResultSet.completed_count.<locals>.<genexpr>)sumrH   r9   r   r   r   completed_count  s    	zResultSet.completed_countc                 C   s   | j D ]}|  qdS )z?Forget about (and possible remove the result of) all the tasks.N)rH   rI   rB   r   r   r   rI     s    
zResultSet.forgetFc                 C   s*   | j jjdd | jD |||||d dS )a[  Send revoke signal to all workers for all tasks in the set.

        Arguments:
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from worker.
                The ``timeout`` argument specifies the number of seconds
                to wait.  Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                the ``wait`` argument is enabled.
        c                 S   s   g | ]
}|j qS r   r   r   r   r   r   r     s     z$ResultSet.revoke.<locals>.<listcomp>)rK   rO   rL   rM   rN   N)r/   rP   rQ   rH   rR   r   r   r   rQ     s       zResultSet.revokec                 C   s
   t | jS r   )r   rH   r9   r   r   r   __iter__  s    zResultSet.__iter__c                 C   s
   | j | S )z`res[i] -> res.results[i]`.r   )r6   indexr   r   r   __getitem__  s    zResultSet.__getitem__rV   c	           	   
   C   s&   | j r| jn| j||||||||dS )zSee :meth:`join`.

        This is here for API compatibility with :class:`AsyncResult`,
        in addition it uses :meth:`join_native` if available for the
        current result backend.
        )rO   rZ   rW   r?   rY   r[   r^   rX   )r   join_nativejoin)	r6   rO   rZ   rW   r?   rY   r[   r^   rX   r   r   r   ra     s    	    zResultSet.getc	              	   C   s   |r
t   t }	d}
|dk	r&tdg }| jD ]^}d}
|r\|t |	  }
|
dkr\td|j|
|||||d}|r||j| q0|| q0|S )a  Gather the results of all tasks as a list in order.

        Note:
            This can be an expensive operation for result store
            backends that must resort to polling (e.g., database).

            You should consider using :meth:`join_native` if your backend
            supports it.

        Warning:
            Waiting for tasks within a task may lead to deadlocks.
            Please see :ref:`task-synchronous-subtasks`.

        Arguments:
            timeout (float): The number of seconds to wait for results
                before the operation times out.
            propagate (bool): If any of the tasks raises an exception,
                the exception will be re-raised when this flag is set.
            interval (float): Time to wait (in seconds) before retrying to
                retrieve a result from the set.  Note that this does not have
                any effect when using the amqp result store backend,
                as it does not use polling.
            callback (Callable): Optional callback to be called for every
                result received.  Must have signature ``(task_id, value)``
                No results will be returned by this function if a callback
                is specified.  The order of results is also arbitrary when a
                callback is used.  To get access to the result object for
                a particular id you'll have to generate an index first:
                ``index = {r.id: r for r in gres.results.values()}``
                Or you can create new result objects on the fly:
                ``result = app.AsyncResult(task_id)`` (both will
                take advantage of the backend cache anyway).
            no_ack (bool): Automatic message acknowledgment (Note that if this
                is set to :const:`False` then the messages
                *will not be acknowledged*).
            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
                this is the default configuration. CAUTION do not enable this
                unless you must.

        Raises:
            celery.exceptions.TimeoutError: if ``timeout`` isn't
                :const:`None` and the operation takes longer than ``timeout``
                seconds.
        Nz,Backend does not support on_message callbackg        zjoin operation timed out)rO   rZ   rW   rY   rX   r^   )	r   time	monotonicr   rH   r   ra   r0   rE   )r6   rO   rZ   rW   r?   rY   r[   r^   rX   Z
time_start	remainingrH   rC   r;   r   r   r   r     s6    /
   zResultSet.joinc                 C   s   | j ||S r   r3   r=   r>   r   r   r   r=     s    zResultSet.thenc                 C   s   | j j| |||||dS )a0  Backend optimized version of :meth:`iterate`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the amqp, Redis and cache
        result backends.
        )rO   rW   rY   r[   rX   )r1   iter_native)r6   rO   rW   rY   r[   rX   r   r   r   r     s       zResultSet.iter_nativec	                 C   s   |r
t   |rdndd t| jD }	|r.dndd tt| D }
| |||||D ]j\}}t|trg }|D ]}||	  qpn|d }|r|d t
jkr||r||| qV||
|	| < qV|
S )a-  Backend optimized version of :meth:`join`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the amqp, Redis and cache
        result backends.
        Nc                 S   s   i | ]\}}|j |qS r   r   )rr   irC   r   r   r   
<dictcomp>6  s     z)ResultSet.join_native.<locals>.<dictcomp>c                 S   s   g | ]}d qS r   r   )rr   rm   r   r   r   r   9  s     z)ResultSet.join_native.<locals>.<listcomp>rC   r   )r   	enumeraterH   rangelenr   r   rc   rE   ra   r   r`   )r6   rO   rZ   rW   r?   rY   r[   rX   r^   Zorder_indexaccr   r   r;   Zchildren_resultr   r   r   r   &  s*    
 
zResultSet.join_nativec                 K   s.   dd | j jdd | jD fddi|D S )Nc                 s   s   | ]\}}|V  qd S r   r   )rr   rm   r   r   r   r   ru   K  s     z'ResultSet._iter_meta.<locals>.<genexpr>c                 S   s   h | ]
}|j qS r   r   r   r   r   r   	<setcomp>L  s     z'ResultSet._iter_meta.<locals>.<setcomp>Zmax_iterationsr	   )r1   Zget_manyrH   r   r   r   r   r   J  s    zResultSet._iter_metac                 C   s   dd | j D S )Nc                 s   s,   | ]$}|j |jr|jtjkr|V  qd S r   )r1   Z	is_cachedr0   rz   r   r`   )rr   resr   r   r   ru   P  s    z0ResultSet._failed_join_report.<locals>.<genexpr>r   r9   r   r   r   _failed_join_reportO  s    zResultSet._failed_join_reportc                 C   s
   t | jS r   )r   rH   r9   r   r   r   __len__T  s    zResultSet.__len__c                 C   s   t |tr|j| jkS tS r   )r   r   rH   r   r   r   r   r   r   W  s    
zResultSet.__eq__c                 C   s*   dt | j dddd | jD  dS )Nr   z: [, c                 s   s   | ]}|j V  qd S r   r   r   r   r   r   ru   ]  s     z%ResultSet.__repr__.<locals>.<genexpr>]>)r.   r&   r   rH   r9   r   r   r   r   \  s    zResultSet.__repr__c                 C   s(   z| j d jW S  tk
r"   Y nX d S Nr   )rH   r   
IndexErrorr9   r   r   r   r   _  s    zResultSet.supports_native_joinc                 C   s,   | j d kr&| jr| jd jnt | _ | j S r   )r   rH   r/   r
   Z_get_current_objectr9   r   r   r   r/   f  s
    
zResultSet.appc                 C   s
   || _ d S r   )r   )r6   r/   r   r   r   r/   m  s    c                 C   s   | j r| j jS | jd jS r   )r/   r1   rH   r9   r   r   r   r1   q  s    zResultSet.backend)NN)NT)NFNFN)NTrV   NTNTN)NTrV   NTNTN)NF)NrV   TNN)NTrV   NTNNT)'r&   r'   r(   r)   r   rH   r7   r   r   r   r   r   r   r}   r   r]   r   r   rw   r   rI   rQ   r   r   ra   r   r=   r   r   r   r   r   r   r   r   r   r/   r   r1   r   r   r   r   r   #  s   


		
	    
         
         
J
    
          
$


r   c                       s   e Zd ZdZdZdZd fdd	Z fddZd ddZd!d	d
Z	dd Z
dd Zdd ZeZdd Zdd Zdd Zdd Zdd Zedd Zed"ddZ  ZS )#r   az  Like :class:`ResultSet`, but with an associated id.

    This type is returned by :class:`~celery.group`.

    It enables inspection of the tasks state and return values as
    a single entity.

    Arguments:
        id (str): The id of the group.
        results (Sequence[AsyncResult]): List of result instances.
        parent (ResultBase): Parent result of this group.
    Nc                    s    || _ || _t j|f| d S r   )r0   r*   superr7   )r6   r0   rH   r*   rl   r   r   r   r7     s    zGroupResult.__init__c                    s   | j |  t   d S r   )r1   rA   r   r   r9   r   r   r   r     s    zGroupResult._on_readyc                 C   s   |p
| j j| j| S )zSave group-result for later retrieval using :meth:`restore`.

        Example:
            >>> def save_and_restore(result):
            ...     result.save()
            ...     result = GroupResult.restore(result.id)
        )r/   r1   Z
save_groupr0   r6   r1   r   r   r   save  s    zGroupResult.savec                 C   s   |p
| j j| j dS )z.Remove this result if it was previously saved.N)r/   r1   Zdelete_groupr0   r   r   r   r   delete  s    zGroupResult.deletec                 C   s   | j |  fS r   r   r9   r   r   r   r     s    zGroupResult.__reduce__c                 C   s   | j | jfS r   )r0   rH   r9   r   r   r   r     s    zGroupResult.__reduce_args__c                 C   s   t | jp| jS r   )boolr0   rH   r9   r   r   r   __bool__  s    zGroupResult.__bool__c                 C   sF   t |tr.|j| jko,|j| jko,|j| jkS t |trB|| jkS tS r   )r   r   r0   rH   r*   r   r   r   r   r   r   r     s    




zGroupResult.__eq__c              	   C   s2   dt | j d| j dddd | jD  dS )Nr   r   z [r   c                 s   s   | ]}|j V  qd S r   r   r   r   r   r   ru     s     z'GroupResult.__repr__.<locals>.<genexpr>r   )r.   r&   r0   r   rH   r9   r   r   r   r     s    zGroupResult.__repr__c                 C   s
   t | jS r   r   r9   r   r   r   r     s    zGroupResult.__str__c                 C   s
   t | jS r   r   r9   r   r   r   r     s    zGroupResult.__hash__c                 C   s&   | j | jo| j fdd | jD fS )Nc                 S   s   g | ]}|  qS r   )rD   r   r   r   r   r     s     z(GroupResult.as_tuple.<locals>.<listcomp>)r0   r*   rD   rH   r9   r   r   r   rD     s    zGroupResult.as_tuplec                 C   s   | j S r   r   r9   r   r   r   rx     s    zGroupResult.childrenc                 C   s.   |pt | jts| jnt}|p"|j}||S )z&Restore previously saved group result.)r   r/   r   r
   r1   Zrestore_group)clsr0   r1   r/   r   r   r   restore  s
    
zGroupResult.restore)NNN)N)N)NN)r&   r'   r(   r)   r0   rH   r7   r   r   r   r   r   r   __nonzero__r   r   r   r   rD   r   rx   classmethodr   __classcell__r   r   r   r   r   v  s(   



r   c                   @   s   e Zd ZdZd%ddZd&ddZdd	 Zd
d Zdd Zdd Z	dd Z
d'ddZeZdd Zdd Zdd Zedd Zedd Zedd  ZeZed!d" Zed#d$ ZdS )(r   z.Result that we know has already been executed.Nc                 C   s.   || _ || _|| _|| _t | _| |  d S r   )r0   _result_state
_tracebackr   r3   )r6   r0   Z	ret_valuerz   r   r   r   r   r7     s    zEagerResult.__init__Fc                 C   s   | j ||S r   r   r>   r   r   r   r=     s    zEagerResult.thenc                 C   s   | j S r   )r4   r9   r   r   r   r     s    zEagerResult._get_task_metac                 C   s   | j |  fS r   r   r9   r   r   r   r     s    zEagerResult.__reduce__c                 C   s   | j | j| j| jfS r   r0   r   r   r   r9   r   r   r   r     s    zEagerResult.__reduce_args__c                 C   s   |   \}}|| S r   )r   )r6   r   r   r   r   r   r     s    zEagerResult.__copy__c                 C   s   dS r$   r   r9   r   r   r   rw     s    zEagerResult.readyTc                 K   sN   |r
t   |  r| jS | jtjkrJ|rDt| jtr:| jnt| j| jS d S r   )r   r}   rC   rz   r   r`   r   	Exception)r6   rO   rZ   r^   rl   r   r   r   ra     s     
zEagerResult.getc                 C   s   d S r   r   r9   r   r   r   rI     s    zEagerResult.forgetc                 O   s   t j| _d S r   )r   ZREVOKEDr   r   r   r   r   rQ     s    zEagerResult.revokec                 C   s   d| j  dS )Nz<EagerResult: r   r   r9   r   r   r   r     s    zEagerResult.__repr__c                 C   s   | j | j| j| jdS )N)r   rC   r   r   r   r9   r   r   r   r4     s
    zEagerResult._cachec                 C   s   | j S )zThe tasks return value.)r   r9   r   r   r   rC     s    zEagerResult.resultc                 C   s   | j S )zThe tasks state.)r   r9   r   r   r   rz     s    zEagerResult.statec                 C   s   | j S )z!The traceback if the task failed.)r   r9   r   r   r   r   "  s    zEagerResult.tracebackc                 C   s   dS r    r   r9   r   r   r   r   '  s    z EagerResult.supports_native_join)N)NF)NTT)r&   r'   r(   r)   r7   r=   r   r   r   r   rw   ra   rS   rI   rQ   r   r   r4   rC   rz   r   r   r   r   r   r   r   r     s4   


  




r   c                    s   t    j}t| ts~| \}}t|ttfr2|n|df\}}|rLt| }|dk	rr j| fdd|D |dS |||dS | S )zDeserialize result from tuple.Nc                    s   g | ]}t | qS r   )r   rq   r/   r   r   r   :  s     z%result_from_tuple.<locals>.<listcomp>rg   )r   r   r   r   rc   tupler   r   )r   r/   ZResultr   Znodesr0   r*   r   r   r   r   ,  s    

 r   )N),r)   r   r   collectionsr   
contextlibr   weakrefr   Zkombu.utils.objectsr   Zviner   r   r    r
   r   r   r   r   r/   r   
exceptionsr   r   r   Zutils.graphr   r   r   ImportError__all__r   r   r#   r%   r   registerr   r   r   r   r   r   r   r   r   <module>   sL   

	
	   b  T_U