a
    `g1                     @   s6  d dl Z d dlZd dlZd dlZd dlmZmZmZmZm	Z	m
Z
mZmZmZmZ d dlZddlmZmZmZmZmZmZmZmZ ddlmZmZmZ ddlmZ zd dl Z W n e!y   dZ Y n0 G dd deZ"G d	d
 d
eZ#G dd deZ$dd Z%dd Z&ej'dd Z(ej'dd Z)ej*dd Z+ej*d7ddZ,dd Z-dd Z.dd Z/ej'd d! Z0ej'd"d# Z1ej'eg e2f eg e2f d$d%d&Zej'eg e2f d'd(d)Z3ej'e j4e	e# d*d+d,Z5ej'd-d. Z6ej'e j4e	e$ d*d/d0Z7ej'd1d2 Z8ej'e j4e	e" d*d3d4Z9ej'd5d6 Z:dS )8    N)
Any	AwaitableCallableDictIteratorOptionalProtocolTypeUnionoverload   )BaseTestServerRawTestServer
TestClient
TestServerloop_contextsetup_test_loopteardown_test_loopunused_port)ApplicationBaseRequestRequest)_RequestHandlerc                   @   sp   e Zd Zeddeeeeef  ee	e
ef dddZeddeeeeef  ee	edf dddZdS )AiohttpClientNserver_kwargs)_AiohttpClient__paramr   kwargsreturnc                   s   d S N selfr   r   r   r    r    c/var/www/html/cobodadashboardai.evdpl.com/venv/lib/python3.9/site-packages/aiohttp/pytest_plugin.py__call__(   s    zAiohttpClient.__call__c                   s   d S r   r    r!   r    r    r#   r$   0   s    )__name__
__module____qualname__r   r   r   r   strr   r   r   r$   r   r   r    r    r    r#   r   '   s    

r   c                   @   s.   e Zd Zddeee eee dddZ	dS )AiohttpServerNportappr+   r   r   c                K   s   d S r   r    )r"   r-   r+   r   r    r    r#   r$   ;   s    zAiohttpServer.__call__)
r%   r&   r'   r   r   intr   r   r   r$   r    r    r    r#   r)   :   s
   
r)   c                   @   s.   e Zd Zddeee eee dddZ	dS )AiohttpRawServerNr*   handlerr+   r   r   c                K   s   d S r   r    )r"   r1   r+   r   r    r    r#   r$   A   s    zAiohttpRawServer.__call__)
r%   r&   r'   r   r   r.   r   r   r   r$   r    r    r    r#   r/   @   s
   
r/   c                 C   s:   | j ddddd | j dddd	d | j d
dddd d S )N--aiohttp-fast
store_trueFz*run tests faster by disabling extra checks)actiondefaulthelpz--aiohttp-loopstorepyloopz3run tests with specific loop: pyloop, uvloop or all--aiohttp-enable-loop-debugzenable event loop debug mode)Z	addoption)parserr    r    r#   pytest_addoptionF   s$    r;   c                    sd   | j  t rdnt r&dndS dd| jvrJ|  jd7  _d fdd}|| _ dS )zjSet up pytest fixture.

    Allow fixtures to be coroutines. Run coroutine fixtures in an event loop.
    TFNrequestr<   c                     s~   |d }r|d= d|j vr$td|d rf| i | fdd}||   S  | i |S d S )Nr<   loopz^Asynchronous fixtures must depend on the 'loop' fixture or be used in tests depending from it.c                      s(   z   W S  ty"   Y n0 d S r   )run_until_complete	__anext__StopAsyncIterationr    _loopgenr    r#   	finalizer   s    z8pytest_fixture_setup.<locals>.wrapper.<locals>.finalizer)fixturenames	ExceptionZgetfixturevalueZaddfinalizerr?   r@   )argsr   r<   rE   funcZis_async_genZstrip_requestrB   r#   wrapperq   s    


z%pytest_fixture_setup.<locals>.wrapper)rJ   inspectisasyncgenfunctionasyncioiscoroutinefunctionargnames)Z
fixturedefrK   r    rI   r#   pytest_fixture_setup[   s    


 rQ   c                 C   s   | j dS )z--fast config optionr2   config	getoptionr=   r    r    r#   fast   s    rU   c                 C   s   | j dS )z!--enable-loop-debug config optionr9   rR   r=   r    r    r#   
loop_debug   s    rV   c               	   c   st   t jddT} dV  dd | D }|rRtdt|t|dkrBdnd	d
|W d   n1 sf0    Y  dS )zContext manager which checks for RuntimeWarnings.

    This exists specifically to
    avoid "coroutine 'X' was never awaited" warnings being missed.

    If RuntimeWarnings occur in the context a RuntimeError is raised.
    T)recordNc                 S   s"   g | ]}|j tkrd j|dqS )z#{w.filename}:{w.lineno}:{w.message})w)categoryRuntimeWarningformat).0rX   r    r    r#   
<listcomp>   s   
z,_runtime_warning_context.<locals>.<listcomp>z{} Runtime Warning{},
{}r    s
)warningscatch_warningsRuntimeErrorr[   lenjoin)	_warningsrwr    r    r#   _runtime_warning_context   s    	 rh   Fc                 c   s(   | r| V  nt  } | V  t| |d dS )zPassthrough loop context.

    Sets up and tears down a loop unless one is passed in via the loop
    argument when it's passed straight through.
    rU   N)r   r   )r>   rU   r    r    r#   _passthrough_loop_context   s
    rj   c                 C   s(   |  |r$t|r$t| ||S dS )z%Fix pytest collecting for coroutines.N)ZfuncnamefilterrN   rO   listZ_genfunctions)	collectornameobjr    r    r#   pytest_pycollect_makeitem   s    ro   c              	      s    j d}t jr jdp0 jdd}t h t||d<} fdd j	j
D }| jf i | W d   n1 s0    Y  W d   n1 s0    Y  dS dS )	zBRun coroutines in an event loop instead of a normal function call.r2   proactor_loopr>   Nri   c                    s   i | ]}| j | qS r    )funcargs)r\   arg
pyfuncitemr    r#   
<dictcomp>   s   z&pytest_pyfunc_call.<locals>.<dictcomp>T)rS   rT   rN   rO   functionrq   getrh   rj   Z_fixtureinforP   r?   rn   )rt   rU   Zexisting_looprC   Ztestargsr    rs   r#   pytest_pyfunc_call   s    
Rrx   c                 C   s   d| j vrd S | jjj}dtji}td ur4tj|d< |dkr@d}i }|dD ]N}|	d }|
d}||vr|rNtd	|t| f nqN|| ||< qN| jdt| t| d
 d S )Nloop_factoryr8   uvloopallzpyloop,uvloop?,?z ?z&Unknown loop '%s', available loops: %s)Zids)rF   rS   optionZaiohttp_looprN   DefaultEventLoopPolicyrz   ZEventLoopPolicysplitendswithstrip
ValueErrorrk   keysZparametrizevalues)ZmetafuncZloopsZavail_factoriesZ	factoriesrm   requiredr    r    r#   pytest_generate_tests   s0    




r   c                 c   s\   |  }t | t|d.}|r*|d t | |V  W d   n1 sN0    Y  dS )z%Return an instance of the event loop.ri   TN)rN   set_event_loop_policyr   	set_debugset_event_loop)ry   rU   rV   policyrC   r    r    r#   r>     s    


r>   c                  c   sP   t  } t |  t| j }t | |V  W d    n1 sB0    Y  d S r   )rN   ZWindowsProactorEventLoopPolicyr   r   new_event_loopr   )r   rC   r    r    r#   rp     s
    

rp   )aiohttp_unused_portr   c                 C   s   t jdtdd | S )Nz3Deprecated, use aiohttp_unused_port fixture instead   
stacklevelra   warnDeprecationWarning)r   r    r    r#   r     s    r   r   c                   C   s   t S )z1Return a port that is unused on the current host.)_unused_portr    r    r    r#   r   !  s    r   )r>   r   c                 #   sP   g ddt tt ttd fdd}|V  ddfdd} |  dS )	z^Factory to create a TestServer instance, given an app.

    aiohttp_server(app, **kwargs)
    Nr*   r,   c                   s4   t | |d}|jf d i|I d H  | |S Nr*   r>   )r   start_serverappend)r-   r+   r   serverr>   serversr    r#   go/  s    
zaiohttp_server.<locals>.gor   c                      s    r    I d H  q d S r   popcloser    r   r    r#   finalize9  s    z aiohttp_server.<locals>.finalize)r   r   r.   r   r   r?   r>   r   r   r    r   r#   aiohttp_server'  s    
r   c                 C   s   t jdtdd | S )Nz.Deprecated, use aiohttp_server fixture insteadr   r   r   )r   r    r    r#   test_server@  s    r   c                 #   sP   g ddt tt ttd fdd}|V  ddfdd} |  dS )	zpFactory to create a RawTestServer instance, given a web handler.

    aiohttp_raw_server(handler, **kwargs)
    Nr*   r0   c                   s4   t | |d}|jf d i|I d H  | |S r   )r   r   r   )r1   r+   r   r   r   r    r#   r   R  s    
zaiohttp_raw_server.<locals>.gor   c                      s    r    I d H  q d S r   r   r    r   r    r#   r   \  s    z$aiohttp_raw_server.<locals>.finalize)r   r   r.   r   r   r?   r   r    r   r#   aiohttp_raw_serverJ  s    
r   c                 C   s   t jdtdd | S )Nz2Deprecated, use aiohttp_raw_server fixture insteadr   r   r   )r   r    r    r#   raw_test_serverc  s    r   c                 #   s   g  t ddtttttf  ttttf ddd}t ddtttttf  ttt	df ddd}ddt
ttf tttttf  ttttf d fdd}|V  dd	 fd
d}|  dS )zFactory to create a TestClient instance.

    aiohttp_client(app, **kwargs)
    aiohttp_client(server, **kwargs)
    aiohttp_client(raw_server, **kwargs)
    Nr   )__paramr   r   r   c                   s   d S r   r    r   r   r   r    r    r#   r   y  s    zaiohttp_client.<locals>.goc                   s   d S r   r    r   r    r    r#   r     s    )r   rH   r   r   r   c                   s   t | tr4t | ttfs4| g|R i |} i }n|r@J dt | tr||pPi }t| fdi|}t|fdi|}n0t | trt| fdi|}ntdt|  | I d H   	| |S )Nzargs should be emptyr>   zUnknown argument type: %r)

isinstancer   r   r   r   r   r   typer   r   )r   r   rH   r   r   clientclientsr>   r    r#   r     s     


r   c                      s    r    I d H  q d S r   r   r    )r   r    r#   r     s    z aiohttp_client.<locals>.finalize)r   r   r   r   r(   r   r   r   r   r   r
   r?   r   r    r   r#   aiohttp_cliento  s8    




r   c                 C   s   t jdtdd | S )Nz.Deprecated, use aiohttp_client fixture insteadr   r   r   )r   r    r    r#   test_client  s    r   )F);rN   
contextlibrL   ra   typingr   r   r   r   r   r   r   r	   r
   r   ZpytestZ
test_utilsr   r   r   r   r   r   r   r   r   Zwebr   r   r   Zweb_protocolr   rz   ImportErrorr   r)   r/   r;   rQ   ZfixturerU   rV   contextmanagerrh   rj   ro   rx   r   r>   rp   r.   r   AbstractEventLoopr   r   r   r   r   r   r    r    r    r#   <module>   s`   0(

9


 

	"	
	
=