a
    dgu
                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	 d dl
mZmZ eeZe jddddddZG d	d
 d
eZdS )    N)StringIO)DictOptional)	BaseModelField)maxsize)returnc                   C   s   t d dS )z*Warn once about the dangers of PythonREPL.z9Python REPL can execute arbitrary code. Use with caution.N)loggerwarning r   r   u/var/www/html/cobodadashboardai.evdpl.com/venv/lib/python3.9/site-packages/langchain_experimental/utilities/python.py	warn_once   s    r   c                   @   s   e Zd ZU dZeeddZee e	d< eeddZ
ee e	d< eeeddd	Zeeee ee ejd
dddZdeee edddZd
S )
PythonREPLz#Simulates a standalone Python REPL.Z_globals)default_factoryaliasglobalsZ_localslocals)queryr   c                 C   s    t dd| } t dd| } | S )zSanitize input to the python REPL.

        Remove whitespace, backtick & python
        (if llm mistakes python console as terminal)

        Args:
            query: The query to sanitize

        Returns:
            str: The sanitized query
        z^(\s|`)*(?i:python)?\s* z(\s|`)*$)resub)r   r   r   r   sanitize_input   s    zPythonREPL.sanitize_inputN)commandr   r   queuer   c           	   
   C   s~   t j}t  t _}z.| |}t||| |t _||  W n8 tyx } z |t _|t| W Y d }~n
d }~0 0 d S )N)	sysstdoutr   r   execputgetvalue	Exceptionrepr)	clsr   r   r   r   Z
old_stdoutZmystdoutZcleaned_commander   r   r   worker+   s    
zPythonREPL.worker)r   timeoutr   c                 C   sv   t   t }|durZtj| j|| j| j|fd}|  || |	 rn|
  dS n| || j| j| | S )zxRun command with own globals/locals and returns anything printed.
        Timeout after the specified number of seconds.N)targetargszExecution timed out)r   multiprocessingQueueProcessr#   r   r   startjoinis_alive	terminateget)selfr   r$   r   pr   r   r   run>   s    
zPythonREPL.run)N)__name__
__module____qualname____doc__r   dictr   r   r   __annotations__r   staticmethodstrr   classmethodr'   r(   r#   intr1   r   r   r   r   r      s   
r   )	functoolsloggingr'   r   r   ior   typingr   r   Zpydanticr   r   	getLoggerr2   r	   	lru_cacher   r   r   r   r   r   <module>   s   

