U
    ~fhfo                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZmZmZmZ ddlm Z m!Z!m"Z" ddl#m$Z$m%Z%m&Z&m'Z' ddl(m)Z) ddl*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6 ddl7m8Z8 ddl9m:Z:m;Z; ddl<m=Z= ddl>m?Z? erddl@mAZA ddlBmCZC ddlDmEZE ddlFmGZGmHZHmIZI dZJG dd dZKdS )z<The bulk write operations interface.

.. versionadded:: 2.7
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyIteratorMappingOptionalTypeUnion)ObjectId)RawBSONDocument)_csotcommon)	_COMMANDS_DELETE_ALL_merge_command_raise_bulk_write_error_Run)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)ConfigurationErrorInvalidOperationNotPrimaryErrorOperationFailure)_RETRYABLE_ERROR_CODES)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_DELETE_INSERT_UPDATE_BulkWriteContext_convert_exception_convert_write_result_EncryptedBulkWriteContext_randint)ReadPreference)ClientSession_validate_session_write_concern)_handle_reauth)WriteConcern)
Collection)MongoClient)
Connection)_DocumentOut_DocumentType	_PipelineTc                   @  s  e Zd ZdZdTdddddddd	d
ZeddddZdddddZdUdddddddddddZdVdddddddddZ	dWdddddd d!d"Z
d#dd$d%Zd&dd'd(Zed)d*dd+d,d-d.d/d0d1Zd)d*dd+dd,d-dd2d3d4Zd5d.d,d-d,d6d7d8Zd5d.d,d-d9d6d:d;ZdXd<d=d>d?ddd*d@ddA	dBdCZd<d=d>dDd.dEdFdGZd?d<ddHdIdJZd?d<d=ddKdLdMZd?d<d=ddKdNdOZd=d>dDdPdQdRdSZdS )Y_Bulkz'The private guts of the bulk write API.NzCollection[_DocumentType]boolzOptional[str]zOptional[Any]None)
collectionorderedbypass_document_validationcommentletreturnc                 C  s   |j |jjdtdd| _|| _| jdk	r8td| j || _|| _	g | _
d| _|| _d| _d| _d| _d| _d| _d| _d| _d| _d| _d| _dS )zInitialize a _Bulk instance.replace)Zunicode_decode_error_handlerZdocument_class)codec_optionsNr:   FT)Zwith_optionsr=   _replacedictr6   r:   r   r   r9   r7   opsexecutedbypass_doc_valuses_collationuses_array_filtersuses_hint_updateuses_hint_deleteis_retryableretryingstarted_retryable_writecurrent_runnext_runis_encrypted)selfr6   r7   r8   r9   r:    rN   </tmp/pip-unpacked-wheel-36gvocj8/pymongo/synchronous/bulk.py__init__V   s0    	 
z_Bulk.__init__zType[_BulkWriteContext])r;   c                 C  s.   | j jjj}|r |js d| _tS d| _tS d S )NTF)r6   databaseclientZ
_encrypterZ_bypass_auto_encryptionrL   r&   r#   )rM   Z	encrypterrN   rN   rO   bulk_ctx_classx   s    
z_Bulk.bulk_ctx_classr0   )documentr;   c                 C  s:   t d| t|ts&d|ks&t |d< | jt|f dS )z*Add an insert document to the list of ops.rT   Z_idN)r   
isinstancer   r   r@   appendr!   )rM   rT   rN   rN   rO   
add_insert   s    

z_Bulk.add_insertFzMapping[str, Any]z#Union[Mapping[str, Any], _Pipeline]zOptional[Mapping[str, Any]]z!Optional[list[Mapping[str, Any]]]z Union[str, dict[str, Any], None])selectorupdatemultiupsert	collationarray_filtershintr;   c           	      C  s   t | td|fd|fd|fd|fg}|dk	r>d| _||d< |dk	rTd| _||d< |dk	rjd| _||d	< |rtd
| _| jt|f dS )z8Create an update document and add it to the list of ops.qurZ   r[   NTr\   ZarrayFiltersr^   F)	r   r?   rC   rD   rE   rG   r@   rV   r"   )	rM   rX   rY   rZ   r[   r\   r]   r^   cmdrN   rN   rO   
add_update   s     z_Bulk.add_update)rX   replacementr[   r\   r^   r;   c                 C  sV   t | ||d|d}|dk	r,d| _||d< |dk	rBd| _||d< | jt|f dS )z8Create a replace document and add it to the list of ops.F)r_   r`   rZ   r[   NTr\   r^   )r   rC   rE   r@   rV   r"   )rM   rX   rc   r[   r\   r^   ra   rN   rN   rO   add_replace   s    	z_Bulk.add_replaceint)rX   limitr\   r^   r;   c                 C  sX   ||d}|dk	r d| _ ||d< |dk	r6d| _||d< |tkrDd| _| jt|f dS )z7Create a delete document and add it to the list of ops.)r_   rf   NTr\   r^   F)rC   rF   r   rG   r@   rV   r    )rM   rX   rf   r\   r^   ra   rN   rN   rO   
add_delete   s    
z_Bulk.add_deletezIterator[Optional[_Run]]c                 c  s\   d}t | jD ]B\}\}}|dkr,t|}n|j|krD|V  t|}||| q|V  dS )ziGenerate batches of operations, batched by type of
        operation, in the order **provided**.
        N)	enumerater@   r   op_typeadd)rM   runidxri   	operationrN   rN   rO   gen_ordered   s    

z_Bulk.gen_orderedzIterator[_Run]c                 c  sX   t tt tt tg}t| jD ]\}\}}|| || q |D ]}|jrB|V  qBdS )zbGenerate batches of operations, batched by type of
        operation, in arbitrary order.
        N)r   r!   r"   r    rh   r@   rj   )rM   
operationsrl   ri   rm   rk   rN   rN   rO   gen_unordered   s    z_Bulk.gen_unorderedr#   zMutableMapping[str, Any]byteszlist[Mapping[str, Any]]r.   zdict[str, Any])bwcra   
request_idmsgdocsrR   r;   c                 C  s  |||j < ttjrdtt|jjtj	|t
t||j|||jj|jj|jjd |jjd |jjd |jrx|||| z|j|||j}tj |j }ttjrtt|jjtj||t
t||j|||jj|jj|jjd |jjd |jjd |jr|||| |||j W n tk
r }	 ztj |j }t|	tt frZ|	j!}
nt"|	}
ttjrtt|jjtj#||
t
t||j|||jj|jj|jjd |jjd |jjt|	t d |jr|$||
| t|	tt fr||	j!|j  W 5 d}	~	X Y nX |S )zCA proxy for SocketInfo.write_command that handles event publishing.r      clientIdmessagecommandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceIdrx   ry   
durationMSreplyr{   r|   r}   r~   r   r   r   r   r   rx   ry   r   failurer{   r|   r}   r~   r   r   r   r   r   ZisServerSideErrorN)%fieldr   isEnabledForloggingDEBUGr   _topology_settings_topology_idr   STARTEDnextiterdb_nameconnidserver_connection_idaddress
service_idpublish_startwrite_commandcodecdatetimenow
start_time	SUCCEEDED_succeedZ_process_responsesession	ExceptionrU   r   r   detailsr$   FAILED_fail)rM   rr   ra   rs   rt   ru   rR   r   durationexcr   rN   rN   rO   r      s    









z_Bulk.write_command)rr   ra   rs   rt   max_doc_sizeru   rR   r;   c                 C  s&  t tjrZtt |jjtj|t	t
||j|||jj|jj|jjd |jjd |jjd |jrn||||}z|j||}tj |j }	|dk	rt|j||}
nfddi}
t tjrtt |jjtj|	|
t	t
||j|||jj|jj|jjd |jjd |jjd |jr"|||
|	 W n tk
r  } ztj |j }	t|trft|j||j}nt|trz|j}nt |}t tjrtt |jjtj!|	|t	t
||j|||jj|jj|jjd |jjd |jjt|td |jr|jdk	s t"|#|||	  W 5 d}~X Y nX |S )zAA proxy for Connection.unack_write that handles event publishing.r   rv   rw   Nokr   r   )$r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   unack_writer   r   r   r%   namer   r   r   rU   r   r   r   r$   r   AssertionErrorr   )rM   rr   ra   rs   rt   r   ru   rR   resultr   r   r   r   rN   rN   rO   r   >  s    








z_Bulk.unack_writez4Union[_BulkWriteContext, _EncryptedBulkWriteContext])rr   ra   r@   rR   r;   c           
   	   C  sf   | j r:|||\}}}|jj|j|tdd|j|d n(|||\}}	}| ||||	d|| |S )Nr   )w)write_concernr   rR   )rL   batch_commandr   rz   r   r,   r   r   )
rM   rr   ra   r@   rR   _batched_cmdto_sendrs   rt   rN   rN   rO   _execute_batch_unack  s    z_Bulk._execute_batch_unackz.tuple[dict[str, Any], list[Mapping[str, Any]]]c                 C  sd   | j r6|||\}}}|jj|j||j|j|d}n&|||\}	}
}| |||	|
||}||fS )N)r=   r   rR   )rL   r   r   rz   r   r   r   r   )rM   rr   ra   r@   rR   r   r   r   r   rs   rt   rN   rN   rO   _execute_batch  s    z_Bulk._execute_batchzIterator[Any]r,   Optional[ClientSession]r/   zOptional[WriteConcern])		generatorr   r   r   op_id	retryablefull_resultfinal_write_concernr;   c	              
   C  sn  | j jj}	| j jj}
|
j}| js0t|| _d | _| j}||
| d}|rj| j	slt|d | _| jd krld}t
|j }| |	||||||j| j j}|jt|jk rF|rt|j|j dkr|p|}|| j jd| ji}| jr| j|d< t|| | jrd|d< | jd k	r*|jttfkr*| j|d< |r^|rL| jsL|  d| _|||tj| ||||
 || | |
| t!|j|jd }|j"r"| #||||
\}}|$di }|$d	d
t%krt&'|}t(|||j| t)| t(|||j| d| _	d| _| jr2d|kr2qFn| *||||
}| jt|7  _q| jr\|d r\qj| j | _}qFd S )NFTrv   r7   r9   ZbypassDocumentValidationr:   ZwriteConcernErrorcoder   writeErrors)+r6   rQ   r   rR   _event_listenersrJ   r   rK   Zvalidate_sessionrH   r   ri   rS   r=   
idx_offsetlenr@   r7   r9   r   Zapply_write_concernrB   r:   r    r"   rI   Z_start_retryable_writeZ	_apply_tor(   ZPRIMARYZsend_cluster_timeadd_server_apiZapply_timeoutr   acknowledgedr   getr   copydeepcopyr   r   r   )rM   r   r   r   r   r   r   r   r   r   rR   	listenersrk   Zlast_runcmd_namerr   ra   r@   r   r   ZwcefullrN   rN   rO   _execute_command  s|    








z_Bulk._execute_commandstr)r   r   r   rm   r;   c              	     sz   g g dddddg d t  ddddd fdd	}jjj}|jj|||d
} d sn d rvt   S )zExecute using write commands.r   r   writeConcernErrorsZ	nInsertedZ	nUpsertedZnMatchedZ	nModifiedZnRemovedZupsertedr   r/   r4   r5   )r   r   r   r;   c              	     s    | ||  d S )N)r   )r   r   r   r   r   r   rM   r   rN   rO   retryable_bulkC  s    z-_Bulk.execute_command.<locals>.retryable_bulk)ZbulkZoperation_idr   r   )r'   r6   rQ   rR   Z_retryable_writerG   r   )rM   r   r   r   rm   r   rR   r   rN   r   rO   execute_command.  s.    

 
	z_Bulk.execute_command)r   r   r;   c              
   C  s   | j jj}| j jj}|j}t }| js0t|| _| j}|rt|j	 }| 
|||||d|j	| j j}	|jt|jk r|| j jdddddii}
||
 t|j|jd}| |	|
||}| jt|7  _qbt|d | _}q6dS )zCExecute write commands with OP_MSG and w=0 writeConcern, unordered.Nr7   FZwriteConcernr   r   )r6   rQ   r   rR   r   r'   rJ   r   r   ri   rS   r=   r   r   r@   r   r   r   )rM   r   r   r   rR   r   r   rk   r   rr   ra   r@   r   rN   rN   rO   execute_op_msg_no_results^  s@    



   
z_Bulk.execute_op_msg_no_results)r   r   r   r;   c              
   C  sX   g g dddddg d}t  }t }z| ||d||d|| W n tk
rR   Y nX dS )zAExecute write commands with OP_MSG and w=0 WriteConcern, ordered.r   r   NF)r,   r'   r   r   )rM   r   r   r   r   Zinitial_write_concernr   rN   rN   rO   execute_command_no_results  s0    
z _Bulk.execute_command_no_resultsc                 C  s   | j rtd| jrtd|o&|j }|rD| jrD|jdk rDtd|r`| jr`|jdk r`td| jrntd| j	r| 
|||S | ||S )z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.	   zPMust be connected to MongoDB 4.4+ to use hint on unacknowledged delete commands.   zPMust be connected to MongoDB 4.2+ to use hint on unacknowledged update commands.zGCannot set bypass_document_validation with unacknowledged write concern)rC   r   rD   r   rF   Zmax_wire_versionrE   rB   r   r7   r   r   )rM   r   r   r   ZunackrN   rN   rO   execute_no_results  s(    z_Bulk.execute_no_resultsr   )r   r   rm   r;   c              	   C  s   | j std| jrtdd| _|p,| jj}t||}| jrH|  }n|  }| jj	j
}|js|||}| ||| W 5 Q R  dS Q R X n| ||||S dS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TN)r@   r   rA   r6   r   r*   r7   rn   rp   rQ   rR   r   Z_conn_for_writesr   r   )rM   r   r   rm   r   rR   
connectionrN   rN   rO   execute  s     


z_Bulk.execute)NN)FFNNN)FNN)NN)N)__name__
__module____qualname____doc__rP   propertyrS   rW   rb   rd   rg   rn   rp   r+   r   r   r   r   r   r   r   r   r   r   rN   rN   rN   rO   r3   S   sD     "	     !     UX  g0%$r3   )Lr   
__future__r   r   r   r   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   r   Zbson.objectidr   Zbson.raw_bsonr   Zpymongor   r   Zpymongo.bulk_sharedr   r   r   r   r   Zpymongo.commonr   r   r   Zpymongo.errorsr   r   r   r   Zpymongo.helpers_sharedr   Zpymongo.loggerr   r   r   Zpymongo.messager    r!   r"   r#   r$   r%   r&   r'   Zpymongo.read_preferencesr(   Z"pymongo.synchronous.client_sessionr)   r*   Zpymongo.synchronous.helpersr+   Zpymongo.write_concernr,   Zpymongo.synchronous.collectionr-   Z pymongo.synchronous.mongo_clientr.   Zpymongo.synchronous.poolr/   Zpymongo.typingsr0   r1   r2   Z_IS_SYNCr3   rN   rN   rN   rO   <module>   s6   $
(
