U
    ~fh}                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" erddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9m:Z: ddl;m<Z<m=Z=m>Z>m?Z?m@Z@ ddlAmBZB ddlCmDZDmEZEmFZFmGZG ddlHmIZImJZJ ddlKmLZL dZMG dd dZNdS )zIThe client-level bulk write operations interface.

.. versionadded:: 4.9
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyMappingOptionalTypeUnion)ObjectId)RawBSONDocument)_csotcommon)AsyncClientSession_validate_session_write_concern)AsyncCollection)AsyncCommandCursor)AsyncDatabase)_handle_reauth)AsyncMongoClient)AsyncConnection)_merge_command"_throw_client_bulk_write_exception)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)ConfigurationErrorConnectionFailureInvalidOperationNotPrimaryErrorOperationFailureWaitQueueTimeoutError)_RETRYABLE_ERROR_CODES)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_ClientBulkWriteContext_convert_client_bulk_exception_convert_exception_convert_write_result_randint)ReadPreference)ClientBulkWriteResultDeleteResultInsertOneResultUpdateResult)_DocumentOut	_Pipeline)WriteConcernFc                   @  s  e Zd ZdZdQddddd	d
dddddZeddddZddddddZdRdddddddddd	ddZdSdddddddd d!d"Z	dTddddddd#d$d%Z
ed&d'd(d)d*d*dd+d,d-d.Zd&d'd(d/d*d*ddd,d0d1Zd&d+d2d3d4d5d6d7Zd&d+d2d3d8d5d9d:Zd'd'd;d<dd=d>d?ZdUdd<d;d(dd'd@ddAdBdCZd<dd'dDdEdFZd;ddGdHdIZd;ddGdJdKZd;ddGdLdMZd<ddNdDdOdPZdS )V_AsyncClientBulkz4The private guts of the client-level bulk write API.TNFr   r2   boolzOptional[bool]zOptional[str]zOptional[Any]None)clientwrite_concernorderedbypass_document_validationcommentletverbose_resultsreturnc                 C  s   || _ || _|| _| jdk	r*td| j || _|| _|| _|| _g | _	g | _
d| _d| _d| _d| _d| _d| _d| _d| _| j jj| _d| _d| _dS )z'Initialize a _AsyncClientBulk instance.Nr;   r   F)r6   r7   r;   r   r   r8   bypass_doc_valr:   r<   ops
namespaces
idx_offset	total_opsexecuteduses_upsertuses_collationuses_array_filtersuses_hint_updateuses_hint_deleteoptionsZretry_writesis_retryableretryingstarted_retryable_write)selfr6   r7   r8   r9   r:   r;   r<    rN   D/tmp/pip-unpacked-wheel-36gvocj8/pymongo/asynchronous/client_bulk.py__init__Y   s,    
z_AsyncClientBulk.__init__zType[_ClientBulkWriteContext])r=   c                 C  s   t S )N)r&   )rM   rN   rN   rO   bulk_ctx_class~   s    z_AsyncClientBulk.bulk_ctx_classstrr0   )	namespacedocumentr=   c                 C  s^   t d| t|ts&d|ks&t |d< d|d}| jd|f | j| |  jd7  _dS )z*Add an insert document to the list of ops.rT   _id)insertrT   rW      N)r   
isinstancer   r   r?   appendr@   rB   )rM   rS   rT   cmdrN   rN   rO   
add_insert   s    


z_AsyncClientBulk.add_insertzMapping[str, Any]z#Union[Mapping[str, Any], _Pipeline]zOptional[Mapping[str, Any]]z!Optional[list[Mapping[str, Any]]]z Union[str, dict[str, Any], None])	rS   selectorupdatemultiupsert	collationarray_filtershintr=   c	           
      C  s   t | d|||d}	|dk	r,d| _||	d< |dk	rBd| _||	d< |dk	rXd| _||	d< |dk	rnd| _||	d< |rxd	| _| jd
|	f | j| |  j	d7  _	dS )z8Create an update document and add it to the list of ops.rV   r^   filterZ
updateModsr_   NTr`   ZarrayFiltersrc   ra   Fr^   rX   )
r   rD   rF   rG   rE   rJ   r?   rZ   r@   rB   )
rM   rS   r]   r^   r_   r`   ra   rb   rc   r[   rN   rN   rO   
add_update   s.    z_AsyncClientBulk.add_update)rS   r]   replacementr`   ra   rc   r=   c                 C  s   t | d||dd}|dk	r,d| _||d< |dk	rBd| _||d< |dk	rXd| _||d< | jd	|f | j| |  jd
7  _dS )z8Create a replace document and add it to the list of ops.rV   Frd   NTr`   rc   ra   replacerX   )r   rD   rG   rE   r?   rZ   r@   rB   )rM   rS   r]   rg   r`   ra   rc   r[   rN   rN   rO   add_replace   s$    
z_AsyncClientBulk.add_replace)rS   r]   r_   ra   rc   r=   c                 C  sp   d||d}|dk	r"d| _ ||d< |dk	r8d| _||d< |rBd| _| jd|f | j| |  jd	7  _dS )
z7Create a delete document and add it to the list of ops.rV   )deletere   r_   NTrc   ra   Frj   rX   )rH   rE   rJ   r?   rZ   r@   rB   )rM   rS   r]   r_   ra   rc   r[   rN   rN   rO   
add_delete   s    	z_AsyncClientBulk.add_deleter&   zMutableMapping[str, Any]intzUnion[bytes, dict[str, Any]]zlist[Mapping[str, Any]]zdict[str, Any])bwcr[   
request_idmsgop_docsns_docsr6   r=   c                   sR  ||d< ||d< t tjrjtt |jjtj|t	t
||j|||jj|jj|jjd |jjd |jjd |jr||||| z|j|||jI dH }tj |j }	t tjrtt |jjtj|	|t	t
||j|||jj|jj|jjd |jjd |jjd |jr||||	 | j||jI dH  W n tk
rL }
 ztj |j }	t|
tt frt|
j!}nt"|
}t tjrtt |jjtj#|	|t	t
||j|||jj|jj|jjd |jjd |jjt|
t d |jr|$|||	 d	|
i}t|
t r&| j|
j!|jI dH  n| ji |jI dH  W 5 d}
~
X Y nX |S )
zHA proxy for AsyncConnection.write_command that handles event publishing.r?   ZnsInfor   rX   clientIdmessagecommandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceIdNrs   rt   
durationMSreplyrv   rw   rx   ry   rz   r{   r|   r}   r~   rs   rt   r   failurerv   rw   rx   ry   rz   r{   r|   r}   r~   ZisServerSideErrorerror)%r#   isEnabledForloggingDEBUGr%   _topology_settings_topology_idr$   STARTEDnextiterdb_nameconnidserver_connection_idaddress
service_idpublish_startwrite_commandcodecdatetimenow
start_time	SUCCEEDED_succeedr6   Z_process_responsesession	ExceptionrY   r   r    detailsr(   FAILED_fail)rM   rm   r[   rn   ro   rp   rq   r6   r   durationexcr   rN   rN   rO   r      s    








(z_AsyncClientBulk.write_commandbytesc                   s8  t tjrZtt |jjtj|t	t
||j|||jj|jj|jjd |jjd |jjd |jrp|||||}z|j||jI dH }tj |j }	|dk	rt|j||}
nfddi}
t tjrtt |jjtj|	|
t	t
||j|||jj|jj|jjd |jjd |jjd |jr,|||
|	 W n tk
r2 } ztj |j }	t|trrt|j||j}nt|t r|j}nt!|}t tjrtt |jjtj"|	|t	t
||j|||jj|jj|jjd |jjd |jjt|td |jr|jdk	st#|$|||	 d|i}
W 5 d}~X Y nX |
S )	zFA proxy for AsyncConnection.unack_write that handles event publishing.r   rX   rr   Nokr   r   r   )%r#   r   r   r   r%   r   r   r$   r   r   r   r   r   r   r   r   r   r   r   unack_writeZmax_bson_sizer   r   r   r)   namer   r   r   rY   r    r   r   r(   r   AssertionErrorr   )rM   rm   r[   rn   ro   rp   rq   r6   resultr   r   r   r   rN   rN   rO   r   D  s    








z_AsyncClientBulk.unack_writez#list[tuple[str, Mapping[str, Any]]]z	list[str]z7tuple[list[Mapping[str, Any]], list[Mapping[str, Any]]])rm   r[   r?   r@   r=   c           	   	     s<   | |||\}}}}| ||||||| jI dH  ||fS )z6Executes a batch of bulkWrite server commands (unack).N)batch_commandr   r6   )	rM   rm   r[   r?   r@   rn   ro   to_send_ops
to_send_nsrN   rN   rO   _execute_batch_unack  s    z%_AsyncClientBulk._execute_batch_unackzGtuple[dict[str, Any], list[Mapping[str, Any]], list[Mapping[str, Any]]]c           
   	     s>   | |||\}}}}| ||||||| jI dH }	|	||fS )z4Executes a batch of bulkWrite server commands (ack).N)r   r   r6   )
rM   rm   r[   r?   r@   rn   ro   r   r   r   rN   rN   rO   _execute_batch  s          
z_AsyncClientBulk._execute_batchr   Optional[AsyncClientSession])full_resultr   r   r   r=   c              
     sd  | dr`tt| jddd}t||d |j||dk	| jd}||I dH  z|2 z3 dH W }|d | j }| j	| \}	}
|d s|d	 
| | jr W dS |d rV| jrV|	d
kr|
d d }t|dd}|	dkrd}	t|ddd}|	dkr t|dd}|||	 d |< qV6 W nF tk
r^ } z&|jrB| I dH  t||d< W 5 d}~X Y nX dS )z?Internal helper for processing the server reply command cursor.cursoradminz$cmd.bulkWrite)Zdatabaser   N)r   Zexplicit_sessionr:   idxr   writeErrorsrW   rT   rU   T)acknowledged)r^   rh   r^   )r   Zin_client_bulkrj   ZResultsr   )getr   r   r6   r   r   r:   Z_maybe_pin_connectionrA   r?   rZ   r8   r<   r.   r/   r-   r   alivecloser'   )rM   r   r   r   r   ZcollZ
cmd_cursordocZoriginal_indexZop_typeopZinserted_idresr   rN   rN   rO   _process_results_cursor  sH    

z(_AsyncClientBulk._process_results_cursorzOptional[WriteConcern])r7   r   r   op_id	retryabler   final_write_concernr=   c              	     sN  d}d}	| j j}
|| j | | ||	|||
|| j j}| j| jk rJ| j| j |jkr`|p^|}ddi}| j |d< | j	|d< |o|j
 }|s|st|| | jdk	r| j|d< | jr| j|d< | jr| j|d	< |r|r| js|  d
| _|||tj| |||| j  || || j | t| j| jd}t| j| jd}|jr | ||||I dH \}}}|}|dr(|d }t|dot|j t!o|j ddt"k}t|t#ot|t$t%f }|r
|s|r
t&'|}t(| j| j|| t)|| j nt(| j| j|| t)|| j d|d< g |d< |ddt*|k rVd
|d< |d s~||d< t(| j| j|| qJ|r|di }|ddt"krt&'|}t(| j| j|| t)|| j | +||||I dH  t(| j| j|| d| _,d| _n| -||||I dH \}}|  jt*|7  _|d sJ| j	r8|d r8qJq8dS )z<Internal helper for executing batches of bulkWrite commands.r   	bulkWriterX   
errorsOnlyr8   NbypassDocumentValidationr:   r;   Tr   r   coder   r   ZnErrorsanySuccessfulr   ZwriteConcernErrorF).r6   _event_listenersZvalidate_sessionrQ   codec_optionsrA   rB   Zmax_write_batch_sizer<   r8   Zin_transactionr   Zapply_write_concernr>   r:   r;   rL   Z_start_retryable_writeZ	_apply_tor+   ZPRIMARYZsend_cluster_timeadd_server_apiZapply_timeoutr   r?   r@   r   r   r   hasattrrY   r   dictr"   r   r   r!   copydeepcopyr   r   lenr   rK   r   )rM   r7   r   r   r   r   r   r   r   cmd_name	listenersrm   r[   Znot_in_transactionr?   r@   Z
raw_resultr   _r   r   Zretryable_top_level_errorZretryable_network_errorfullZwcerN   rN   rO   _execute_command  s    








 


z!_AsyncClientBulk._execute_command)r   	operationr=   c                   s   ddg g dddddi i i d t  ddddd	 fd
d}jjj|||dI dH   d sx d sx d rt j  S )z'Execute commands with w=1 WriteConcern.FNr   r   r   r   writeConcernErrorsZ	nInsertedZ	nUpsertedZnMatchedZ	nModifiedZnDeletedZinsertResultsZupdateResultsZdeleteResultsr   r   r4   r5   )r   r   r   r=   c                   s2   |j dk rtdj| || I d H  d S )N   <MongoClient.bulk_write requires MongoDB server version 8.0+.)max_wire_versionr   r   r7   )r   r   r   r   r   rM   rN   rO   retryable_bulk  s    
z8_AsyncClientBulk.execute_command.<locals>.retryable_bulk)ZbulkZoperation_idr   r   r   )r*   r6   Z_retryable_writerJ   r   r<   )rM   r   r   r   rN   r   rO   execute_commandn  s4    	z _AsyncClientBulk.execute_command)r   r=   c              	     s   d}d}| j j}t }| |||||d| j j}| j| jk rddi}| j |d< | j|d< | j	dk	rn| j	|d< dd	i|d
< | j
r| j
|d< | jr| j|d< || t| j| jd}t| j| jd}	| ||||	I dH \}
}|  jt|
7  _q0dS )z=Execute commands with OP_MSG and w=0 writeConcern, unordered.r   r   NrX   r   r8   r   wr   ZwriteConcernr:   r;   )r6   r   r*   rQ   r   rA   rB   r<   r8   r>   r:   r;   r   r   r?   r@   r   r   )rM   r   r   r   r   r   rm   r[   r?   r@   r   r   rN   rN   rO   execute_command_unack_unordered  s:    






z0_AsyncClientBulk.execute_command_unack_unorderedc                   sf   ddg g dddddi i i d}t  }t }z"| |d||d|| jI dH  W n tk
r`   Y nX dS )z;Execute commands with OP_MSG and w=0 WriteConcern, ordered.FNr   r   )r2   r*   r   r7   r    )rM   r   r   Zinitial_write_concernr   rN   rN   rO   execute_command_unack_ordered  s6    	z._AsyncClientBulk.execute_command_unack_orderedc                   sT   | j rtd| jrtd| jdk	r.td| jrD| |I dH S | |I dH S )z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.NzGCannot set bypass_document_validation with unacknowledged write concern)rE   r   rF   r>   r    r8   r   r   )rM   r   rN   rN   rO   execute_no_results  s    
z#_AsyncClientBulk.execute_no_resultsr   c                   s   | j std| jrtdd| _t|| j}| jjs| j||I dH 4 I dH B}|jdk rftd| 	|I dH  t
dddW  5 Q I dH R  S Q I dH R X | ||I dH }t
|| jj| jS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TNr   r   F)r?   r   rC   r   r7   r   r6   Z_conn_for_writesr   r   r,   r   r<   )rM   r   r   
connectionr   rN   rN   rO   execute  s(    
*z_AsyncClientBulk.execute)TNNNF)FNNNN)NNN)NN)N)__name__
__module____qualname____doc__rP   propertyrQ   r\   rf   ri   rk   r   r   r   r   r   r   r   r   r   r   r   r   rN   rN   rN   rO   r3   V   sJ        %      +   #  [Y<  5*%r3   )Or   
__future__r   r   r   r   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   Zbson.objectidr   Zbson.raw_bsonr   Zpymongor   r   Z#pymongo.asynchronous.client_sessionr   r   Zpymongo.asynchronous.collectionr   Z#pymongo.asynchronous.command_cursorr   Zpymongo.asynchronous.databaser   Zpymongo.asynchronous.helpersr   Z!pymongo.asynchronous.mongo_clientr   Zpymongo.asynchronous.poolr   Zpymongo._client_bulk_sharedr   r   Zpymongo.commonr   r   r   Zpymongo.errorsr   r   r   r   r    r!   Zpymongo.helpers_sharedr"   Zpymongo.loggerr#   r$   r%   Zpymongo.messager&   r'   r(   r)   r*   Zpymongo.read_preferencesr+   Zpymongo.resultsr,   r-   r.   r/   Zpymongo.typingsr0   r1   Zpymongo.write_concernr2   Z_IS_SYNCr3   rN   rN   rN   rO   <module>   s<    	 