U
    .e.                     @  s  U d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZmZmZmZ ddlmZ ddlmZ dZeeZ da!de"d< dZ#G dd de$Z%dd Z&edd Z'ddddZ(dddddZ)G dd  d Z*dS )!zEvent loop implementation.    )annotationsN)contextmanager)Empty)sleep)GeneratorType)Thenablepromise)
get_logger)fileno)ERRREADWRITEpoll)cached_property   )Timer)Hubget_event_loopset_event_loopz
Hub | None_current_loopz<Received unknown event %r for fd %r, please contact support!c                   @  s   e Zd ZdZdS )StopzStops the event loop.N)__name__
__module____qualname____doc__ r   r   :/tmp/pip-unpacked-wheel-48hrr5dg/kombu/asynchronous/hub.pyr      s   r   c                   C  s
   t  d S N)r   r   r   r   r   _raise_stop_error#   s    r   c                  o  s
   d V  d S r   r   )argskwargsr   r   r   _dummy_context'   s    r!   )returnc                   C  s   t S )zGet current event loop object.r   r   r   r   r   r   ,   s    r   )loopr"   c                 C  s   | a | S )z"Set the current event loop object.r#   )r$   r   r   r   r   1   s    r   c                
   @  sN  e Zd ZdZeZeZeZdZdFddZe	dd Z
e
jdd Z
dd	 Zd
d Zdd Zdd Zdd ZdGddZdd ZdHddZdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ Zd0d1 Zd2d3 Zd4d5 Z d6d7 Z!d8d9 Z"d:d; Z#e$e%e&e'e(e)e*eeef
d<d=Z+d>d? Z,d@dA Z-e.dBdC Z/e	dDdE Z0dS )Ir   z~Event loop object.

    Arguments:
    ---------
        timer (kombu.asynchronous.Timer): Specify custom timer instance.
    Nc                 C  sn   |d k	r|nt  | _i | _i | _t | _t | _t | _t	 | _
d| _d | _t | _d | _d| _|   d S )NFr   )r   timerreaderswritersseton_tickon_close_ready	threadingLock_ready_lock_running_loopconsolidateconsolidate_callbackpropagate_errors_create_poller)selfr%   r   r   r   __init__M   s    
	zHub.__init__c                 C  s   | j s|   | j S r   )_pollerr4   r5   r   r   r   pollerh   s    z
Hub.pollerc                 C  s
   || _ d S r   )r7   )r5   valuer   r   r   r9   n   s    c                 C  s   |    |   d S r   )closer4   r8   r   r   r   resetr   s    z	Hub.resetc                 C  s    t  | _| jj| _| jj| _d S r   )r   r7   register_register_fd
unregister_unregister_fdr8   r   r   r   r4   v   s    
zHub._create_pollerc                 C  s*   | j d k	r&| j   d | _ d | _d | _d S r   )r7   r;   r>   r@   r8   r   r   r   _close_poller{   s
    

zHub._close_pollerc                 C  s   |  t d S r   )	call_soonr   r8   r   r   r   stop   s    zHub.stopc                 C  s   d t| t| jt| jS )Nz<Hub@{:#x}: R:{} W:{}>)formatidlenr&   r'   r8   r   r   r   __repr__   s
      zHub.__repr__r   
   r   c           
      C  s   | j }d }|r|jrt|D ]}t| j\}}|d kr: qz
|  W q |k
rZ    Y q ttfk
rr    Y q tk
r }	 z"|	jtj	kr t
jd|	dd W 5 d }	~	X Y q tk
r }	 zt
jd|	dd W 5 d }	~	X Y qX qt|p||S )NzError in timer: %rr   exc_info)r%   _queuerangenext	schedulerMemoryErrorAssertionErrorOSErrorerrnoZENOMEMloggererror	Exceptionmin)
r5   Z	min_delayZ	max_delayZ
max_timers	propagater%   delayientryexcr   r   r   fire_timers   s(    

 $zHub.fire_timersc              	   C  s    z| | W 5 |  | X d S r   )_discard_unregisterr5   fdr   r   r   _remove_from_loop   s    zHub._remove_from_loopFc                 C  sz   t |}z| j|| W n  tk
r:   | |  Y n<X |t@ rJ| jn| j}|rj| j	| d ||< n||f||< d S r   )
r
   r9   r=   
ValueErrorra   r   r&   r'   r1   add)r5   r`   callbackflagsr   r1   destr   r   r   rc      s    

zHub.addc                 C  s   t |}| | d S r   )r
   ra   r_   r   r   r   remove   s    z
Hub.removec                 C  s@   d| _ z,z|   W q tk
r,   Y q0Y qX qW 5 d| _ X d S )NTF)r/   run_oncer   r8   r   r   r   run_forever   s    zHub.run_foreverc                 C  s.   zt | j W n tk
r(   d | _Y nX d S r   )rM   r$   StopIterationr0   r8   r   r   r   rh      s    zHub.run_oncec              	   G  s6   t |tst||}| j | j| W 5 Q R X |S r   )
isinstancer   r   r.   r+   rc   )r5   rd   r   r   r   r   rB      s
    

zHub.call_soonc                 G  s   | j |||S r   )r%   Z
call_afterr5   rX   rd   r   r   r   r   
call_later   s    zHub.call_laterc                 G  s   | j |||S r   )r%   call_at)r5   whenrd   r   r   r   r   rn      s    zHub.call_atc                 G  s   | j |||S r   )r%   call_repeatedlyrl   r   r   r   rp      s    zHub.call_repeatedlyc                 G  s   |  ||ttB |S r   )rc   r   r   r5   Zfdsrd   r   r   r   r   
add_reader   s    zHub.add_readerc                 G  s   |  ||t|S r   )rc   r   rq   r   r   r   
add_writer   s    zHub.add_writerc                 C  sH   || j k}| j |}z| | W 5 |rB|\}}| ||t| X d S r   )r'   getrc   r   ra   )r5   r`   writableZon_writecbr   r   r   r   remove_reader   s    
zHub.remove_readerc                 C  sL   || j k}| j |}z| | W 5 |rF|\}}| ||ttB | X d S r   )r&   rt   rc   r   r   ra   )r5   r`   readableZon_readrv   r   r   r   r   remove_writer   s    
zHub.remove_writerc              
   C  s0   z| j | W n tttfk
r*   Y nX d S r   )r9   r?   AttributeErrorKeyErrorrQ   r_   r   r   r   r^      s    zHub._unregisterc              
   C  s0   | j   | j}t | _|W  5 Q R  S Q R X d S r   )r.   r+   r(   )r5   readyr   r   r   
_pop_ready   s    zHub._pop_readyc                   s~    fdd j D   j    fdd jD   j   j      jD ]}|  qT  }|D ]
}|  qnd S )Nc                   s   g | ]}  |qS r   r^   .0r`   r8   r   r   
<listcomp>  s     zHub.close.<locals>.<listcomp>c                   s   g | ]}  |qS r   r~   r   r8   r   r   r     s     )r&   clearr'   r1   rA   r*   r}   )r5   r   rd   Ztodositemr   r8   r   r;     s    




z	Hub.closec                 C  s4   t |}| j|d  | j|d  | j| d S r   )r
   r&   popr'   r1   discardr_   r   r   r   r]     s    zHub._discardc                 C  s   t jd||dd d S )Nz Callback %r raised exception: %rr   rI   )rS   rT   )r5   rd   r[   r   r   r   on_callback_error  s       zHub.on_callback_errorc           !      c  s  | j | j }}| jj}| j}| j}| jj}| j}| j	}| j
}| j}|  }|D ]}|rN|  qN|rl||dnd}|D ]
}|  qt|s|rg }z||}W n tk
r   Y d S X |pdD ]\}}d}||kr||d kr|| qd  }}||@ r4z|| \}}W n$ |k
r0   | | Y qY nX nd||	@ rvz|| \}}W n$ |k
rr   | | Y qY nX n"||
@ rd}ntt|| d}|rz||p||\}}W n tk
r   Y nX |d kr| | qt||rtz|| W nn tk
r> }  z| jtjkr& || W 5 d } ~ X Y n4 |k
rR   Y n  tk
rp   ||  Y nX qz||  W q |k
r   Y qX q|r|| n|||d d V  qBd S )N)rW   r   r   FTg?)r&   r'   r9   r   r\   rg   r%   rK   r1   r2   r)   r3   r}   rb   rt   appendrw   ry   rS   infoW_UNKNOWN_EVENT	TypeErrorrk   rQ   rR   EBADFrU   )!r5   	generatorr   rV   rM   r   rj   r{   r   r   r   r&   r'   r   r\   Z
hub_removeZ	scheduledr1   r2   r)   rW   todor   Zpoll_timeoutZtick_callbackZto_consolidateeventsr`   eventZgeneral_errorrv   Zcbargsr[   r   r   r   create_loop  s    











zHub.create_loopc                 C  s   ddl m} || S )Nr   )repr_active)debugr   )r5   r   r   r   r   r     s    zHub.repr_activec                 C  s   ddl m} || |pg S )Nr   )repr_events)r   r   )r5   r   r   r   r   r   r     s    zHub.repr_eventsc                 C  s
   t | jS r   )iterr%   r8   r   r   r   rN     s    zHub.schedulerc                 C  s   | j d kr|  | _ | j S r   )r0   r   r8   r   r   r   r$     s    

zHub.loop)N)r   rH   rH   r   )r   F)1r   r   r   r   r   r   r   r*   r6   propertyr9   setterr<   r4   rA   rC   rG   r\   ra   rc   rg   ri   rh   rB   rm   rn   rp   rr   rs   rw   ry   r^   r}   r;   r]   r   r   r   rV   rM   r   rj   r{   r   r   r   r   rN   r$   r   r   r   r   r   8   sj   


  



       
`
r   )+r   
__future__r   rR   r,   
contextlibr   queuer   timer   typesr   r   Zviner   r   Z	kombu.logr	   Zkombu.utils.compatr
   Zkombu.utils.eventior   r   r   r   Zkombu.utils.objectsr   r%   r   __all__r   rS   r   __annotations__r   BaseExceptionr   r   r!   r   r   r   r   r   r   r   <module>   s0   
