U
    ~fhx?                     @  s,  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	m
Z
mZmZmZmZmZmZmZ ddlmZ ddlmZmZmZ ddlmZ ddlmZmZ dd	lmZmZm Z m!Z! dd
l"m#Z#m$Z$m%Z% ddl&m'Z'm(Z(m)Z) ddl*m+Z+ ddl,m-Z-m.Z.m/Z/m0Z0m1Z1 ddl2m3Z3 e
rddlm4Z4 ddl5m6Z6 ddl7m8Z8 ddl9m:Z: ddlm;Z;m<Z<m=Z= ddl*m>Z> ddl?m@Z@ ddlAmBZB ddlCmDZDmEZEmFZFmGZG ddlHmIZI dZJd>dddddd d!d"dd#d$d%d&d'dd(d)ddd*dd+d,d-d.d/ZKefdd&d0d1d2d3d4ZLdd5d6d7d8d9ZMdd0d5d:d;d<d=ZNdS )?z&Internal network layer helper methods.    )annotationsN)TYPE_CHECKINGAnyMappingMutableMappingOptionalSequenceUnioncast)_decode_all_selective)_csothelpers_sharedmessage)MAX_MESSAGE_SIZE)_NO_COMPRESSION
decompress)NotPrimaryErrorOperationFailureProtocolError_OperationCancelled)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_UNPACK_REPLY_OpMsg_OpReply)_is_speculative_authenticate)_POLL_TIMEOUT_UNPACK_COMPRESSION_HEADER_UNPACK_HEADERBLOCKING_IO_ERRORSasync_sendall)_errno_from_exception)CodecOptions)AsyncClientSession)AsyncMongoClient)AsyncConnection)SnappyContextZlibContextZstdContext)_EventListeners)ReadConcern)_ServerMode)_Address_CollationIn_DocumentOut_DocumentType)WriteConcernFTr&   strzMutableMapping[str, Any]boolzOptional[_ServerMode]zCodecOptions[_DocumentType]zOptional[AsyncClientSession]zOptional[AsyncMongoClient]z#Optional[Sequence[Union[str, int]]]zOptional[_Address]zOptional[_EventListeners]zOptional[int]zOptional[ReadConcern]zOptional[_CollationIn]z4Union[SnappyContext, ZlibContext, ZstdContext, None]zOptional[Mapping[str, Any]]zOptional[WriteConcern]r0   )conndbnamespec	is_mongosread_preferencecodec_optionssessionclientcheckallowable_errorsaddress	listenersmax_bson_sizeread_concernparse_write_concern_error	collationcompression_ctx
use_op_msgunacknowledgeduser_fieldsexhaust_allowedwrite_concernreturnc           (        s  t t|}|d }d}|}|r<|s<|dk	s0tt||}|rj|rJ|jsj|jrZ|j|d< |rj|||  |dk	rz||d< |dk	o|j	}t
j
 }|rt||}|r| tkrd}|r|jr|jjs|j|||I dH  }}|r| || t|| |rn|rtjnd}||r tjndO }tj||||||d\}}}} |r|dk	r| |krt||| ntd|dd|d||\}}}|dk	r||tj krt|||tj  |dk	rttjrt t|j!j"t#j$|t t||||| j%| j&| j'd | j'd	 | j(d
 |rP|dk	s(t|
dk	s6t|j)||||
| j&| j(d zt*| j+|I dH  |r~|r~d}!dd	i}"n`t,| |I dH }!|!j-| _-|!j.||d}#|#d }"|r|/|"|I dH  |rt0j1|"| j2|	|d W n t3k
r }$ zt
j
 | }%t4|$t5t6fr|$j7}&n
t8|$}&|dk	rttjrt t|j!j"t#j9|%|&t t||||| j%| j&| j'd | j'd	 | j(t4|$t6d |r|dk	st|
dk	st|j:|%|&|||
| j&| j(|d  W 5 d}$~$X Y nX t
j
 | }%|dk	rLttjrLt t|j!j"t#j;|%|"t t||||| j%| j&| j'd | j'd	 | j(d|kd |r|dk	s`t|
dk	snt|j<|%|"|||
| j&| j(||d	 |r|jr|!r|j=|!> I dH }'t?dt@|'||d }"|"S )a  Execute a command over the socket, or raise socket.error.

    :param conn: a AsyncConnection instance
    :param dbname: name of the database on which to run the command
    :param spec: a command document as an ordered dict type, eg SON.
    :param is_mongos: are we connected to a mongos?
    :param read_preference: a read preference
    :param codec_options: a CodecOptions instance
    :param session: optional AsyncClientSession instance.
    :param client: optional AsyncMongoClient instance for updating $clusterTime.
    :param check: raise OperationFailure if there are errors
    :param allowable_errors: errors to ignore if `check` is True
    :param address: the (host, port) of `conn`
    :param listeners: An instance of :class:`~pymongo.monitoring.EventListeners`
    :param max_bson_size: The maximum encoded bson size for this server
    :param read_concern: The read concern for this command.
    :param parse_write_concern_error: Whether to parse the ``writeConcernError``
        field in the command response.
    :param collation: The collation for this command.
    :param compression_ctx: optional compression Context.
    :param use_op_msg: True if we should use OP_MSG.
    :param unacknowledged: True if this is an unacknowledged command.
    :param user_fields: Response fields that should be decoded
        using the TypeDecoders from codec_options, passed to
        bson._decode_all_selective.
    :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed.
    z.$cmdFNZreadConcernrC   r   )ctx   )clientIdr   commandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceId)
service_idok)r9   rG   )rB   )rN   r   
durationMSfailurerP   rQ   rR   rS   rT   rU   rV   rW   rX   ZisServerSideError)rY   database_nameZspeculativeAuthenticate)rN   r   r[   replyrP   rQ   rR   rS   rT   rU   rV   rW   rX   Zspeculative_authenticate)rY   speculative_hellor]   r/   )AnextiterAssertionErrorr   Z_maybe_add_read_preferenceZin_transactionleveldocumentZ_update_read_concernZenabled_for_commandsdatetimenowr   lowerr   Z
_encrypterZ_bypass_auto_encryptionZencryptZapply_timeoutr   Zapply_write_concernr   ZMORE_TO_COMEZEXHAUST_ALLOWEDZ_op_msgZ_raise_document_too_largeZ_queryZ_COMMAND_OVERHEADr   isEnabledForloggingDEBUGr   Z_topology_settingsZ_topology_idr   ZSTARTEDidZserver_connection_idr>   rY   Zpublish_command_startr!   r4   receive_messageZmore_to_comeZunpack_responseZ_process_responser   Z_check_command_responseZmax_wire_version	Exception
isinstancer   r   detailsZ_convert_exceptionZFAILEDZpublish_command_failureZ	SUCCEEDEDZpublish_command_successZdecryptZraw_command_responser
   r   )(r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   namensr_   origpublishstartflags
request_idmsgsizeZmax_doc_sizer^   Zresponse_docZunpacked_docsexcdurationr\   Z	decrypted r{   @/tmp/pip-unpacked-wheel-36gvocj8/pymongo/asynchronous/network.pyrO   H   sd   3


            


	
 






 rO   intzUnion[_OpReply, _OpMsg])r4   rv   max_message_sizerJ   c              	     sH  t  rt  }n | j }|r.t | }nd}tt| d|I dH \}}}}|dk	rt||krtt	d|d||dkrt	d|d||krt	d|d|d|d	krt
t| d
|I dH \}}}	tt| |d |I dH |	}
nt| |d |I dH }
zt| }W n0 tk
r>   t	d|dt dY nX ||
S )z1Receive a raw BSON message or raise socket.error.N   zGot response id z but expected zMessage length (z3) not longer than standard message header size (16)z*) is larger than server max message size ()i  	      zGot opcode )r   get_timeoutZget_deadliner4   
gettimeouttime	monotonicr   _receive_data_on_socketr   r   r   r   KeyErrorkeys)r4   rv   r~   deadlinetimeoutlength_Zresponse_toZop_codeZcompressor_iddataZunpack_replyr{   r{   r|   rl   4  sF    



rl   zOptional[float]None)r4   r   rJ   c                   s   | j }d}| dkrdS t|dr6| dkr6d}nD|rd|t  }|dkrRd}tt|td}nt}| j	j
|d|d}| jjrtd|rdS |rtd	tdI dH  qdS )
zABlock until at least one byte is read, or a timeout, or a cancel.FrL   Npendingr   T)readr   zoperation cancelled	timed out)r4   filenohasattrr   r   r   maxminr   Zsocket_checkerselectZcancel_contextZ	cancelledr   socketr   asynciosleep)r4   r   sockZ	timed_outreadable	remainingr   r{   r{   r|   wait_for_readb  s(    
r   
memoryview)r4   r   r   rJ   c              
     s   t |}t|}d}||k rzPt| |I d H  t rV|d k	rV| t|t  d | j	
||d  }W nX tk
r   tdd Y n: tk
r } zt|tjkrW Y q W 5 d }~X Y nX |dkrtd||7 }q|S )Nr   r   zconnection closed)	bytearrayr   r   r   r   Zset_conn_timeoutr   r   r   r4   	recv_intor    r   r   OSErrorr"   errnoZEINTR)r4   r   r   bufmv
bytes_readZchunk_lengthry   r{   r{   r|   r     s&    
r   )TNNNNNFNNFFNFN)O__doc__
__future__r   r   re   r   ri   r   r   typingr   r   r   r   r   r   r	   r
   Zbsonr   Zpymongor   r   r   Zpymongo.commonr   Zpymongo.compression_supportr   r   Zpymongo.errorsr   r   r   r   Zpymongo.loggerr   r   r   Zpymongo.messager   r   r   Zpymongo.monitoringr   Zpymongo.network_layerr   r   r   r    r!   Zpymongo.socket_checkerr"   r#   Z#pymongo.asynchronous.client_sessionr$   Z!pymongo.asynchronous.mongo_clientr%   Zpymongo.asynchronous.poolr&   r'   r(   r)   r*   Zpymongo.read_concernr+   Zpymongo.read_preferencesr,   Zpymongo.typingsr-   r.   r/   r0   Zpymongo.write_concernr1   Z_IS_SYNCrO   rl   r   r   r{   r{   r{   r|   <module>   sd   (              < n."