a
    bg`                     @  sR   d dl mZ d dlmZmZmZmZmZ er@d dlm	Z	m
Z
mZ G dd dZdS )    )annotations)TYPE_CHECKINGAnyIterableListOptional)	DataFrameRowSparkSessionc                   @  s   e Zd ZdZd1dddddddd	d
Zed2dddd dddZddddZddddZdddddZ	d3dddddZ
dddddZdd d!d"d#Zd$d%d&d'd(Zd4dddd*d+d,Zd5dddd-d.Zd6dddd*d/d0ZdS )7SparkSQLz;SparkSQL is a utility class for interacting with Spark SQL.N   zOptional[SparkSession]zOptional[str]zOptional[List[str]]int)spark_sessioncatalogschemaignore_tablesinclude_tablessample_rows_in_table_infoc           
      C  s*  zddl m} W n ty*   tdY n0 |r4|n|j | _|durV| jj| |durl| jj| t	| 
 | _|rt	|nt	 | _| jr| j| j }|rtd| d|rt	|nt	 | _| jr| j| j }|rtd| d|  }	|	rt	|	n| j| _t|ts td|| _dS )	a  Initialize a SparkSQL object.

        Args:
            spark_session: A SparkSession object.
              If not provided, one will be created.
            catalog: The catalog to use.
              If not provided, the default catalog will be used.
            schema: The schema to use.
              If not provided, the default schema will be used.
            ignore_tables: A list of tables to ignore.
              If not provided, all tables will be used.
            include_tables: A list of tables to include.
              If not provided, all tables will be used.
            sample_rows_in_table_info: The number of rows to include in the table info.
              Defaults to 3.
        r   r
   Fpyspark is not installed. Please install it with `pip install pyspark`Nzinclude_tables  not found in databasezignore_tables z,sample_rows_in_table_info must be an integer)pyspark.sqlr
   ImportErrorbuildergetOrCreate_sparkr   ZsetCurrentCatalogZsetCurrentDatabaseset_get_all_table_names_all_tables_include_tables
ValueError_ignore_tablesget_usable_table_namesZ_usable_tables
isinstancer   	TypeError_sample_rows_in_table_info)
selfr   r   r   r   r   r   r
   missing_tablesZusable_tables r(   u/var/www/html/cobodadashboardai.evdpl.com/venv/lib/python3.9/site-packages/langchain_community/utilities/spark_sql.py__init__   s@    


zSparkSQL.__init__strzOptional[dict]r   )database_uriengine_argskwargsreturnc                 K  sL   zddl m} W n ty*   tdY n0 |j| }| |fi |S )zzCreating a remote Spark Session via Spark connect.
        For example: SparkSQL.from_uri("sc://localhost:15002")
        r   r   r   )r   r
   r   r   remoter   )clsr,   r-   r.   r
   Zsparkr(   r(   r)   from_uriK   s    
zSparkSQL.from_urizIterable[str])r/   c                 C  s   | j r| j S t| j| j S )zGet names of tables available.)r   sortedr   r!   )r&   r(   r(   r)   r"   \   s    zSparkSQL.get_usable_table_namesc                 C  s(   | j dd }ttdd |S )NzSHOW TABLES	tableNamec                 S  s   | j S N)r4   )rowr(   r(   r)   <lambda>e       z/SparkSQL._get_all_table_names.<locals>.<lambda>)r   sqlselectcollectlistmap)r&   rowsr(   r(   r)   r   c   s    zSparkSQL._get_all_table_names)tabler/   c                 C  s6   | j d|  d j}|d}|d | d S )NzSHOW CREATE TABLE r   ZUSING;)r   r9   r;   Zcreatetab_stmtfind)r&   r?   Z	statementZusing_clause_indexr(   r(   r)   _get_create_table_stmtg   s    
zSparkSQL._get_create_table_stmt)table_namesr/   c                 C  s   |   }|d ur6t||}|r2td| d|}g }|D ]D}| |}| jrx|d7 }|d| | d7 }|d7 }|| q>d|}|S )Nztable_names r   z

/*
z*/z

)	r"   r   
differencer    rB   r%   _get_sample_spark_rowsappendjoin)r&   rC   Zall_table_namesr'   ZtablesZ
table_nameZ
table_infoZ	final_strr(   r(   r)   get_table_infoo   s     

zSparkSQL.get_table_infoc                 C  s   d| d| j  }| j|}dttdd |jj}z"| |}ddd |D }W n t	yr   d	}Y n0 | j  d
| d| d| S )NzSELECT * FROM z LIMIT 	c                 S  s   | j S r5   )name)fr(   r(   r)   r7      r8   z1SparkSQL._get_sample_spark_rows.<locals>.<lambda>rD   c                 S  s   g | ]}d  |qS )rJ   )rH   ).0r6   r(   r(   r)   
<listcomp>   r8   z3SparkSQL._get_sample_spark_rows.<locals>.<listcomp> z rows from z table:
)
r%   r   r9   rH   r<   r=   r   fields_get_dataframe_results	Exception)r&   r?   querydfZcolumns_strZsample_rowsZsample_rows_strr(   r(   r)   rF      s    

zSparkSQL._get_sample_spark_rowsr	   tuple)r6   r/   c                 C  s   t tt|  S r5   )rU   r=   r+   asDictvalues)r&   r6   r(   r(   r)   _convert_row_as_tuple   s    zSparkSQL._convert_row_as_tupler   r<   )rT   r/   c                 C  s   t t| j| S r5   )r<   r=   rX   r;   )r&   rT   r(   r(   r)   rQ      s    zSparkSQL._get_dataframe_resultsall)commandfetchr/   c                 C  s,   | j |}|dkr|d}t| |S )None   )r   r9   limitr+   rQ   )r&   rZ   r[   rT   r(   r(   r)   run   s    
zSparkSQL.runc              
   C  s@   z|  |W S  ty: } zd| W  Y d}~S d}~0 0 dS )af  Get information about specified tables.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        If `sample_rows_in_table_info`, the specified number of sample rows will be
        appended to each table description. This can increase performance as
        demonstrated in the paper.
        Error: N)rI   r    )r&   rC   er(   r(   r)   get_table_info_no_throw   s    
z SparkSQL.get_table_info_no_throwc              
   C  sB   z|  ||W S  ty< } zd| W  Y d}~S d}~0 0 dS )a*  Execute a SQL command and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.

        If the statement throws an error, the error message is returned.
        r`   N)r_   rR   )r&   rZ   r[   ra   r(   r(   r)   run_no_throw   s    zSparkSQL.run_no_throw)NNNNNr   )N)N)rY   )N)rY   )__name__
__module____qualname____doc__r*   classmethodr2   r"   r   rB   rI   rF   rX   rQ   r_   rb   rc   r(   r(   r(   r)   r   	   s*         ? r   N)
__future__r   typingr   r   r   r   r   r   r   r	   r
   r   r(   r(   r(   r)   <module>   s   