U
    ~fhO|                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" erddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9m:Z: ddl;m<Z<m=Z=m>Z>m?Z?m@Z@ ddlAmBZB ddlCmDZDmEZEmFZFmGZG ddlHmIZImJZJ ddlKmLZL dZMG dd dZNdS )zIThe client-level bulk write operations interface.

.. versionadded:: 4.9
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyMappingOptionalTypeUnion)ObjectId)RawBSONDocument)_csotcommon)ClientSession_validate_session_write_concern)
Collection)CommandCursor)Database)_handle_reauth)MongoClient)
Connection)_merge_command"_throw_client_bulk_write_exception)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)ConfigurationErrorConnectionFailureInvalidOperationNotPrimaryErrorOperationFailureWaitQueueTimeoutError)_RETRYABLE_ERROR_CODES)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_ClientBulkWriteContext_convert_client_bulk_exception_convert_exception_convert_write_result_randint)ReadPreference)ClientBulkWriteResultDeleteResultInsertOneResultUpdateResult)_DocumentOut	_Pipeline)WriteConcernTc                   @  s  e Zd ZdZdQddddd	d
dddddZeddddZddddddZdRdddddddddd	ddZdSdddddddd d!d"Z	dTddddddd#d$d%Z
ed&d'd(d)d*d*dd+d,d-d.Zd&d'd(d/d*d*ddd,d0d1Zd&d+d2d3d4d5d6d7Zd&d+d2d3d8d5d9d:Zd'd'd;d<dd=d>d?ZdUdd<d;d(dd'd@ddAdBdCZd<dd'dDdEdFZd;ddGdHdIZd;ddGdJdKZd;ddGdLdMZd<ddNdDdOdPZdS )V_ClientBulkz4The private guts of the client-level bulk write API.TNFr   r2   boolzOptional[bool]zOptional[str]zOptional[Any]None)clientwrite_concernorderedbypass_document_validationcommentletverbose_resultsreturnc                 C  s   || _ || _|| _| jdk	r*td| j || _|| _|| _|| _g | _	g | _
d| _d| _d| _d| _d| _d| _d| _d| _| j jj| _d| _d| _dS )z"Initialize a _ClientBulk instance.Nr;   r   F)r6   r7   r;   r   r   r8   bypass_doc_valr:   r<   ops
namespaces
idx_offset	total_opsexecuteduses_upsertuses_collationuses_array_filtersuses_hint_updateuses_hint_deleteoptionsZretry_writesis_retryableretryingstarted_retryable_write)selfr6   r7   r8   r9   r:   r;   r<    rN   C/tmp/pip-unpacked-wheel-36gvocj8/pymongo/synchronous/client_bulk.py__init__Y   s,    
z_ClientBulk.__init__zType[_ClientBulkWriteContext])r=   c                 C  s   t S )N)r&   )rM   rN   rN   rO   bulk_ctx_class~   s    z_ClientBulk.bulk_ctx_classstrr0   )	namespacedocumentr=   c                 C  s^   t d| t|ts&d|ks&t |d< d|d}| jd|f | j| |  jd7  _dS )z*Add an insert document to the list of ops.rT   _id)insertrT   rW      N)r   
isinstancer   r   r?   appendr@   rB   )rM   rS   rT   cmdrN   rN   rO   
add_insert   s    


z_ClientBulk.add_insertzMapping[str, Any]z#Union[Mapping[str, Any], _Pipeline]zOptional[Mapping[str, Any]]z!Optional[list[Mapping[str, Any]]]z Union[str, dict[str, Any], None])	rS   selectorupdatemultiupsert	collationarray_filtershintr=   c	           
      C  s   t | d|||d}	|dk	r,d| _||	d< |dk	rBd| _||	d< |dk	rXd| _||	d< |dk	rnd| _||	d< |rxd	| _| jd
|	f | j| |  j	d7  _	dS )z8Create an update document and add it to the list of ops.rV   r^   filterZ
updateModsr_   NTr`   ZarrayFiltersrc   ra   Fr^   rX   )
r   rD   rF   rG   rE   rJ   r?   rZ   r@   rB   )
rM   rS   r]   r^   r_   r`   ra   rb   rc   r[   rN   rN   rO   
add_update   s.    z_ClientBulk.add_update)rS   r]   replacementr`   ra   rc   r=   c                 C  s   t | d||dd}|dk	r,d| _||d< |dk	rBd| _||d< |dk	rXd| _||d< | jd	|f | j| |  jd
7  _dS )z8Create a replace document and add it to the list of ops.rV   Frd   NTr`   rc   ra   replacerX   )r   rD   rG   rE   r?   rZ   r@   rB   )rM   rS   r]   rg   r`   ra   rc   r[   rN   rN   rO   add_replace   s$    
z_ClientBulk.add_replace)rS   r]   r_   ra   rc   r=   c                 C  sp   d||d}|dk	r"d| _ ||d< |dk	r8d| _||d< |rBd| _| jd|f | j| |  jd	7  _dS )
z7Create a delete document and add it to the list of ops.rV   )deletere   r_   NTrc   ra   Frj   rX   )rH   rE   rJ   r?   rZ   r@   rB   )rM   rS   r]   r_   ra   rc   r[   rN   rN   rO   
add_delete   s    	z_ClientBulk.add_deleter&   zMutableMapping[str, Any]intzUnion[bytes, dict[str, Any]]zlist[Mapping[str, Any]]zdict[str, Any])bwcr[   
request_idmsgop_docsns_docsr6   r=   c                 C  s:  ||d< ||d< t tjrjtt |jjtj|t	t
||j|||jj|jj|jjd |jjd |jjd |jr||||| z|j|||j}tj |j }	t tjrtt |jjtj|	|t	t
||j|||jj|jj|jjd |jjd |jjd |jr||||	 | j||j W n tk
r4 }
 ztj |j }	t|
tt frh|
j!}nt"|
}t tjrtt |jjtj#|	|t	t
||j|||jj|jj|jjd |jjd |jjt|
t d |jr|$|||	 d|
i}t|
t r| j|
j!|j n| ji |j W 5 d	}
~
X Y nX |S )
zCA proxy for Connection.write_command that handles event publishing.r?   ZnsInfor   rX   clientIdmessagecommandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceIdrs   rt   
durationMSreplyrv   rw   rx   ry   rz   r{   r|   r}   r~   rs   rt   r   failurerv   rw   rx   ry   rz   r{   r|   r}   r~   ZisServerSideErrorerrorN)%r#   isEnabledForloggingDEBUGr%   _topology_settings_topology_idr$   STARTEDnextiterdb_nameconnidserver_connection_idaddress
service_idpublish_startwrite_commandcodecdatetimenow
start_time	SUCCEEDED_succeedr6   Z_process_responsesession	ExceptionrY   r   r    detailsr(   FAILED_fail)rM   rm   r[   rn   ro   rp   rq   r6   r   durationexcr   rN   rN   rO   r      s    








"z_ClientBulk.write_commandbytesc                 C  s2  t tjrZtt |jjtj|t	t
||j|||jj|jj|jjd |jjd |jjd |jrp|||||}z|j||j}tj |j }	|dk	rt|j||}
nfddi}
t tjrtt |jjtj|	|
t	t
||j|||jj|jj|jjd |jjd |jjd |jr&|||
|	 W n tk
r, } ztj |j }	t|trlt|j||j}nt|t r|j}nt!|}t tjrtt |jjtj"|	|t	t
||j|||jj|jj|jjd |jjd |jjt|td |jr|jdk	st#|$|||	 d|i}
W 5 d}~X Y nX |
S )	zAA proxy for Connection.unack_write that handles event publishing.r   rX   rr   Nokr   r   r   )%r#   r   r   r   r%   r   r   r$   r   r   r   r   r   r   r   r   r   r   r   unack_writeZmax_bson_sizer   r   r   r)   namer   r   r   rY   r    r   r   r(   r   AssertionErrorr   )rM   rm   r[   rn   ro   rp   rq   r6   resultr   r   r   r   rN   rN   rO   r   D  s    








z_ClientBulk.unack_writez#list[tuple[str, Mapping[str, Any]]]z	list[str]z7tuple[list[Mapping[str, Any]], list[Mapping[str, Any]]])rm   r[   r?   r@   r=   c           	   	   C  s6   | |||\}}}}| ||||||| j ||fS )z6Executes a batch of bulkWrite server commands (unack).)batch_commandr   r6   )	rM   rm   r[   r?   r@   rn   ro   to_send_ops
to_send_nsrN   rN   rO   _execute_batch_unack  s    z _ClientBulk._execute_batch_unackzGtuple[dict[str, Any], list[Mapping[str, Any]], list[Mapping[str, Any]]]c           
   	   C  s8   | |||\}}}}| ||||||| j}	|	||fS )z4Executes a batch of bulkWrite server commands (ack).)r   r   r6   )
rM   rm   r[   r?   r@   rn   ro   r   r   r   rN   rN   rO   _execute_batch  s    z_ClientBulk._execute_batchr   Optional[ClientSession])full_resultr   r   r   r=   c              
   C  sL  | drHtt| jddd}t||d |j||dk	| jd}|| z|D ]}|d | j }| j	| \}	}
|d s|d	 
| | jr W dS |d rP| jrP|	d
kr|
d d }t|dd}|	dkrd}	t|ddd}|	dkrt|dd}|||	 d |< qPW n@ tk
rF } z |jr*|  t||d< W 5 d}~X Y nX dS )z?Internal helper for processing the server reply command cursor.cursoradminz$cmd.bulkWrite)Zdatabaser   N)r   Zexplicit_sessionr:   idxr   writeErrorsrW   rT   rU   T)acknowledged)r^   rh   r^   )r   Zin_client_bulkrj   ZResultsr   )getr   r   r6   r   r   r:   Z_maybe_pin_connectionrA   r?   rZ   r8   r<   r.   r/   r-   r   alivecloser'   )rM   r   r   r   r   ZcollZ
cmd_cursordocZoriginal_indexZop_typeopZinserted_idresr   rN   rN   rO   _process_results_cursor  sH    

z#_ClientBulk._process_results_cursorzOptional[WriteConcern])r7   r   r   op_id	retryabler   final_write_concernr=   c              	   C  s<  d}d}	| j j}
|| j | | ||	|||
|| j j}| j| jk r8| j| j |jkr`|p^|}ddi}| j |d< | j	|d< |o|j
 }|s|st|| | jdk	r| j|d< | jr| j|d< | jr| j|d	< |r|r| js|  d
| _|||tj| |||| j  || || j | t| j| jd}t| j| jd}|jr| ||||\}}}|}|dr"|d }t|dot|j t!o|j ddt"k}t|t#ot|t$t%f }|r|s|rt&'|}t(| j| j|| t)|| j nt(| j| j|| t)|| j d|d< g |d< |ddt*|k rPd
|d< |d sx||d< t(| j| j|| q8|r|di }|ddt"krt&'|}t(| j| j|| t)|| j | +|||| t(| j| j|| d| _,d| _n| -||||\}}|  jt*|7  _|d s8| j	r8|d r8q8q8dS )z<Internal helper for executing batches of bulkWrite commands.r   	bulkWriterX   
errorsOnlyr8   NbypassDocumentValidationr:   r;   Tr   r   coder   r   ZnErrorsanySuccessfulr   ZwriteConcernErrorF).r6   _event_listenersZvalidate_sessionrQ   codec_optionsrA   rB   Zmax_write_batch_sizer<   r8   Zin_transactionr   Zapply_write_concernr>   r:   r;   rL   Z_start_retryable_writeZ	_apply_tor+   ZPRIMARYZsend_cluster_timeadd_server_apiZapply_timeoutr   r?   r@   r   r   r   hasattrrY   r   dictr"   r   r   r!   copydeepcopyr   r   lenr   rK   r   )rM   r7   r   r   r   r   r   r   r   cmd_name	listenersrm   r[   Znot_in_transactionr?   r@   Z
raw_resultr   _r   r   Zretryable_top_level_errorZretryable_network_errorfullZwcerN   rN   rO   _execute_command  s    








 


z_ClientBulk._execute_command)r   	operationr=   c                   s   ddg g dddddi i i d t  ddddd	 fd
d}jjj|||d  d sr d sr d r~t j  S )z'Execute commands with w=1 WriteConcern.FNr   r   r   r   writeConcernErrorsZ	nInsertedZ	nUpsertedZnMatchedZ	nModifiedZnDeletedZinsertResultsZupdateResultsZdeleteResultsr   r   r4   r5   )r   r   r   r=   c                   s,   |j dk rtdj| ||  d S )N   <MongoClient.bulk_write requires MongoDB server version 8.0+.)max_wire_versionr   r   r7   )r   r   r   r   r   rM   rN   rO   retryable_bulk  s    
z3_ClientBulk.execute_command.<locals>.retryable_bulk)ZbulkZoperation_idr   r   r   )r*   r6   Z_retryable_writerJ   r   r<   )rM   r   r   r   rN   r   rO   execute_commandl  s4    	z_ClientBulk.execute_command)r   r=   c              	   C  s   d}d}| j j}t }| |||||d| j j}| j| jk rddi}| j |d< | j|d< | j	dk	rn| j	|d< dd	i|d
< | j
r| j
|d< | jr| j|d< || t| j| jd}t| j| jd}	| ||||	\}
}|  jt|
7  _q0dS )z=Execute commands with OP_MSG and w=0 writeConcern, unordered.r   r   NrX   r   r8   r   wr   ZwriteConcernr:   r;   )r6   r   r*   rQ   r   rA   rB   r<   r8   r>   r:   r;   r   r   r?   r@   r   r   )rM   r   r   r   r   r   rm   r[   r?   r@   r   r   rN   rN   rO   execute_command_unack_unordered  s:    






z+_ClientBulk.execute_command_unack_unorderedc                 C  s`   ddg g dddddi i i d}t  }t }z| |d||d|| j W n tk
rZ   Y nX dS )z;Execute commands with OP_MSG and w=0 WriteConcern, ordered.FNr   r   )r2   r*   r   r7   r    )rM   r   r   Zinitial_write_concernr   rN   rN   rO   execute_command_unack_ordered  s6    	z)_ClientBulk.execute_command_unack_orderedc                 C  sH   | j rtd| jrtd| jdk	r.td| jr>| |S | |S )z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.NzGCannot set bypass_document_validation with unacknowledged write concern)rE   r   rF   r>   r    r8   r   r   )rM   r   rN   rN   rO   execute_no_results  s    

z_ClientBulk.execute_no_resultsr   c              
   C  s   | j std| jrtdd| _t|| j}| jjs| j||6}|jdk rXtd| 	| t
dddW  5 Q R  S Q R X | ||}t
|| jj| jS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.Tr   r   NF)r?   r   rC   r   r7   r   r6   Z_conn_for_writesr   r   r,   r   r<   )rM   r   r   
connectionr   rN   rN   rO   execute  s(    

z_ClientBulk.execute)TNNNF)FNNNN)NNN)NN)N)__name__
__module____qualname____doc__rP   propertyrQ   r\   rf   ri   rk   r   r   r   r   r   r   r   r   r   r   r   r   rN   rN   rN   rO   r3   V   sJ        %      +   #  [Y<  5*%r3   )Or   
__future__r   r   r   r   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   Zbson.objectidr   Zbson.raw_bsonr   Zpymongor   r   Z"pymongo.synchronous.client_sessionr   r   Zpymongo.synchronous.collectionr   Z"pymongo.synchronous.command_cursorr   Zpymongo.synchronous.databaser   Zpymongo.synchronous.helpersr   Z pymongo.synchronous.mongo_clientr   Zpymongo.synchronous.poolr   Zpymongo._client_bulk_sharedr   r   Zpymongo.commonr   r   r   Zpymongo.errorsr   r   r   r   r    r!   Zpymongo.helpers_sharedr"   Zpymongo.loggerr#   r$   r%   Zpymongo.messager&   r'   r(   r)   r*   Zpymongo.read_preferencesr+   Zpymongo.resultsr,   r-   r.   r/   Zpymongo.typingsr0   r1   Zpymongo.write_concernr2   Z_IS_SYNCr3   rN   rN   rN   rO   <module>   s<    	 