a
    bg}                     @  s   d dl mZ d dlZd dlmZmZ d dlmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZmZ erd dlmZ d dlmZ d	Zd
ZdZdZdZG dd deZdS )    )annotationsN)InvalidStateErrorTask)TYPE_CHECKINGAsyncIteratorIteratorListOptionalSequenceTuple)	ByteStore)	SetupModeaexecute_cql)Session)PreparedStatementzm
    CREATE TABLE IF NOT EXISTS {keyspace}.{table} 
    (row_id TEXT, body_blob BLOB, PRIMARY KEY (row_id));
zDSELECT row_id, body_blob FROM  {keyspace}.{table} WHERE row_id IN ?;z2SELECT row_id, body_blob FROM  {keyspace}.{table};zAINSERT INTO {keyspace}.{table} (row_id, body_blob) VALUES (?, ?);z1DELETE FROM {keyspace}.{table} WHERE row_id IN ?;c                   @  s  e Zd ZdZddejddddddd	d
dZddddZddddZddddZ	ddddZ
ddddZdddddZdddddZddd d!d"Zddd d#d$Zdddd%d&Zdddd'd(Zdd)dd*d+d,d-Zdd)dd.d+d/d0ZdS )1CassandraByteStorea  A ByteStore implementation using Cassandra as the backend.

    Parameters:
        table: The name of the table to use.
        session: A Cassandra session object. If not provided, it will be resolved
            from the cassio config.
        keyspace: The keyspace to use. If not provided, it will be resolved
            from the cassio config.
        setup_mode: The setup mode to use. Default is SYNC  (SetupMode.SYNC).
    N)sessionkeyspace
setup_modestrzOptional[Session]zOptional[str]r   None)tabler   r   r   returnc             	   C  s   |r|sXz.ddl m}m} |p$||| _|p0| | _W qd ttfyT   tdY qd0 n|| _|| _|| _d | _d | _	d | _
tj| j| jd}d | _|tjkrtt| j|| _n| j| d S )Nr   )check_resolve_keyspacecheck_resolve_sessionz_Could not import a recent cassio package.Please install it with `pip install --upgrade cassio`.r   r   )Zcassio.configr   r   r   r   ImportErrorModuleNotFoundErrorr   select_statementinsert_statementdelete_statementCREATE_TABLE_CQL_TEMPLATEformatdb_setup_taskr   ASYNCasynciocreate_taskr   execute)selfr   r   r   r   r   r   Z
create_cql r)   s/var/www/html/cobodadashboardai.evdpl.com/venv/lib/python3.9/site-packages/langchain_community/storage/cassandra.py__init__1   s2    

zCassandraByteStore.__init__)r   c                 C  s4   | j r0z| j   W n ty.   tdY n0 dS )zAEnsure that the DB setup is finished. If not, raise a ValueError.zAsynchronous setup of the DB not finished. NB: AstraDB components sync methods shouldn't be called from the event loop. Consider using their async equivalents.N)r#   resultr   
ValueErrorr(   r)   r)   r*   ensure_db_setupX   s    z"CassandraByteStore.ensure_db_setupc                   s   | j r| j I dH  dS )z:Ensure that the DB setup is finished. If not, wait for it.N)r#   r.   r)   r)   r*   aensure_db_setupd   s    z#CassandraByteStore.aensure_db_setupr   c                 C  s(   | j s"| jtj| j| jd| _ | j S )zGet the prepared select statement for the table.
        If not available, prepare it.

        Returns:
            PreparedStatement: The prepared statement.
        r   )r   r   prepareSELECT_TABLE_CQL_TEMPLATEr"   r   r   r.   r)   r)   r*   get_select_statementi   s    z'CassandraByteStore.get_select_statementc                 C  s(   | j s"| jtj| j| jd| _ | j S )zGet the prepared insert statement for the table.
        If not available, prepare it.

        Returns:
            PreparedStatement: The prepared statement.
        r   )r   r   r1   INSERT_TABLE_CQL_TEMPLATEr"   r   r   r.   r)   r)   r*   get_insert_statementx   s    z'CassandraByteStore.get_insert_statementc                 C  s(   | j s"| jtj| j| jd| _ | j S )zGet the prepared delete statement for the table.
        If not available, prepare it.

        Returns:
            PreparedStatement: The prepared statement.
        r   )r    r   r1   DELETE_TABLE_CQL_TEMPLATEr"   r   r   r.   r)   r)   r*   get_delete_statement   s    z'CassandraByteStore.get_delete_statementzSequence[str]zList[Optional[bytes]])keysr   c                   sT   ddl m} |   i  | j|  ||gD ]}|j |j< q0 fdd|D S )Nr   ValueSequencec                   s   g | ]}  |qS r)   get.0keyZ	docs_dictr)   r*   
<listcomp>       z+CassandraByteStore.mget.<locals>.<listcomp>)cassandra.queryr:   r/   r   r'   r3   	body_blobrow_idr(   r8   r:   rowr)   r@   r*   mget   s    zCassandraByteStore.mgetc                   sb   ddl m} |  I d H  i  t| j|  ||gdI d H D ]}|j |j< q> fdd|D S )Nr   r9   
parametersc                   s   g | ]}  |qS r)   r;   r=   r@   r)   r*   rA      rB   z,CassandraByteStore.amget.<locals>.<listcomp>)rC   r:   r0   r   r   r3   rD   rE   rF   r)   r@   r*   amget   s    zCassandraByteStore.amgetzSequence[Tuple[str, bytes]])key_value_pairsr   c                 C  s4   |    |  }|D ]\}}| j|||f qd S )N)r/   r5   r   r'   r(   rL   r   kvr)   r)   r*   mset   s    zCassandraByteStore.msetc                   sB   |   I d H  |  }|D ]"\}}t| j|||fdI d H  qd S )NrI   )r0   r5   r   r   rM   r)   r)   r*   amset   s    zCassandraByteStore.amsetc                 C  s0   ddl m} |   | j|  ||g d S )Nr   r9   )rC   r:   r/   r   r'   r7   r(   r8   r:   r)   r)   r*   mdelete   s    zCassandraByteStore.mdeletec                   s>   ddl m} |  I d H  t| j|  ||gdI d H  d S )Nr   r9   rI   )rC   r:   r0   r   r   r7   rR   r)   r)   r*   amdelete   s
    zCassandraByteStore.amdelete)prefixzIterator[str])rU   r   c                c  sF   |    | jtj| j| jdD ]}|j}|r:||r"|V  q"d S Nr   )	r/   r   r'   SELECT_ALL_TABLE_CQL_TEMPLATEr"   r   r   rE   
startswithr(   rU   rG   r?   r)   r)   r*   
yield_keys   s    zCassandraByteStore.yield_keyszAsyncIterator[str]c                C sR   |   I d H  t| jtj| j| jdI d H D ]}|j}|rF||r.|V  q.d S rV   )	r0   r   r   rW   r"   r   r   rE   rX   rY   r)   r)   r*   ayield_keys   s    zCassandraByteStore.ayield_keys)__name__
__module____qualname____doc__r   ZSYNCr+   r/   r0   r3   r5   r7   rH   rK   rP   rQ   rS   rT   rZ   r[   r)   r)   r)   r*   r   %   s$   'r   )
__future__r   r%   r   r   typingr   r   r   r   r	   r
   r   Zlangchain_core.storesr   Z'langchain_community.utilities.cassandrar   r   Zcassandra.clusterr   rC   r   r!   r2   rW   r4   r6   r   r)   r)   r)   r*   <module>   s    $
