U
    .e8                     @   sd  d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZG dd deZG dd dZd4ddZd5ddZdd ZeddfddZdd Zd6ddZdd Zdd  Z d!d" Z!d#d$ Z"G d%d& d&Z#d7d)d*Z$d+d, Z%d-d. Z&d/d0 Z'd1d2 Z(eeed3Z)ee%ed3Z*ee&ed3Z+ee'ed3Z,dS )8z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                   @   s   e Zd ZdZdS )r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__ r   r   :/tmp/pip-unpacked-wheel-f4liivr4/celery/contrib/migrate.pyr      s   r   c                   @   s0   e Zd ZdZdZdZdZedd Zdd Z	dS )r   zMigration progress state.r   c                 C   s   | j s
dS t| j S )N?)	total_apxstrselfr   r   r   strtotal&   s    zState.strtotalc                 C   s$   | j rd| j  S | j d| j S )N^/)filteredcountr%   r#   r   r   r   __repr__,   s    zState.__repr__N)
r   r   r   r   r)   r(   r!   propertyr%   r*   r   r   r   r   r      s   
r   c              
   C   s   |sddddg}t |j}|j|j|j  }}}|dkr@|d n|}|dkrT|d n|}|j|j }	}
|dd}|d	d}|dk	rt|nd}|D ]}||d q| j	t |f|||||	|
|d
| dS )zRepublish message.Zapplication_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression
expiration)r/   r0   r1   r.   r,   r-   r2   )
r   bodydelivery_infor.   
propertiesr,   r-   popfloatpublish)producermessager/   r0   Zremove_propsr3   infor.   propsctypeencr1   r2   keyr   r   r   r   2   s8     
 
   r   c                 C   s>   |j }|dkri n|}t| |||d ||d d dS )zMigrate single task message.Nr/   r0   r/   r0   )r4   r   get)r9   Zbody_r:   queuesr;   r   r   r   r   P   s    r   c                    s    fdd}|S )Nc                    s   r| d krd S  | |S NZtaskr   r3   r:   callbacktasksr   r   r(   [   s    z!filter_callback.<locals>.filteredr   )rF   rG   r(   r   rE   r   filter_callbackY   s    rH   c                    sV   t |}t|jj|dd t| d} fdd}t|| |f|d|S )z)Migrate tasks from one broker to another.F)Zauto_declarerB   c                    sh   |  j }| j| j|_|j| jkr:| j|j|_|jj| jkr\| j| j|j_|  d S N)channelrA   namer0   r/   Zdeclare)queueZ	new_queuer9   rB   r   r   on_declare_queuek   s    
z'migrate_tasks.<locals>.on_declare_queue)rB   rO   )r	   prepare_queuesamqpProducerr   r   )sourcedestZmigrateapprB   kwargsrO   r   rN   r   r   c   s    
r   c                 C   s   t |tr| jj| S |S rJ   )
isinstancer"   rQ   rB   )rU   qr   r   r   _maybe_queuey   s    
rY   c	              
      s   t    fdd|pg D p d}
 j|ddT jt 	f	dd}t |fd|
i|	W  5 Q R  S Q R X dS )	aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    c                    s   g | ]}t  |qS r   )rY   ).0rM   )rU   r   r   
<listcomp>   s     zmove.<locals>.<listcomp>NF)poolc                    s   | |}|rr|}t |trBt|j |jj|j }}nt|\}}t|||d |	   j
d7  _
 r | | rj
krt d S )Nr@      )rW   r   r   Zdefault_channelr/   rL   r0   expand_destr   ackr(   r   )r3   r:   retexrk)	rF   connr/   limit	predicater9   r0   state	transformr   r   on_task   s$    

 zmove.<locals>.on_taskconsume_from)r	   Zconnection_or_acquirerQ   rR   r   r   )re   
connectionr/   r0   rS   rU   rF   rd   rg   rV   rB   rh   r   )
rU   rF   rc   r/   rd   re   r9   r0   rf   rg   r   r      s    >r   c              	   C   s8   z| \}}W n" t tfk
r.   || }}Y nX ||fS rJ   )	TypeError
ValueError)r`   r/   r0   ra   rb   r   r   r   r^      s
    r^   c                 C   s   |d | kS )z'Return true if task id equals task_id'.idr   )task_idr3   r:   r   r   r   r      s    r   c                 C   s   |d | kS )z-Return true if task id is member of set ids'.rm   r   )idsr3   r:   r   r   r   r      s    r   c                 C   s@   t | tr| d} t | tr0tdd | D } | d kr<i } | S )N,c                 s   s(   | ] }t tt|d ddV  qdS ):N   )tupler   r   splitrZ   rX   r   r   r   	<genexpr>   s   z!prepare_queues.<locals>.<genexpr>)rW   r"   rt   listdictrI   r   r   r   rP      s    


rP   c                   @   sF   e Zd ZdddZdd Zdd	 Zd
d Zdd Zdd Zdd Z	dS )FiltererN      ?Fc                    s   | _ | _| _| _| _| _tt|p0g  _t	| _
|	 _|
 _| _ fdd|pht j
D  _|pxt  _| _d S )Nc                    s   g | ]}t  j|qS r   )rY   rU   ru   r#   r   r   r[   	  s   z%Filterer.__init__.<locals>.<listcomp>)rU   rc   filterrd   timeoutack_messagessetr   rG   rP   rB   rF   foreverrO   rw   ri   r   rf   accept)r$   rU   rc   r{   rd   r|   r}   rG   rB   rF   r   rO   ri   rf   r   rV   r   r#   r   __init__   s     

zFilterer.__init__c              	   C   sh   |  |  N zt| j| j| jdD ]}q&W n( tjk
rD   Y n tk
rV   Y nX W 5 Q R X | jS )N)r|   Zignore_timeouts)	prepare_consumercreate_consumerr   rc   r|   r   socketr   rf   )r$   _r   r   r   start  s    
zFilterer.startc                 C   s.   | j  jd7  _| jr*| j j| jkr*t d S )Nr]   )rf   r)   rd   r   r$   r3   r:   r   r   r   update_state  s    zFilterer.update_statec                 C   s   |   d S rJ   )r_   r   r   r   r   ack_message#  s    zFilterer.ack_messagec                 C   s   | j jj| j| j| jdS )N)rB   r   )rU   rQ   ZTaskConsumerrc   ri   r   r#   r   r   r   r   &  s
    zFilterer.create_consumerc                 C   s   | j }| j}| j}| jr<t|| j}t|| j}t|| j}|| || | jrb|| j | jd k	rt| j| j	}| jrt|| j}|| | 
| |S rJ   )r{   r   r   rG   rH   Zregister_callbackr}   rF   r   rf   declare_queues)r$   consumerr{   r   r   rF   r   r   r   r   -  s$    




zFilterer.prepare_consumerc              	   C   s   |j D ]v}| j r|j| j krq| jd k	r2| | z0||jjdd\}}}|r`| j j|7  _W q | jjk
rz   Y qX qd S )NT)Zpassive)	rB   rL   rO   rK   Zqueue_declarerf   r!   rc   Zchannel_errors)r$   r   rM   r   Zmcountr   r   r   r   A  s    


zFilterer.declare_queues)Nrz   FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r   r   r   r   ry      s$                     
ry   rz   Fc                 K   s0   t | ||f|||||||	|
|||d| S )zFilter tasks.)rd   r|   r}   rG   rB   rF   r   rO   ri   rf   r   )ry   r   )rU   rc   r{   rd   r|   r}   rG   rB   rF   r   rO   ri   rf   r   rV   r   r   r   r   Q  s&      r   c                 K   s   t | |if|S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )rn   rT   rV   r   r   r   r   f  s    r   c                    s$    fdd}t |fdt i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                    s     |jd S )NZcorrelation_id)rA   r5   rD   mapr   r   task_id_in_map  s    z%move_by_idmap.<locals>.task_id_in_maprd   )r   len)r   rV   r   r   r   r   r   t  s    r   c                    s    fdd}t |f|S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                    s     | d S rC   )rA   rD   r   r   r   task_name_in_map  s    z)move_by_taskmap.<locals>.task_name_in_map)r   )r   rV   r   r   r   r   r     s    r   c                 K   s   t tjf | |d| d S )N)rf   r3   )printMOVING_PROGRESS_FMTformat)rf   r3   r:   rV   r   r   r   filter_status  s    r   )rg   )NNN)N)NNNNNNNN)Nrz   FNNNFNNNN)-r   r   	functoolsr   	itertoolsr   r   Zkombur   r   Zkombu.commonr   Zkombu.utils.encodingr   Z
celery.appr	   Zcelery.utils.nodenamesr
   Zcelery.utils.textr   __all__r   	Exceptionr   r   r   r   rH   r   rY   r   r^   r   r   rP   ry   r   r   r   r   r   r   r   Zmove_direct_by_idmapZmove_direct_by_taskmapr   r   r   r   <module>   sj     

	

          
[Z                
