U
    .e"                     @  s  d Z ddlmZ ddlZddlmZ ddlmZmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ d
dlmZ zddlmZ W n ek
r   dZY nX zddlmZmZ W n ek
r   dZdZY nX dd ejD ZG dd dejZG dd dejZdS )a  Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following formats:

.. code-block::

    azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

Note that if the access key for the storage account contains a forward slash
(``/``), it will have to be regenerated before it can be used in the connection
URL.

.. code-block::

    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

If you wish to use an `Azure Managed Identity` you may use the
``DefaultAzureCredential`` format of the connection string which will use
``DefaultAzureCredential`` class in the azure-identity package. You may want to
read the `azure-identity documentation` for more information on how the
``DefaultAzureCredential`` works.

.. _azure-identity documentation:
https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python
.. _Azure Managed Identity:
https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview

Transport Options
=================

* ``queue_name_prefix``
    )annotationsN)Empty)AnyOptional)ResourceExistsError)safe_str)dumpsloads)cached_property   )virtual)QueueServiceClient)DefaultAzureCredentialManagedIdentityCredentialc                 C  s   i | ]}t |d qS )-   )ord).0c r   F/tmp/pip-unpacked-wheel-48hrr5dg/kombu/transport/azurestoragequeues.py
<dictcomp>Q   s     r   c                      s   e Zd ZU dZdZded< dZded< i Zded	< d
Zded< e	 Z
ded<  fddZ fddZefddddZdd Zdd Zdd Zd+ddZdd Zd d! Zed"dd#d$Zed%d& Zed'd( Zeddd)d*Z  ZS ),ChannelzAzure Storage Queues channel.zkombu%(vhost)sstrdomain_formatNzOptional[QueueServiceClient]_queue_servicezdict[Any, Any]_queue_name_cacheTboolno_ackzset[Any]_noack_queuesc                   sV   t d krtdt j|| t| jj\| _| _	| j
 D ]}|| j|d < q>d S )NzGAzure Storage Queues transport requires the azure-storage-queue libraryname)r   ImportErrorsuper__init__	Transport	parse_uriconninfohostname_credential_urlqueue_serviceZlist_queuesr   )selfargskwargsqueue	__class__r   r   r"   _   s    zChannel.__init__c                   s&   |r| j | t j||f||S N)r   addr!   basic_consume)r*   r-   r   r+   r,   r.   r   r   r2   m   s    zChannel.basic_consume)returnc                 C  s   t t||S )z=Format AMQP queue name into a valid Azure Storage Queue name.)r   r   	translate)r*   r   tabler   r   r   entity_namet   s    zChannel.entity_namec                 C  s   |  | j| }z| jj| j| d}W nV tk
r~   z| j|}W n" tk
rj   | jj|d}Y nX |	 | j|< Y nX |S )zEnsure a queue exists.)r-   )
r6   queue_name_prefixr   Zget_queue_clientr   KeyErrorr)   Zcreate_queuer   get_queue_propertiesr*   r-   qr   r   r   _ensure_queuex   s    
zChannel._ensure_queuec                 O  s(   |  |}| j|d | j| dS )zDelete queue by name.N)r6   r   popr)   Zdelete_queue)r*   r-   r+   r,   
queue_namer   r   r   _delete   s    
zChannel._deletec                 K  s    |  |}t|}|| dS )zPut message onto queue.N)r<   r   send_message)r*   r-   messager,   r;   Zencoded_messager   r   r   _put   s    
zChannel._putc                 C  sZ   |  |}|jd|d}zt|}W n tk
r>   t Y nX t|j}|j|d |S )z/Try to retrieve a single message off ``queue``.r   )Zmessages_per_pagetimeout)rA   )r<   Zreceive_messagesnextStopIterationr   r	   contentZdelete_message)r*   r-   rC   r;   messagesrA   rF   r   r   r   _get   s    

zChannel._getc                 C  s   |  |}| jS )z)Return the number of messages in a queue.)r<   r9   Zapproximate_message_countr:   r   r   r   _size   s    
zChannel._sizec                 C  s"   |  |}| |j}|  |S )z'Delete all current messages in a queue.)r<   rI   r>   Zclear_messages)r*   r-   r;   nr   r   r   _purge   s    
zChannel._purger   c                 C  s"   | j d krt| j| jd| _ | j S )N)Zaccount_url
credential)r   r   r(   r'   r*   r   r   r   r)      s    
 zChannel.queue_servicec                 C  s   | j jS r0   )
connectionclientrM   r   r   r   r%      s    zChannel.conninfoc                 C  s
   | j jjS r0   )rN   rO   transport_optionsrM   r   r   r   rP      s    zChannel.transport_optionsc                 C  s   | j ddS )Nr7    )rP   getrM   r   r   r   r7      s    zChannel.queue_name_prefix)N)__name__
__module____qualname____doc__r   __annotations__r   r   r   setr   r"   r2   CHARS_REPLACE_TABLEr6   r<   r?   rB   rH   rI   rK   propertyr)   r%   rP   r
   r7   __classcell__r   r   r.   r   r   V   s.   



r   c                   @  sh   e Zd ZU dZeZdZded< dZded< dZd	ed
< e	dddddZ
eddd	dddddZdS )r#   zAzure Storage Queues transport.r   intpolling_intervalNzOptional[int]default_portTr   can_parse_urlr   ztuple[str | dict, str])urir3   c                 C  s   z|  dd} | dd\}}d | krFtd kr>tdt }nBd | krntd krftdt }nd	|krd
|krd	|d}t||gstW n tk
r   t	dY nX ||fS )Nzazurestoragequeues://rQ   @r   r   z`Azure Storage Queues transport with a DefaultAzureCredential requires the azure-identity libraryr   zcAzure Storage Queues transport with a ManagedIdentityCredential requires the azure-identity libraryZdevstoreaccount1z.core.windows.net)Zaccount_nameZaccount_keyzNeed a URI like azurestoragequeues://{SAS or access key}@{URL}, azurestoragequeues://DefaultAzureCredential@{URL}, , or azurestoragequeues://ManagedIdentityCredential@{URL})
replacersplitlowerr   r    r   allAssertionError	Exception
ValueError)r`   rL   urlr   r   r   r$      s*    
zTransport.parse_uriF**)r`   include_passwordmaskr3   c                 C  s"   |  |\}}d|r|n||S )Nzazurestoragequeues://{}@{})r$   format)clsr`   rk   rl   rL   ri   r   r   r   as_uri   s
    
 zTransport.as_uri)Frj   )rS   rT   rU   rV   r   r]   rW   r^   r_   staticmethodr$   classmethodro   r   r   r   r   r#      s   
0   r#   )rV   
__future__r   stringr-   r   typingr   r   Zazure.core.exceptionsr   Zkombu.utils.encodingr   Zkombu.utils.jsonr   r	   Zkombu.utils.objectsr
   rQ   r   Zazure.storage.queuer   r    Zazure.identityr   r   punctuationrY   r   r#   r   r   r   r   <module>   s.   4

p