U
    .e                     @  s   d Z ddlmZ ddlZddlmZmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZmZ dZde e!eZ"e# Z$G dd dej%Z%G dd dej&Z&dS )aG  SQLAlchemy Transport module for kombu.

Kombu transport using SQL Database as the message store.

Features
========
* Type: Virtual
* Supports Direct: yes
* Supports Topic: yes
* Supports Fanout: no
* Supports Priority: no
* Supports TTL: no

Connection String
=================

.. code-block::

    sqla+SQL_ALCHEMY_CONNECTION_STRING
    sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING

For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.

Examples
--------
.. code-block::

    # PostgreSQL with default driver
    sqla+postgresql://scott:tiger@localhost/mydatabase

    # PostgreSQL with psycopg2 driver
    sqla+postgresql+psycopg2://scott:tiger@localhost/mydatabase

    # PostgreSQL with pg8000 driver
    sqla+postgresql+pg8000://scott:tiger@localhost/mydatabase

    # MySQL with default driver
    sqla+mysql://scott:tiger@localhost/foo

    # MySQL with mysqlclient driver (a maintained fork of MySQL-Python)
    sqla+mysql+mysqldb://scott:tiger@localhost/foo

    # MySQL with PyMySQL driver
    sqla+mysql+pymysql://scott:tiger@localhost/foo

Transport Options
=================

* ``queue_tablename``: Name of table storing queues.
* ``message_tablename``: Name of table storing messages.

Moreover parameters of :func:`sqlalchemy.create_engine()` function can be passed as transport options.
    )annotationsN)dumpsloads)Empty)create_enginetext)OperationalError)sessionmaker)virtual)cached_property)bytes_to_str   )Message)	ModelBase)Queue)class_registrymetadata)r      r   .c                      s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	e
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zedd Zedd  Z  ZS )!ChannelzThe channel class.Nc                   s"   |  |jj t j|f| d S N)_configure_entity_tablenamesclienttransport_optionssuper__init__)self
connectionkwargs	__class__ G/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/sqlalchemy/__init__.pyr   \   s    zChannel.__init__c                 C  s,   | dd| _| dd| _| jo&| j d S )Nqueue_tablenameZkombu_queuemessage_tablenameZkombu_message)getr#   r$   	queue_clsmessage_cls)r   optsr!   r!   r"   r   `   s    z$Channel._configure_entity_tablenamesc                 C  s8   | j j}|j }|dd  |dd  t|jf|S )Nr#   r$   )r   r   r   copypopr   hostname)r   conninfor   r!   r!   r"   _engine_from_configk   s
    
zChannel._engine_from_configc              
   C  s   | j j}|j| jkrttV |j| jkr>| j|j W  5 Q R  S |  }t|d}t| ||f| j|j< W 5 Q R X | j|j S )N)bind)	r   r   r+   _engines_MUTEXr-   r	   r   Z
create_all)r   r,   ZengineSessionr!   r!   r"   _openr   s    

zChannel._openc                 C  s$   | j d kr|  \}}| | _ | j S r   )_sessionr2   )r   _r1   r!   r!   r"   session   s    
zChannel.sessionc              
   C  s   | j | j| jj|k }|st~ | j | j| jj|k }|r^|W  5 Q R  S | |}| j | z| j   W n t	k
r   | j 
  Y nX W 5 Q R X |S r   )r5   queryr&   filternamefirstr0   addcommitr   rollbackr   queueobjr!   r!   r"   _get_or_create   s"    


zChannel._get_or_createc                 K  s   |  | d S r   )r@   )r   r>   r   r!   r!   r"   
_new_queue   s    zChannel._new_queuec                 K  sX   |  |}| t||}| j| z| j  W n tk
rR   | j  Y nX d S r   )r@   r'   r   r5   r:   r;   r   r<   )r   r>   payloadr   r?   messager!   r!   r"   _put   s    
zChannel._putc                 C  s   |  |}| jjjdkr(| jtd zt| j| j	 
| jj|jk
| jjdk| jj| jjd }|rd|_tt|jW S t W 5 | j  X d S )NZsqlitezBEGIN IMMEDIATE TRANSACTIONFr   )r@   r5   r.   r8   executer   r;   r6   r'   Zwith_for_updater7   queue_ididZvisibleZorder_byZsent_atlimitr9   r   r   rB   r   )r   r>   r?   msgr!   r!   r"   _get   s(    


zChannel._getc                 C  s(   |  |}| j| j| jj|jkS r   )r@   r5   r6   r'   r7   rF   rG   r=   r!   r!   r"   
_query_all   s    
zChannel._query_allc                 C  sD   |  |jdd}z| j  W n tk
r>   | j  Y nX |S )NF)Zsynchronize_session)rK   deleter5   r;   r   r<   )r   r>   countr!   r!   r"   _purge   s    zChannel._purgec                 C  s   |  | S r   )rK   rM   )r   r>   r!   r!   r"   _size   s    zChannel._sizec              
   C  sX   |t krPt> |t kr*t | W  5 Q R  S tt||tf|W  5 Q R  S Q R X t | S r   )r   r0   typestrr   )r   r8   basensr!   r!   r"   _declarative_cls   s    &zChannel._declarative_clsc                 C  s   |  dtd| jiS )Nr   __tablename__)rT   	QueueBaser#   r   r!   r!   r"   r&      s
    zChannel.queue_clsc                 C  s   |  dtd| jiS )Nr   rU   )rT   MessageBaser$   rW   r!   r!   r"   r'      s
    zChannel.message_cls)__name__
__module____qualname____doc__r3   r/   r   r   r-   r2   propertyr5   r@   rA   rD   rJ   rK   rN   rO   rT   r   r&   r'   __classcell__r!   r!   r   r"   r   V   s*   
	
r   c                   @  s2   e Zd ZdZeZdZdZdZdZe	fZ
dd ZdS )		TransportzThe transport class.Tr   Zsql
sqlalchemyc                 C  s   dd l }|jS )Nr   )r`   __version__)r   r`   r!   r!   r"   driver_version   s    zTransport.driver_versionN)rY   rZ   r[   r\   r   Zcan_parse_urldefault_portZdriver_typeZdriver_namer   Zconnection_errorsrb   r!   r!   r!   r"   r_      s   r_   )'r\   
__future__r   	threadingjsonr   r   r>   r   r`   r   r   Zsqlalchemy.excr   Zsqlalchemy.ormr	   Zkombu.transportr
   Zkombu.utilsr   Zkombu.utils.encodingr   modelsr   rX   r   r   rV   r   r   VERSIONjoinmaprQ   ra   RLockr0   r   r_   r!   r!   r!   r"   <module>   s(   5	 