a
    !fec                     @   sz  d dl Z d dlmZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlZd dlmZ d dlmZ d dlmZ d d	lmZ ejZeeZd
Zd ddddddddddddddddddZdZe
je
je
je
je
je
je
je
j fZ!e
jfZ"e #ddd gZ$G d!d" d"e%Z&G d#d$ d$eZ'G d%d& d&e%Z(G d'd( d(e%Z)d)d* Z*d+d, Z+d-d. Z,d/d0 Z-G d1d2 d2e%Z.dS )3    N)Enum)ResumableBidiRpc)BackgroundConsumer)
exceptions)ListenRequest)Target)TargetChange)_helpersiyP                             	   
                  )OK	CANCELLEDUNKNOWNZINVALID_ARGUMENTZDEADLINE_EXCEEDED	NOT_FOUNDZALREADY_EXISTSZPERMISSION_DENIEDZUNAUTHENTICATEDZRESOURCE_EXHAUSTEDZFAILED_PRECONDITIONZABORTEDZOUT_OF_RANGEZUNIMPLEMENTEDZINTERNALUNAVAILABLEZ	DATA_LOSSZ
DO_NOT_USEzThread-OnRpcTerminatedDocTreeEntryvalueindexc                   @   sT   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )WatchDocTreec                 C   s   i | _ d| _d S )Nr   )_dict_indexself r(   `/var/www/html/python-backend/venv/lib/python3.9/site-packages/google/cloud/firestore_v1/watch.py__init__N   s    zWatchDocTree.__init__c                 C   s   t | j S N)listr$   keysr&   r(   r(   r)   r-   R   s    zWatchDocTree.keysc                 C   s"   t  }| j |_| j|_|} | S r+   )r#   r$   copyr%   )r'   Zwdtr(   r(   r)   _copyU   s
    zWatchDocTree._copyc                 C   s,   |   } t|| j| j|< |  jd7  _| S )Nr
   )r/   r    r%   r$   )r'   keyr!   r(   r(   r)   insert\   s    zWatchDocTree.insertc                 C   s
   | j | S r+   r$   r'   r0   r(   r(   r)   findb   s    zWatchDocTree.findc                 C   s   |   } | j|= | S r+   )r/   r$   r3   r(   r(   r)   removee   s    zWatchDocTree.removec                 c   s   | j D ]
}|V  qd S r+   r2   r'   kr(   r(   r)   __iter__j   s    
zWatchDocTree.__iter__c                 C   s
   t | jS r+   )lenr$   r&   r(   r(   r)   __len__n   s    zWatchDocTree.__len__c                 C   s
   || j v S r+   r2   r6   r(   r(   r)   __contains__q   s    zWatchDocTree.__contains__N)__name__
__module____qualname__r*   r-   r/   r1   r4   r5   r8   r:   r;   r(   r(   r(   r)   r#   J   s   r#   c                   @   s   e Zd ZdZdZdZdS )
ChangeTyper
   r   r   N)r<   r=   r>   ADDEDREMOVEDMODIFIEDr(   r(   r(   r)   r?   u   s   r?   c                   @   s   e Zd Zdd ZdS )DocumentChangec                 C   s   || _ || _|| _|| _dS )zDocumentChange

        Args:
            type (ChangeType):
            document (document.DocumentSnapshot):
            old_index (int):
            new_index (int):
        N)typedocument	old_index	new_index)r'   rD   rE   rF   rG   r(   r(   r)   r*   |   s    
zDocumentChange.__init__Nr<   r=   r>   r*   r(   r(   r(   r)   rC   {   s   rC   c                   @   s   e Zd Zdd ZdS )WatchResultc                 C   s   || _ || _|| _d S r+   )snapshotnamechange_type)r'   rJ   rK   rL   r(   r(   r)   r*      s    zWatchResult.__init__NrH   r(   r(   r(   r)   rI      s   rI   c                 C   s   t | tjrt| S | S )z(Wraps a gRPC exception class, if needed.)
isinstancegrpcZRpcErrorr   Zfrom_grpc_error)	exceptionr(   r(   r)   _maybe_wrap_exception   s    
rP   c                 C   s   | |ksJ ddS )Nz+Document watches only support one document.r   r(   )Zdoc1Zdoc2r(   r(   r)   document_watch_comparator   s    rQ   c                 C   s   t | }t|tS r+   )rP   rM   _RECOVERABLE_STREAM_EXCEPTIONSrO   wrappedr(   r(   r)   _should_recover   s    rU   c                 C   s   t | }t|tS r+   )rP   rM   _TERMINATING_STREAM_EXCEPTIONSrS   r(   r(   r)   _should_terminate   s    rW   c                
   @   s   e Zd Zdd Zdd Zedd Zedd Zd	d
 Zdd Z	e
dd Zd.ddZdd Zdd Zdd Zdd Zdd Zdd Zdd ZejeejeejeejeejeiZd d! Zd"d# Zd$d% Zed&d' Zd(d) Z d*d+ Z!d,d- Z"dS )/Watchc                 C   sz   || _ || _|| _|| _|| _|| _|j| _t	 | _
d| _| |j d| _t | _i | _i | _d| _d| _|   dS )a  
        Args:
            firestore:
            target:
            comparator:
            snapshot_callback: Callback method to process snapshots.
                Args:
                    docs (List(DocumentSnapshot)): A callback that returns the
                        ordered list of documents stored in this snapshot.
                    changes (List(str)): A callback that returns the list of
                        changed documents since the last snapshot delivered for
                        this watch.
                    read_time (string): The ISO 8601 time at which this
                        snapshot was obtained.

            document_snapshot_cls: factory for instances of DocumentSnapshot
        FN)Z_document_reference
_firestore_targets_comparator_document_snapshot_cls_snapshot_callbackZ_firestore_api_api	threadingLock_closing_closed_set_documents_pfx_database_stringresume_tokenr#   doc_treedoc_map
change_mapcurrent
has_pushed_init_stream)r'   Zdocument_referenceZ	firestoretargetZ
comparatorsnapshot_callbackdocument_snapshot_clsr(   r(   r)   r*      s"    
zWatch.__init__c                 C   sP   | j }t| jjjtt|| jjd| _	| j	
| j t| j	| j| _| j  d S )N)Z	start_rpcZshould_recoverZshould_terminateZinitial_requestmetadata)_get_rpc_requestr   r^   
_transportlistenrU   rW   rY   Z_rpc_metadata_rpcadd_done_callback_on_rpc_doner   on_snapshot	_consumerstart)r'   Zrpc_requestr(   r(   r)   rk      s    zWatch._init_streamc                 C   s"   | ||j d|jgitdt||S )a  
        Creates a watch snapshot listener for a document. snapshot_callback
        receives a DocumentChange object, but may also start to get
        targetChange and such soon

        Args:
            document_ref: Reference to Document
            snapshot_callback: callback to be called on snapshot
            document_snapshot_cls: class to make snapshots with
            reference_class_instance: class make references

        	documents)ry   	target_id)_client_document_pathWATCH_TARGET_IDrQ   )clsdocument_refrm   rn   r(   r(   r)   for_document   s    
zWatch.for_documentc                 C   s>   |j  \}}tj|| d}| ||j|jtd|j||S )N)parentZstructured_query)queryrz   )	_parentZ_parent_infor   ZQueryTargetZ_to_protobufr{   _pbr}   r[   )r~   r   rm   rn   parent_path_Zquery_targetr(   r(   r)   	for_query  s    
zWatch.for_queryc                 C   s8   | j d ur| j | jd< n| jdd  t| jj| jdS )Nre   )ZdatabaseZ
add_target)re   rZ   popr   rY   rd   r&   r(   r(   r)   rp   )  s    

zWatch._get_rpc_requestc                 C   s   | d| _ t| j | _d S )Nz/documents/)_documents_pfxr9   _documents_pfx_len)r'   Zdatabase_stringr(   r(   r)   rc   3  s    zWatch._set_documents_pfxc                 C   s   | j duo| j jS )zbool: True if this manager is actively streaming.

        Note that ``False`` does not indicate this is complete shut down,
        just that it stopped getting new messages.
        N)rw   	is_activer&   r(   r(   r)   r   7  s    zWatch.is_activeNc                 C   s   | j f | jrW d   dS | jr8td | j  d| _| j  d| _d| _td W d   n1 sr0    Y  |rtd|  t	|t
r|t|dS )a  Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        Args:
            reason (Any): The reason to close this. If None, this is considered
                an "intentional" shutdown.
        NzStopping consumer.TzFinished stopping manager.zreason for closing: %s)ra   rb   r   _LOGGERdebugrw   stoprs   closerM   	ExceptionRuntimeError)r'   reasonr(   r(   r)   r   @  s     	


(
zWatch.closec                 C   s:   t d t|}tjt| jd|id}d|_|  dS )a
  Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        z.RPC termination has signaled manager shutdown.r   )rK   rl   kwargsTN)	r   inforP   r_   Thread_RPC_ERROR_THREAD_NAMEr   daemonrx   )r'   futurethreadr(   r(   r)   ru   _  s    
zWatch._on_rpc_donec                 C   s   |    d S r+   )r   r&   r(   r(   r)   unsubscriber  s    zWatch.unsubscribec                 C   sF   t d |jd u p t|jdk}|rB|jrB| jrB| |j|j d S )Nz%on_snapshot: target change: NO_CHANGEr   )r   r   
target_idsr9   	read_timeri   pushre   )r'   target_changeZno_target_idsr(   r(   r)   $_on_snapshot_target_change_no_changeu  s
    
z*Watch._on_snapshot_target_change_no_changec                 C   s,   t d |jd }|tkr(td| d S )Nzon_snapshot: target change: ADDr   z&Unexpected target ID %s sent by server)r   r   r   r}   r   )r'   r   rz   r(   r(   r)   _on_snapshot_target_change_add  s    

z$Watch._on_snapshot_target_change_addc                 C   sD   t d |jjr$|jj}|jj}nd}d}d||f }t|d S )Nz"on_snapshot: target change: REMOVEr   zinternal errorzError %s:  %s)r   r   causecodemessager   )r'   r   r   r   error_messager(   r(   r)   !_on_snapshot_target_change_remove  s    

z'Watch._on_snapshot_target_change_removec                 C   s   t d |   d S )Nz!on_snapshot: target change: RESET)r   r   _reset_docsr'   r   r(   r(   r)    _on_snapshot_target_change_reset  s    
z&Watch._on_snapshot_target_change_resetc                 C   s   t d d| _d S )Nz#on_snapshot: target change: CURRENTT)r   r   ri   r   r(   r(   r)   "_on_snapshot_target_change_current  s    
z(Watch._on_snapshot_target_change_currentc                 C   s   | | jr|| jd  }|S r+   )
startswithr   r   )r'   document_namer(   r(   r)   _strip_document_pfx  s    zWatch._strip_document_pfxc              
   C   s`  |du r|    dS |j}|d}|dkr|jj}td|  | j|}|du rd| }t	d|  | j t
|d z|| |j W n6 ty } ztd|   W Y d}~n
d}~0 0 n|d	krtd
 t|jjv }t|jjv }	|jj}
|rbtd t|
j| j}| |
j}| j|}| j||dd|
j|
jd}|| j|
j< n|	r\td tj| j|
j< n|dkrtd |jj}tj| j|< n|dkrtd |jj}tj| j|< n|dkr8td |jj | ! kr\t	d t"j#t$| j d}|%  |&  | '  | (  n$td d| }| j t
|d dS )aS  Process a response from the bi-directional gRPC stream.

        Collect changes and push the changes in a batch to the customer
        when we receive 'current' from the listen response.

        Args:
            proto(`google.cloud.firestore_v1.types.ListenResponse`):
                Callback method that receives a object to
        NZresponse_typer   zon_snapshot: target change: zUnknown target change type: zon_snapshot: )r   zmeth(proto) exc: document_changezon_snapshot: document changez%on_snapshot: document change: CHANGEDT)	referencedataexistsr   create_timeupdate_timez%on_snapshot: document change: REMOVEDdocument_deletez$on_snapshot: document change: DELETEdocument_removez$on_snapshot: document change: REMOVEfilterzon_snapshot: filter updatez%Filter mismatch -- restarting stream.)rK   rl   zUNKNOWN TYPE. UHOHzUnknown listen response type: ))r   r   Z
WhichOneofr   target_change_typer   r   _target_changetype_dispatchgetr   
ValueErrorr   r}   r   r   Zremoved_target_idsrE   r	   Zdecode_dictfieldsrY   r   rK   r\   r   r   rh   r?   rA   r   r   r   count_current_sizer_   r   r   rx   joinr   rk   )r'   protoZpbwhichr   methr   Zexc2changedremovedrE   r   r   r   rJ   rK   r   r(   r(   r)   rv     s~    
















zWatch.on_snapshotc                 C   s   |  | j| j|\}}}| | j| j|||\}}}| jrBt|rrt| j	}	t
| |	d}
| |
|| d| _|| _|| _| j  || _dS )zInvoke the callback with a new snapshot

        Build the sntapshot from the current set of changes.

        Clear the current changes on completion.
        r0   TN)_extract_changesrg   rh   _compute_snapshotrf   rj   r9   	functools
cmp_to_keyr[   sortedr-   r]   clearre   )r'   r   Znext_resume_tokendeletesaddsupdatesupdated_treeupdated_mapappliedChangesr0   r-   r(   r(   r)   r     s    



z
Watch.pushc                 C   s   g }g }g }|  D ]`\}}|tjkr:|| v rt|| q|| v r\|d urP||_|| q|d urj||_|| q|||fS r+   )itemsr?   rA   appendr   )rg   changesr   r   r   r   rK   r!   r(   r(   r)   r   8  s    
zWatch._extract_changesc                    s   |}|}t |t |ks J ddd dd   fdd}g }	t| j}
t|}|D ] }|||\}}}|	| qZt||
d}td	 |D ]*}td
  |||\}}}|	| qt||
d}|D ](}||||\}}}|d ur|	| qt |t |ksJ d|||	fS )NzJThe document tree and document map should have the same number of entries.c                 S   sP   | |v sJ d| | }||}|j}||}|| = ttj||d||fS )z
            Applies a document delete to the document tree and document map.
            Returns the corresponding DocumentChange event.
            z!Document to delete does not existr   )r   r4   r"   r5   rC   r?   rA   )rK   r   r   old_documentexistingrF   r(   r(   r)   
delete_docX  s    


z+Watch._compute_snapshot.<locals>.delete_docc                 S   sN   | j j}||vsJ d|| d}|| j}| ||< ttj| d|||fS )z
            Applies a document add to the document tree and the document map.
            Returns the corresponding DocumentChange event.
            zDocument to add already existsNr   )r   r|   r1   r4   r"   rC   r?   r@   )new_documentr   r   rK   rG   r(   r(   r)   add_docj  s    z(Watch._compute_snapshot.<locals>.add_docc                    sv   | j j}||v sJ d||}|j| jkrl|||\}}} | ||\}}}ttj| |j|j||fS d||fS )z
            Applies a document modification to the document tree and the
            document map.
            Returns the DocumentChange event for successful modifications.
            z!Document to modify does not existN)	r   r|   r   r   rC   r?   rB   rF   rG   )r   r   r   rK   r   Zremove_changeZ
add_changer   r   r(   r)   
modify_docz  s(    


z+Watch._compute_snapshot.<locals>.modify_docr   zwalk over add_changeszin add_changeszQThe update document tree and document map should have the same number of entries.)r9   r   r   r[   r   r   r   r   )r'   rf   rg   Zdelete_changesZadd_changesZupdate_changesr   r   r   r   r0   rK   ZchangerJ   r(   r   r)   r   M  sF    !




zWatch._compute_snapshotc                 C   s2   |  | j| jd\}}}t| jt| t| S )zsReturn the current count of all documents.

        Count includes the changes from the current changeMap.
        N)r   rg   rh   r9   )r'   r   r   r   r(   r(   r)   r     s    zWatch._current_sizec                 C   sH   t d | j  d| _| j D ]}|jj}t	j
| j|< q$d| _dS )zG
        Helper to clear the docs on RESET or filter mismatch.
        zresetting documentsNF)r   r   rh   r   re   rf   r-   r   r|   r?   rA   ri   )r'   rJ   rK   r(   r(   r)   r     s    

zWatch._reset_docs)N)#r<   r=   r>   r*   rk   classmethodr   r   rp   rc   propertyr   r   ru   r   r   r   r   r   r   TargetChangeType	NO_CHANGEZADDZREMOVERESETZCURRENTr   r   rv   r   staticmethodr   r   r   r   r(   r(   r(   r)   rX      s@   >




o
qrX   )/collectionsenumr   r   loggingr_   Zgoogle.api_core.bidir   r   Zgoogle.api_corer   rN   Z)google.cloud.firestore_v1.types.firestorer   r   r   Zgoogle.cloud.firestore_v1r	   r   	getLoggerr<   r   r}   ZGRPC_STATUS_CODEr   ZAbortedZ	CancelledUnknownZDeadlineExceededZResourceExhaustedZInternalServerErrorZServiceUnavailableZUnauthenticatedrR   rV   
namedtupler    objectr#   r?   rC   rI   rP   rQ   rU   rW   rX   r(   r(   r(   r)   <module>   sn   

+