U
    ~fhp                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZm Z m!Z!m"Z"m#Z# ddl$m%Z%m&Z&m'Z' ddl(m)Z)m*Z*m+Z+m,Z, ddl-m.Z. ddl/m0Z0m1Z1m2Z2 ddl3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z; ddl<m=Z= ddl>m?Z? erddl@mAZA ddlBmCZC ddlDmEZE ddlFmGZGmHZHmIZI dZJG dd dZKdS )z<The bulk write operations interface.

.. versionadded:: 2.7
    )annotationsN)MutableMapping)islice)TYPE_CHECKINGAnyIteratorMappingOptionalTypeUnion)ObjectId)RawBSONDocument)_csotcommon)AsyncClientSession_validate_session_write_concern)_handle_reauth)	_COMMANDS_DELETE_ALL_merge_command_raise_bulk_write_error_Run)validate_is_document_typevalidate_ok_for_replacevalidate_ok_for_update)ConfigurationErrorInvalidOperationNotPrimaryErrorOperationFailure)_RETRYABLE_ERROR_CODES)_COMMAND_LOGGER_CommandStatusMessage
_debug_log)_DELETE_INSERT_UPDATE_BulkWriteContext_convert_exception_convert_write_result_EncryptedBulkWriteContext_randint)ReadPreference)WriteConcern)AsyncCollection)AsyncMongoClient)AsyncConnection)_DocumentOut_DocumentType	_PipelineFc                   @  s  e Zd ZdZdTdddddddd	d
ZeddddZdddddZdUdddddddddddZdVdddddddddZ	dWdddddd d!d"Z
d#dd$d%Zd&dd'd(Zed)d*dd+d,d-d.d/d0d1Zd)d*dd+dd,d-dd2d3d4Zd5d.d,d-d,d6d7d8Zd5d.d,d-d9d6d:d;ZdXd<d=d>d?ddd*d@ddA	dBdCZd<d=d>dDd.dEdFdGZd?d<ddHdIdJZd?d<d=ddKdLdMZd?d<d=ddKdNdOZd=d>dDdPdQdRdSZdS )Y
_AsyncBulkz'The private guts of the bulk write API.NzAsyncCollection[_DocumentType]boolzOptional[str]zOptional[Any]None)
collectionorderedbypass_document_validationcommentletreturnc                 C  s   |j |jjdtdd| _|| _| jdk	r8td| j || _|| _	g | _
d| _|| _d| _d| _d| _d| _d| _d| _d| _d| _d| _d| _dS )z!Initialize a _AsyncBulk instance.replace)Zunicode_decode_error_handlerZdocument_class)codec_optionsNr:   FT)Zwith_optionsr=   _replacedictr6   r:   r   r   r9   r7   opsexecutedbypass_doc_valuses_collationuses_array_filtersuses_hint_updateuses_hint_deleteis_retryableretryingstarted_retryable_writecurrent_runnext_runis_encrypted)selfr6   r7   r8   r9   r:    rN   =/tmp/pip-unpacked-wheel-36gvocj8/pymongo/asynchronous/bulk.py__init__V   s0    	 
z_AsyncBulk.__init__zType[_BulkWriteContext])r;   c                 C  s.   | j jjj}|r |js d| _tS d| _tS d S )NTF)r6   databaseclientZ
_encrypterZ_bypass_auto_encryptionrL   r)   r&   )rM   Z	encrypterrN   rN   rO   bulk_ctx_classx   s    
z_AsyncBulk.bulk_ctx_classr0   )documentr;   c                 C  s:   t d| t|ts&d|ks&t |d< | jt|f dS )z*Add an insert document to the list of ops.rT   Z_idN)r   
isinstancer   r   r@   appendr$   )rM   rT   rN   rN   rO   
add_insert   s    

z_AsyncBulk.add_insertFzMapping[str, Any]z#Union[Mapping[str, Any], _Pipeline]zOptional[Mapping[str, Any]]z!Optional[list[Mapping[str, Any]]]z Union[str, dict[str, Any], None])selectorupdatemultiupsert	collationarray_filtershintr;   c           	      C  s   t | td|fd|fd|fd|fg}|dk	r>d| _||d< |dk	rTd| _||d< |dk	rjd| _||d	< |rtd
| _| jt|f dS )z8Create an update document and add it to the list of ops.qurZ   r[   NTr\   ZarrayFiltersr^   F)	r   r?   rC   rD   rE   rG   r@   rV   r%   )	rM   rX   rY   rZ   r[   r\   r]   r^   cmdrN   rN   rO   
add_update   s     z_AsyncBulk.add_update)rX   replacementr[   r\   r^   r;   c                 C  sV   t | ||d|d}|dk	r,d| _||d< |dk	rBd| _||d< | jt|f dS )z8Create a replace document and add it to the list of ops.F)r_   r`   rZ   r[   NTr\   r^   )r   rC   rE   r@   rV   r%   )rM   rX   rc   r[   r\   r^   ra   rN   rN   rO   add_replace   s    	z_AsyncBulk.add_replaceint)rX   limitr\   r^   r;   c                 C  sX   ||d}|dk	r d| _ ||d< |dk	r6d| _||d< |tkrDd| _| jt|f dS )z7Create a delete document and add it to the list of ops.)r_   rf   NTr\   r^   F)rC   rF   r   rG   r@   rV   r#   )rM   rX   rf   r\   r^   ra   rN   rN   rO   
add_delete   s    
z_AsyncBulk.add_deletezIterator[Optional[_Run]]c                 c  s\   d}t | jD ]B\}\}}|dkr,t|}n|j|krD|V  t|}||| q|V  dS )ziGenerate batches of operations, batched by type of
        operation, in the order **provided**.
        N)	enumerater@   r   op_typeadd)rM   runidxri   	operationrN   rN   rO   gen_ordered   s    

z_AsyncBulk.gen_orderedzIterator[_Run]c                 c  sX   t tt tt tg}t| jD ]\}\}}|| || q |D ]}|jrB|V  qBdS )zbGenerate batches of operations, batched by type of
        operation, in arbitrary order.
        N)r   r$   r%   r#   rh   r@   rj   )rM   
operationsrl   ri   rm   rk   rN   rN   rO   gen_unordered   s    z_AsyncBulk.gen_unorderedr&   zMutableMapping[str, Any]byteszlist[Mapping[str, Any]]r.   zdict[str, Any])bwcra   
request_idmsgdocsrR   r;   c                   s(  |||j < ttjrdtt|jjtj	|t
t||j|||jj|jj|jjd |jjd |jjd |jrx|||| z|j|||jI dH }tj |j }ttjrtt|jjtj||t
t||j|||jj|jj|jjd |jjd |jjd |jr|||| |||jI dH  W n tk
r" }	 ztj |j }t|	tt frf|	j!}
nt"|	}
ttjrtt|jjtj#||
t
t||j|||jj|jj|jjd |jjd |jjt|	t d |jr|$||
| t|	tt fr||	j!|jI dH   W 5 d}	~	X Y nX |S )zCA proxy for SocketInfo.write_command that handles event publishing.r      clientIdmessagecommandcommandNamedatabaseName	requestIdoperationIddriverConnectionIdserverConnectionId
serverHost
serverPort	serviceIdNrx   ry   
durationMSreplyr{   r|   r}   r~   r   r   r   r   r   rx   ry   r   failurer{   r|   r}   r~   r   r   r   r   r   ZisServerSideError)%fieldr    isEnabledForloggingDEBUGr"   _topology_settings_topology_idr!   STARTEDnextiterdb_nameconnidserver_connection_idaddress
service_idpublish_startwrite_commandcodecdatetimenow
start_time	SUCCEEDED_succeedZ_process_responsesession	ExceptionrU   r   r   detailsr'   FAILED_fail)rM   rr   ra   rs   rt   ru   rR   r   durationexcr   rN   rN   rO   r      s    









z_AsyncBulk.write_command)rr   ra   rs   rt   max_doc_sizeru   rR   r;   c                   s,  t tjrZtt |jjtj|t	t
||j|||jj|jj|jjd |jjd |jjd |jrn||||}z|j||I dH }tj |j }	|dk	rt|j||}
nfddi}
t tjrtt |jjtj|	|
t	t
||j|||jj|jj|jjd |jjd |jjd |jr(|||
|	 W n tk
r& } ztj |j }	t|trlt|j||j}nt|tr|j}nt |}t tjrtt |jjtj!|	|t	t
||j|||jj|jj|jjd |jjd |jjt|td |jr|jdk	st"|#|||	  W 5 d}~X Y nX |S )zFA proxy for AsyncConnection.unack_write that handles event publishing.r   rv   rw   Nokr   r   )$r    r   r   r   r"   r   r   r!   r   r   r   r   r   r   r   r   r   r   r   unack_writer   r   r   r(   namer   r   r   rU   r   r   r   r'   r   AssertionErrorr   )rM   rr   ra   rs   rt   r   ru   rR   resultr   r   r   r   rN   rN   rO   r   >  s    








z_AsyncBulk.unack_writez4Union[_BulkWriteContext, _EncryptedBulkWriteContext])rr   ra   r@   rR   r;   c           
   	     sr   | j r@|||\}}}|jj|j|tdd|j|dI d H  n.|||\}}	}| ||||	d||I d H  |S )Nr   )w)write_concernr   rR   )rL   batch_commandr   rz   r   r,   r   r   )
rM   rr   ra   r@   rR   _batched_cmdto_sendrs   rt   rN   rN   rO   _execute_batch_unack  s    z_AsyncBulk._execute_batch_unackz.tuple[dict[str, Any], list[Mapping[str, Any]]]c                   sp   | j r<|||\}}}|jj|j||j|j|dI d H }n,|||\}	}
}| |||	|
||I d H }||fS )N)r=   r   rR   )rL   r   r   rz   r   r   r   r   )rM   rr   ra   r@   rR   r   r   r   r   rs   rt   rN   rN   rO   _execute_batch  s    z_AsyncBulk._execute_batchzIterator[Any]r,   Optional[AsyncClientSession]r/   zOptional[WriteConcern])		generatorr   r   r   op_id	retryablefull_resultfinal_write_concernr;   c	              
     sz  | j jj}	| j jj}
|
j}| js0t|| _d | _| j}||
| d}|rv| j	slt|d | _| jd krld}t
|j }| |	||||||j| j j}|jt|jk rR|rt|j|j dkr|p|}|| j jd| ji}| jr| j|d< t|| | jrd|d< | jd k	r*|jttfkr*| j|d< |r^|rL| jsL|  d| _|||tj| ||||
 || | |
| t!|j|jd }|j"r(| #||||
I d H \}}|$di }|$d	d
t%krt&'|}t(|||j| t)| t(|||j| d| _	d| _| jr>d|kr>qRn| *||||
I d H }| jt|7  _q| jrh|d rhqv| j | _}qFd S )NFTrv   r7   r9   ZbypassDocumentValidationr:   ZwriteConcernErrorcoder   writeErrors)+r6   rQ   r   rR   _event_listenersrJ   r   rK   Zvalidate_sessionrH   r   ri   rS   r=   
idx_offsetlenr@   r7   r9   r   Zapply_write_concernrB   r:   r#   r%   rI   Z_start_retryable_writeZ	_apply_tor+   ZPRIMARYZsend_cluster_timeadd_server_apiZapply_timeoutr   acknowledgedr   getr   copydeepcopyr   r   r   )rM   r   r   r   r   r   r   r   r   r   rR   	listenersrk   Zlast_runcmd_namerr   ra   r@   r   r   ZwcefullrN   rN   rO   _execute_command  s|    








z_AsyncBulk._execute_commandstr)r   r   r   rm   r;   c              	     s   g g dddddg d t  ddddd fdd	}jjj}|jj|||d
I dH } d st d r|t   S )zExecute using write commands.r   r   writeConcernErrorsZ	nInsertedZ	nUpsertedZnMatchedZ	nModifiedZnRemovedZupsertedr   r/   r4   r5   )r   r   r   r;   c              	     s     | || I d H  d S )N)r   )r   r   r   r   r   r   rM   r   rN   rO   retryable_bulkC  s    z2_AsyncBulk.execute_command.<locals>.retryable_bulk)ZbulkZoperation_idNr   r   )r*   r6   rQ   rR   Z_retryable_writerG   r   )rM   r   r   r   rm   r   rR   r   rN   r   rO   execute_command.  s.    

 
	z_AsyncBulk.execute_command)r   r   r;   c              
     s   | j jj}| j jj}|j}t }| js0t|| _| j}|rt|j	 }| 
|||||d|j	| j j}	|jt|jk r|| j jdddddii}
||
 t|j|jd}| |	|
||I dH }| jt|7  _qbt|d | _}q6dS )zCExecute write commands with OP_MSG and w=0 writeConcern, unordered.Nr7   FZwriteConcernr   r   )r6   rQ   r   rR   r   r*   rJ   r   r   ri   rS   r=   r   r   r@   r   r   r   )rM   r   r   r   rR   r   r   rk   r   rr   ra   r@   r   rN   rN   rO   execute_op_msg_no_results^  s@    



   
z$_AsyncBulk.execute_op_msg_no_results)r   r   r   r;   c              
     s^   g g dddddg d}t  }t }z"| ||d||d||I dH  W n tk
rX   Y nX dS )zAExecute write commands with OP_MSG and w=0 WriteConcern, ordered.r   r   NF)r,   r*   r   r   )rM   r   r   r   r   Zinitial_write_concernr   rN   rN   rO   execute_command_no_results  s0    
z%_AsyncBulk.execute_command_no_resultsc                   s   | j rtd| jrtd|o&|j }|rD| jrD|jdk rDtd|r`| jr`|jdk r`td| jrntd| j	r| 
|||I dH S | ||I dH S )	z3Execute all operations, returning no results (w=0).z3Collation is unsupported for unacknowledged writes.z6arrayFilters is unsupported for unacknowledged writes.	   zPMust be connected to MongoDB 4.4+ to use hint on unacknowledged delete commands.   zPMust be connected to MongoDB 4.2+ to use hint on unacknowledged update commands.zGCannot set bypass_document_validation with unacknowledged write concernN)rC   r   rD   r   rF   Zmax_wire_versionrE   rB   r   r7   r   r   )rM   r   r   r   ZunackrN   rN   rO   execute_no_results  s(    z_AsyncBulk.execute_no_resultsr   )r   r   rm   r;   c              
     s   | j std| jrtdd| _|p,| jj}t||}| jrH|  }n|  }| jj	j
}|js|||I dH 4 I dH *}| |||I dH  W 5 Q I dH R  dS Q I dH R X n| ||||I dH S dS )zExecute operations.zNo operations to executez*Bulk operations can only be executed once.TN)r@   r   rA   r6   r   r   r7   rn   rp   rQ   rR   r   Z_conn_for_writesr   r   )rM   r   r   rm   r   rR   
connectionrN   rN   rO   execute  s     


"z_AsyncBulk.execute)NN)FFNNN)FNN)NN)N)__name__
__module____qualname____doc__rP   propertyrS   rW   rb   rd   rg   rn   rp   r   r   r   r   r   r   r   r   r   r   r   rN   rN   rN   rO   r3   S   sD     "	     !     UX  g0'$r3   )Lr   
__future__r   r   r   r   collections.abcr   	itertoolsr   typingr   r   r   r   r	   r
   r   Zbson.objectidr   Zbson.raw_bsonr   Zpymongor   r   Z#pymongo.asynchronous.client_sessionr   r   Zpymongo.asynchronous.helpersr   Zpymongo.bulk_sharedr   r   r   r   r   Zpymongo.commonr   r   r   Zpymongo.errorsr   r   r   r   Zpymongo.helpers_sharedr   Zpymongo.loggerr    r!   r"   Zpymongo.messager#   r$   r%   r&   r'   r(   r)   r*   Zpymongo.read_preferencesr+   Zpymongo.write_concernr,   Zpymongo.asynchronous.collectionr-   Z!pymongo.asynchronous.mongo_clientr.   Zpymongo.asynchronous.poolr/   Zpymongo.typingsr0   r1   r2   Z_IS_SYNCr3   rN   rN   rN   rO   <module>   s6   $
(
