U
    .e.#                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 z$ddl
Z
ddlZ
ddlZ
ddlZ
W n ek
rx   dZ
Y nX dZeeZd	Zd
ZdZdZdZdZdZdZdd ZG dd de	ZdS )z@Apache Cassandra result store backend using the DataStax driver.    N)states)ImproperlyConfigured)
get_logger   )BaseBackend)CassandraBackendz
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
z
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
z(Cassandra backend improperly configured.z!Cassandra backend not configured.z
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
z]
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
z
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
z
    USING TTL {0}
c                 C   s
   t | dS )Nutf8)bytes)x r   =/tmp/pip-unpacked-wheel-f4liivr4/celery/backends/cassandra.pybuf_tC   s    r   c                       sb   e Zd ZdZdZdZdZd fdd	Zddd	Zdd
dZ	dddZ
dd Zd fdd	Z  ZS )r   aG  Cassandra/AstraDB backend utilizing DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            or not-exactly-one of the :setting:`cassandra_servers` and
            the :setting:`cassandra_secure_bundle_path` settings is set.
    NTR#  c                    s  t  jf | tstt| jj}|p0|dd | _|pB|dd | _	|pT|dd | _
|pf|dd | _|px|dd | _|di | _| jp| j	}	|	r| jr| jstt| jr| j	rtt|p|dd }
|
d k	rt|
nd| _|d	pd
}|dpd
}ttj|tjj| _ttj|tjj| _d | _|dd }|dd }|r|rttj|d }|svtt|f || _d | _d | _d | _d | _t  | _!d S )NZcassandra_serversZcassandra_secure_bundle_pathZcassandra_portZcassandra_keyspaceZcassandra_tablecassandra_optionsZcassandra_entry_ttl Zcassandra_read_consistencyLOCAL_QUORUMZcassandra_write_consistencyZcassandra_auth_providerZcassandra_auth_kwargs)"super__init__	cassandrar   E_NO_CASSANDRAappconfgetserversbundle_pathportkeyspacetabler   E_CASSANDRA_NOT_CONFIGUREDE_CASSANDRA_MISCONFIGURED	Q_EXPIRESformat
cqlexpiresgetattrZConsistencyLevelr   read_consistencywrite_consistencyauth_providerauth!E_NO_SUCH_CASSANDRA_AUTH_PROVIDER_cluster_session_write_stmt
_read_stmt	threadingRLock_lock)selfr   r   r   Z	entry_ttlr   r   kwargsr   Zdb_directionsexpiresZ	read_consZ
write_consr&   Zauth_kwargsZauth_provider_class	__class__r   r   r   X   s\       zCassandraBackend.__init__Fc                 C   s  | j dk	rdS | j  zTz| j dk	r6W W >dS | jrbtjj| jf| j| j	d| j
| _n$tjjf d| ji| j	d| j
| _| j| j| _ tjtj| j| jd| _| j| j_tjtj| jd| _| j| j_|r,tjtj| jd}| j|_z| j | W n tjk
r*   Y nX W n< tjk
rj   | jdk	rX| j  d| _d| _  Y nX W 5 | j  X dS )zjPrepare the connection for action.

        Arguments:
            write (bool): are we a writer?
        N)r   r&   Zsecure_connect_bundle)Zcloudr&   )r   r2   )r   ) r*   r/   acquirereleaser   r   ZclusterZClusterr   r&   r   r)   r   connectr   queryZSimpleStatementQ_INSERT_RESULTr!   r   r"   r+   r%   Zconsistency_levelQ_SELECT_RESULTr,   r$   Q_CREATE_RESULT_TABLEexecuteZAlreadyExistsZOperationTimedOutshutdown)r0   writeZ	make_stmtr   r   r   _get_connection   sf    



  

	

z CassandraBackend._get_connectionc                 K   sV   | j dd | j| j||t| || j t| |t| | |f dS )z1Store return value and state of an executed task.T)r>   N)	r?   r*   r<   r+   r   encoder   nowZcurrent_task_children)r0   task_idresultstate	tracebackrequestr1   r   r   r   _store_result   s    
zCassandraBackend._store_resultc                 C   s   dS )Nzcassandra://r   )r0   Zinclude_passwordr   r   r   as_uri   s    zCassandraBackend.as_uric              
   C   sf   |    | j| j|f }|s.tjddS |\}}}}}| ||| ||| || |dS )z$Get task meta-data for a task by id.N)statusrC   )rB   rI   rC   	date_donerE   children)	r?   r*   r<   r,   Zoner   ZPENDINGZmeta_from_decodeddecode)r0   rB   resrI   rC   rJ   rE   rK   r   r   r   _get_task_meta_for   s    z#CassandraBackend._get_task_meta_forr   c                    s2   |si n|}| | j| j| jd t ||S )N)r   r   r   )updater   r   r   r   
__reduce__)r0   argsr1   r3   r   r   rP      s    zCassandraBackend.__reduce__)NNNNr   N)F)NN)T)r   N)__name__
__module____qualname____doc__r   r   Zsupports_autoexpirer   r?   rG   rH   rN   rP   __classcell__r   r   r3   r   r   G   s   
    6
I   

r   )rU   r-   Zceleryr   Zcelery.exceptionsr   Zcelery.utils.logr   baser   r   Zcassandra.authZcassandra.clusterZcassandra.queryImportError__all__rR   loggerr   r(   r   r   r9   r:   r;   r    r   r   r   r   r   r   <module>   s0   
