U
    ~fhA                     @  s$  d Z ddlmZ ddlmZ ddlmZmZmZm	Z	m
Z
mZmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZ dd
lmZmZmZm Z m!Z! ddl"m#Z# ddl$m%Z%m&Z&m'Z' erddl(m)Z) ddl*m+Z+ ddl,m-Z- dZ.G dd de	e' Z/G dd de/e' Z0dS )z4CommandCursor class to iterate over command results.    )annotations)deque)	TYPE_CHECKINGAnyAsyncIteratorGenericMappingNoReturnOptionalSequenceUnion)CodecOptions&_convert_raw_document_lists_to_streams)_csot)_ConnectionManager)_CURSOR_CLOSED_ERRORS)ConnectionFailureInvalidOperationOperationFailure)_CursorAddress_GetMore_OpMsg_OpReply_RawBatchGetMore)PinnedResponse)_Address_DocumentOut_DocumentType)AsyncClientSession)AsyncCollection)AsyncConnectionFc                   @  s  e Zd ZdZeZd[ddddd	d
dddd	ddZddddZdddddZddddZ	e
ddddZddddd Zd\d!d	d"ddd#d$d%d&Ze
ddd'd(Ze
ddd)d*Ze
ddd+d,Ze
d
dd-d.Zd/dd0d1Zddd2d3Zddd4d5Zddd6d7Zddd8d9Zd:dd;d<d=Zddd>d?Zd@ddAdBZdCddDdEZdCddFdGZddHdIdJdKZd]dLd	ddMdNdOZdHddPdQZdddRdSZdddddTdUdVZ e!j"d^d	dWdXdYdZZ#dS )_AsyncCommandCursorz7An asynchronous cursor / iterator over command cursors.r   NFAsyncCollection[_DocumentType]Mapping[str, Any]Optional[_Address]intOptional[int]Optional[AsyncClientSession]boolr   None	
collectioncursor_infoaddress
batch_sizemax_await_time_mssessionexplicit_sessioncommentreturnc	           	      C  s   d| _ || _|d | _t|d | _|d| _|| _|| _|| _	| jj
jjj| _|| _|| _| jdk| _|| _| jr~|   d|kr|d | _n|j| _| | t|ts|dk	rtddS )zCreate a new command cursor.Nid
firstBatchpostBatchResumeTokenr   nsz,max_await_time_ms must be an integer or None)	_sock_mgr_collection_idr   _dataget_postbatchresumetoken_address_batch_size_max_await_time_msdatabaseclientoptionstimeout_timeout_session_explicit_session_killed_comment_end_session_nsZ	full_namer.   
isinstancer%   	TypeError	selfr+   r,   r-   r.   r/   r0   r1   r2    rP   G/tmp/pip-unpacked-wheel-36gvocj8/pymongo/asynchronous/command_cursor.py__init__;   s.    

zAsyncCommandCursor.__init__)r3   c                 C  s   |    d S N)_die_no_lockrO   rP   rP   rQ   __del__c   s    zAsyncCommandCursor.__del__z!AsyncCommandCursor[_DocumentType])r.   r3   c                 C  s8   t |tstd|dk r"td|dkr.dp0|| _| S )a  Limits the number of documents returned in one batch. Each batch
        requires a round trip to the server. It can be adjusted to optimize
        performance and limit data transfer.

        .. note:: batch_size can not override MongoDB's internal limits on the
           amount of data it will return to the client in a single batch (i.e
           if you set batch size to 1,000,000,000, MongoDB will currently only
           return 4-16MB of results per batch).

        Raises :exc:`TypeError` if `batch_size` is not an integer.
        Raises :exc:`ValueError` if `batch_size` is less than ``0``.

        :param batch_size: The size of each batch of results requested.
        zbatch_size must be an integerr   zbatch_size must be >= 0      )rL   r%   rM   
ValueErrorr?   )rO   r.   rP   rP   rQ   r.   f   s    
zAsyncCommandCursor.batch_sizec                 C  s   t | jdkS )z^Returns `True` if the cursor has documents remaining from the
        previous batch.
        r   )lenr;   rU   rP   rP   rQ   	_has_next}   s    zAsyncCommandCursor._has_nextOptional[Mapping[str, Any]]c                 C  s   | j S )zlRetrieve the postBatchResumeToken from the response to a
        changeStream aggregate or getMore.
        )r=   rU   rP   rP   rQ   _post_batch_resume_token   s    z+AsyncCommandCursor._post_batch_resume_tokenr    )connr3   c                   sV   | j jj}|| jsd S | jsR|  t|d}| jdkrL|	 I d H  n|| _d S )NFr   )
r9   rA   rB   Z_should_pin_cursorrF   r8   Z
pin_cursorr   r:   close)rO   r^   rB   Zconn_mgrrP   rP   rQ   _maybe_pin_connection   s    


z(AsyncCommandCursor._maybe_pin_connectionUnion[_OpReply, _OpMsg]zCodecOptions[Mapping[str, Any]]zSequence[_DocumentOut]response	cursor_idcodec_optionsuser_fieldslegacy_responser3   c                 C  s   | ||||S rS   )Zunpack_response)rO   rc   rd   re   rf   rg   rP   rP   rQ   _unpack_response   s    z#AsyncCommandCursor._unpack_responsec                 C  s   t t| jp| j S )a  Does this cursor have the potential to return more data?

        Even if :attr:`alive` is ``True``, :meth:`next` can raise
        :exc:`StopIteration`. Best to use a for loop::

            async for doc in collection.aggregate(pipeline):
                print(doc)

        .. note:: :attr:`alive` can be True while iterating a cursor from
          a failed server. In this case :attr:`alive` will return False after
          :meth:`next` fails to retrieve the next batch of results from the
          server.
        )r(   rZ   r;   rH   rU   rP   rP   rQ   alive   s    zAsyncCommandCursor.alivec                 C  s   | j S )zReturns the id of the cursor.)r:   rU   rP   rP   rQ   rd      s    zAsyncCommandCursor.cursor_idc                 C  s   | j S )zUThe (host, port) of the server used, or None.

        .. versionadded:: 3.0
        )r>   rU   rP   rP   rQ   r-      s    zAsyncCommandCursor.addressc                 C  s   | j r| jS dS )zThe cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None.

        .. versionadded:: 3.6
        N)rG   rF   rU   rP   rP   rQ   r0      s    zAsyncCommandCursor.sessionz$tuple[int, Optional[_CursorAddress]]c                 C  sJ   | j }d| _ | jr:|s:| j}| jd k	s*tt| j| j}nd}d }||fS )NTr   )rH   r:   r>   AssertionErrorr   rK   )rO   Zalready_killedrd   r-   rP   rP   rQ   _prepare_to_die   s    
z"AsyncCommandCursor._prepare_to_diec                 C  s@   |   \}}| jjj||| j| j| j | js6d| _d| _dS )z,Closes this cursor without acquiring a lock.N)rk   r9   rA   rB   Z_cleanup_cursor_no_lockr8   rF   rG   rO   rd   r-   rP   rP   rQ   rT      s    
    zAsyncCommandCursor._die_no_lockc                   sF   |   \}}| jjj||| j| j| jI dH  | js<d| _d| _dS )zCloses this cursor.N)rk   r9   rA   rB   Z_cleanup_cursor_lockr8   rF   rG   rl   rP   rP   rQ   	_die_lock   s    

zAsyncCommandCursor._die_lockc                 C  s    | j r| js| j   d | _ d S rS   )rF   rG   Z_end_implicit_sessionrU   rP   rP   rQ   rJ      s    
zAsyncCommandCursor._end_sessionc                   s   |   I dH  dS )z$Explicitly close / kill this cursor.N)rm   rU   rP   rP   rQ   r_      s    zAsyncCommandCursor.closer   )	operationr3   c              
     sf  | j jj}z|j|| j| jdI dH }W n tk
rz } z4|jtkrJd| _	|j
rZ|   n|  I dH   W 5 d}~X Y nL tk
r   d| _	|  I dH   Y n$ tk
r   |  I dH   Y nX t|tr| jst|j|j| _|jr|jd d }|d }|d| _|d | _n"|j}t|jts4t|jj| _| jdkrX|  I dH  t|| _dS )	z/Send a getmore message and handle the response.)r-   NTr   cursorZ	nextBatchr6   r4   ) r9   rA   rB   Z_run_operationrh   r>   r   coder   rH   rD   rT   r_   r   	ExceptionrL   r   r8   r   r^   Zmore_to_comeZfrom_commandZdocsr<   r=   r:   datar   rj   rd   r   r;   )rO   rn   rB   rc   excro   Z	documentsrP   rP   rQ   _send_message   sF    
  



z AsyncCommandCursor._send_messagec                   s   t | js| jrt | jS | jr| jdd\}}| j| j}| 	| 
||| j| j| jj|| j| jjj| j| jd| jI dH  n|  I dH  t | jS )a  Refreshes the cursor with more data from the server.

        Returns the length of self._data after refresh. Will exit early if
        self._data is already non-empty. Raises OperationFailure when the
        cursor cannot be refreshed due to an error on the query.
        .rW   FN)rZ   r;   rH   r:   rK   splitr9   Z_read_preference_forr0   rt   _getmore_classr?   re   rF   rA   rB   r@   r8   rI   rm   )rO   ZdbnameZcollnameZ	read_prefrP   rP   rQ   _refresh$  s.    
zAsyncCommandCursor._refreshzAsyncIterator[_DocumentType]c                 C  s   | S rS   rP   rU   rP   rP   rQ   	__aiter__F  s    zAsyncCommandCursor.__aiter__r   c                   s,   | j r$| dI dH }|dk	r |S q tdS )zAdvance the cursor.TN)ri   	_try_nextStopAsyncIteration)rO   docrP   rP   rQ   nextI  s
    zAsyncCommandCursor.nextc                   s   |   I d H S rS   )r}   rU   rP   rP   rQ   	__anext__S  s    zAsyncCommandCursor.__anext__zOptional[_DocumentType])get_more_allowedr3   c                   s>   t | js"| js"|r"|  I dH  t | jr6| j S dS dS )z<Advance the cursor blocking for at most one getMore command.N)rZ   r;   rH   rx   popleft)rO   r   rP   rP   rQ   rz   V  s
    

zAsyncCommandCursor._try_nextlist)resulttotalr3   c                   s~   t | js| js|  I dH  t | jrv|dkrH|| j | j  n*ttt | j|D ]}|| j	  q\dS dS dS )z4Get all or some available documents from the cursor.NTF)
rZ   r;   rH   rx   extendclearrangeminappendr   )rO   r   r   _rP   rP   rQ   _next_batch_  s    
zAsyncCommandCursor._next_batchc                   s   | j ddI dH S )ar  Advance the cursor without blocking indefinitely.

        This method returns the next document without waiting
        indefinitely for data.

        If no document is cached locally then this method runs a single
        getMore command. If the getMore yields any documents, the next
        document is returned, otherwise, if the getMore returns no documents
        (because there is no additional data) then ``None`` is returned.

        :return: The next document or ``None`` when no document is available
          after running a single getMore or when the cursor is closed.

        .. versionadded:: 4.5
        T)r   N)rz   rU   rP   rP   rQ   try_nextn  s    zAsyncCommandCursor.try_nextc                   s   | S rS   rP   rU   rP   rP   rQ   
__aenter__  s    zAsyncCommandCursor.__aenter__)exc_typeexc_valexc_tbr3   c                   s   |   I d H  d S rS   )r_   )rO   r   r   r   rP   rP   rQ   	__aexit__  s    zAsyncCommandCursor.__aexit__zlist[_DocumentType])lengthr3   c                   s`   g }|}t |tr"|dk r"td| jr\| ||I dH s<q\|dk	r"|t| }|dkr"q\q"|S )a}  Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.

        To use::

          >>> await cursor.to_list()

        Or, so read at most n items from the cursor::

          >>> await cursor.to_list(n)

        If the cursor is empty or has no more results, an empty list will be returned.

        .. versionadded:: 4.9
        rW   z'to_list() length must be greater than 0Nr   )rL   r%   rY   ri   r   rZ   )rO   r   res	remainingrP   rP   rQ   to_list  s    zAsyncCommandCursor.to_list)r   NNFN)NF)N)N)$__name__
__module____qualname____doc__r   rw   rR   rV   r.   r[   propertyr]   r`   rh   ri   rd   r-   r0   rk   rT   rm   rJ   r_   rt   rx   ry   r}   r~   rz   r   r   r   r   r   applyr   rP   rP   rP   rQ   r!   6   sT         (  
	
,"
	r!   c                      sb   e Zd ZeZddddddd	d
ddd	 fddZdddddd
ddddZdddddZ  ZS )AsyncRawBatchCommandCursorr   NFr"   r#   r$   r%   r&   r'   r(   r   r)   r*   c	           	   
     s,   | drtt |||||||| dS )a^  Create a new cursor / iterator over raw batches of BSON data.

        Should not be called directly by application developers -
        see :meth:`~pymongo.asynchronous.collection.AsyncCollection.aggregate_raw_batches`
        instead.

        .. seealso:: The MongoDB documentation on `cursors <https://dochub.mongodb.org/core/cursors>`_.
        r5   N)r<   rj   superrR   rN   	__class__rP   rQ   rR     s    z#AsyncRawBatchCommandCursor.__init__ra   r   r\   zlist[Mapping[str, Any]]rb   c                 C  s"   |j ||d}|st|d  |S )N)rf   r   )raw_responser   )rO   rc   rd   re   rf   rg   r   rP   rP   rQ   rh     s    z+AsyncRawBatchCommandCursor._unpack_responser	   )indexr3   c                 C  s   t dd S )Nz5Cannot call __getitem__ on AsyncRawBatchCommandCursor)r   )rO   r   rP   rP   rQ   __getitem__  s    z&AsyncRawBatchCommandCursor.__getitem__)r   NNFN)NF)	r   r   r   r   rw   rR   rh   r   __classcell__rP   rP   r   rQ   r     s        $$  r   N)1r   
__future__r   collectionsr   typingr   r   r   r   r   r	   r
   r   r   Zbsonr   r   Zpymongor   Zpymongo.asynchronous.cursorr   Zpymongo.cursor_sharedr   Zpymongo.errorsr   r   r   Zpymongo.messager   r   r   r   r   Zpymongo.responser   Zpymongo.typingsr   r   r   Z#pymongo.asynchronous.client_sessionr   Zpymongo.asynchronous.collectionr   Zpymongo.asynchronous.poolr    Z_IS_SYNCr!   r   rP   rP   rP   rQ   <module>   s(   ,  p