U
    .e(                     @  s|  d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ dZdeeeZej dkrddl!Z!ddl"Z"ddl#Z#e"j$Z%dZ&e"j'Z(e!) Z*dd Z+dd Z,n>ej dkr<ddl-Z-ddl-m%Z%m&Z& dd Z+dd Z,ne.deddddgZ/G dd dej0Z0G dd  d ej1Z1dS )!a	  File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
* ``control_folder`` - directory where are exchange-queue table stored.
    )annotationsN)
namedtuple)Path)Empty)	monotonic)ChannelError)virtual)bytes_to_strstr_to_bytes)dumpsloads)cached_property)   r   r   .ntc                 C  s$   t |  }t ||ddt dS )Create file lock.r         N)	win32file_get_osfhandlefilenoZ
LockFileEx__overlapped)fileflagshfile r   >/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/filesystem.pylock}   s    r   c                 C  s"   t |  }t |ddt dS )Remove file lock.r   r   N)r   r   r   ZUnlockFileExr   )r   r   r   r   r   unlock   s    r   posix)LOCK_EXLOCK_SHc                 C  s   t |  | dS )r   N)fcntlflockr   )r   r   r   r   r   r      s    c                 C  s   t |  t j dS )r   N)r"   r#   r   ZLOCK_UN)r   r   r   r   r      s    z9Filesystem plugin only defined for NT and POSIX platformsexchange_queue_trouting_keypatternqueuec                   @  s   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd Zedd Zedd Zedd Zedd Zedd Zedd ZdS )ChannelzFilesystem Channel.Tc                 C  s   | j | d }zP|d}z.t|t tt| }dd |D W W S t| |  X W n8 t	k
rx   g  Y S  t
k
r   td| Y nX d S )N	.exchangerc                 S  s   g | ]}t | qS r   r$   .0qr   r   r   
<listcomp>   s     z%Channel.get_table.<locals>.<listcomp>zCannot open )control_folderopenr   closer   r!   r   r	   readFileNotFoundErrorOSErrorr   )selfexchanger   f_objexchange_tabler   r   r   	get_table   s    

zChannel.get_tablec           
      C  s   | j | d }| j jdd t|p&d|p,d|p2d}z| r|jddd}t|t t	t
| }dd	 |D }	||	kr|	d| |d |tt|	 n0|jd
dd}t|t |g}	|tt|	 W 5 t| |  X d S )Nr)   T)exist_ok zrb+r   	bufferingc                 S  s   g | ]}t | qS r   r+   r,   r   r   r   r/      s     z'Channel._queue_bind.<locals>.<listcomp>wb)r0   mkdirr$   r   r2   existsr1   r   r    r   r	   r3   insertseekwriter
   r   )
r6   r7   r%   r&   r'   r   Z	queue_valr8   r9   Zqueuesr   r   r   _queue_bind   s*    


zChannel._queue_bindc                 K  s&   |  |D ]}| j|j|f| q
d S N)r:   _putr'   )r6   r7   payloadr%   kwargsr.   r   r   r   _put_fanout   s    zChannel._put_fanoutc                 K  s   d ttt d t |}tj| j	|}zXz.t|ddd}t|t |tt| W n$ tk
r   td|dY nX W 5 t
| |  X dS )	zPut `message` onto `queue`.z{}_{}.{}.msgi  r?   r   r=   zCannot add file z to directoryN)formatintroundr   uuidZuuid4ospathjoindata_folder_outr   r2   r1   r   r    rD   r
   r   r5   r   )r6   r'   rH   rI   filenamefr   r   r   rG      s     

zChannel._putc                 C  s  d| d }t | j}t|}t|dkr|d}||dk rHq | jrV| j}nt	
 }ztt j| j|| W n tk
r   Y q Y nX t j||}z.t|d}| }|  | jst | W n$ tk
r   td|dY nX tt|S t dS )zGet next message from `queue`.r   .msgr   rbzCannot read file z from queue.N)rO   listdirdata_folder_insortedlenpopfindstore_processedprocessed_foldertempfile
gettempdirshutilmoverP   rQ   r5   r1   r3   r2   remover   r   r	   r   )r6   r'   
queue_findfolderrS   r^   rT   rH   r   r   r   _get   s:    




zChannel._getc                 C  s   d}d| d }t | j}t|dkr| }z8||dk rDW qt j| j|}t | |d7 }W q t	k
r|   Y qX q|S )z!Remove all messages from `queue`.r   r   rU   r   )
rO   rW   rX   rZ   r[   r\   rP   rQ   rc   r5   r6   r'   countrd   re   rS   r   r   r   _purge	  s    
zChannel._purgec                 C  sN   d}d| d}t | j}t|dkrJ| }||dk r@q|d7 }q|S )z<Return the number of messages in `queue` as an :class:`int`.r   r   rU   r   )rO   rW   rX   rZ   r[   r\   rg   r   r   r   _size"  s    
zChannel._sizec                 C  s
   | j jjS rF   )
connectionclienttransport_optionsr6   r   r   r   rm   3  s    zChannel.transport_optionsc                 C  s   | j ddS )NrX   Zdata_inrm   getrn   r   r   r   rX   7  s    zChannel.data_folder_inc                 C  s   | j ddS )NrR   Zdata_outro   rn   r   r   r   rR   ;  s    zChannel.data_folder_outc                 C  s   | j ddS )Nr]   Fro   rn   r   r   r   r]   ?  s    zChannel.store_processedc                 C  s   | j ddS )Nr^   	processedro   rn   r   r   r   r^   C  s    zChannel.processed_folderc                 C  s   t | jddS )Nr0   control)r   rm   rp   rn   r   r   r   r0   G  s    zChannel.control_folderN)__name__
__module____qualname____doc__Zsupports_fanoutr:   rE   rJ   rG   rf   ri   rj   propertyrm   r   rX   rR   r]   r^   r0   r   r   r   r   r(      s*   (




r(   c                      s\   e Zd ZdZejjjdedddgdZe	Z	e
 ZdZdZdZ fd	d
Zdd Z  ZS )	TransportzFilesystem Transport.FdirectZtopicZfanout)ZasynchronousZexchange_typer   
filesystemc                   s   t  j|f| | j| _d S rF   )super__init__global_statestate)r6   rl   rI   	__class__r   r   r|   [  s    zTransport.__init__c                 C  s   dS )NzN/Ar   rn   r   r   r   driver_version_  s    zTransport.driver_version)rs   rt   ru   rv   r   rx   Z
implementsextend	frozensetr(   ZBrokerStater}   default_portZdriver_typeZdriver_namer|   r   __classcell__r   r   r   r   rx   L  s   rx   )2rv   
__future__r   rO   ra   r_   rN   collectionsr   pathlibr   r'   r   timer   Zkombu.exceptionsr   Zkombu.transportr   Zkombu.utils.encodingr	   r
   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   VERSIONrQ   mapstr__version__nameZ
pywintypesZwin32conr   ZLOCKFILE_EXCLUSIVE_LOCKr    r!   ZLOCKFILE_FAIL_IMMEDIATELYZLOCK_NBZ
OVERLAPPEDr   r   r   r"   RuntimeErrorr$   r(   rx   r   r   r   r   <module>   sP   [

 .