U
    ~fhL                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZmZmZmZ ddlmZ dd	lmZ dd
lmZmZmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z) er"ddl*m+Z+m,Z,m-Z- ddl.m/Z/ ddl0m1Z1 dZ2dddddZ3dddddZ4G dd dZ5G d d! d!e5Z6G d"d# d#e5Z7G d$d% d%e5Z8e9 Z:ddd&d'd(Z;d)dd*d+d,Z<dd-d.d/Z=dd-d0d1Z>e?e> dS )2z9Class to monitor a MongoDB server on a background thread.    )annotationsN)TYPE_CHECKINGAnyMappingOptionalcast)common)MovingMinimum)NetworkTimeoutNotPrimaryErrorOperationFailure_OperationCancelled)Hello)_create_lock)_SDAM_LOGGER
_debug_log_SDAMStatusMessage)_is_faas)MovingAverage)ServerDescription)_SrvResolver)periodic_executor)_shutdown_executors)
ConnectionPool_CancellationContext)TopologySettings)TopologyT	ExceptionNone)errorreturnc                 C  s   d| _ d| _d| _dS )z'PYTHON-2433 Clear error traceback info.N)__traceback____context__	__cause__r     r&   ?/tmp/pip-unpacked-wheel-36gvocj8/pymongo/synchronous/monitor.py	_sanitize.   s    r(   float)startr!   c                 C  s   t dt |  S )zReturn the duration since the given start time.

    Accounts for buggy platforms where time.monotonic() is not monotonic.
    See PYTHON-4600.
    g        )maxtime	monotonic)r*   r&   r&   r'   _monotonic_duration5   s    r.   c                   @  sj   e Zd ZdddddddZdd	d
dZdd	ddZdd	ddZddddddZdd	ddZdS )MonitorBaser   strintr)   )topologynameintervalmin_intervalc                   sh   dd fdd}t j||||d}|| _dddd	 fd
d}t| |j t||| _t|  dS )zBase class to do periodic work on a background thread.

        The background thread is signaled to stop when the Topology or
        this instance is freed.
        boolr!   c                    s     } | d krdS |    dS )NFT)_run)monitorZself_refr&   r'   targetH   s
    z$MonitorBase.__init__.<locals>.target)r4   r5   r;   r3   NzOptional[Topology]r   )dummyr!   c                   s     }|r|   d S Ngc_safe_close)r<   r9   r:   r&   r'   _on_topology_gcU   s    z-MonitorBase.__init__.<locals>._on_topology_gc)N)	r   ZPeriodicExecutor	_executorweakrefrefcloseproxy	_topology	_register)selfr2   r3   r4   r5   r;   executorr@   r&   r:   r'   __init__?   s    	   zMonitorBase.__init__r   r7   c                 C  s   | j   dS )z[Start monitoring, or restart after a fork.

        Multiple calls have no effect.
        N)rA   openrH   r&   r&   r'   rK   a   s    zMonitorBase.openc                 C  s   | j   dS )zGC safe close.N)rA   rD   rL   r&   r&   r'   r?   h   s    zMonitorBase.gc_safe_closec                 C  s   |    dS )zWClose and stop monitoring.

        open() restarts the monitor after closing.
        Nr>   rL   r&   r&   r'   rD   l   s    zMonitorBase.closeNzOptional[int])timeoutr!   c                 C  s   | j | dS )zWait for the monitor to stop.N)rA   join)rH   rM   r&   r&   r'   rN   s   s    zMonitorBase.joinc                 C  s   | j   dS )z)If the monitor is sleeping, wake it soon.N)rA   ZwakerL   r&   r&   r'   request_checkw   s    zMonitorBase.request_check)N)	__name__
__module____qualname__rJ   rK   r?   rD   rN   rO   r&   r&   r&   r'   r/   >   s   "r/   c                      s   e Zd Zddddd fddZdd	d
dZdd	ddZdd	ddZdd	ddZdd	ddZdd	ddZ	dd	ddZ
dd	ddZdddddZ  ZS )Monitorr   r   r   r   )server_descriptionr2   pooltopology_settingsc                   s   t  |d|jtj || _|| _|| _| jjj	| _
| j
dk	oD| j
j| _d| _t||||j| _|jdkrvd| _n|jdkrd| _n
t  | _dS )a   Class to monitor a MongoDB server on a background thread.

        Pass an initial ServerDescription, a Topology, a Pool, and
        TopologySettings.

        The Topology is weakly referenced. The Pool must be exclusive to this
        Monitor.
        Zpymongo_server_monitor_threadNstreamTpollF)superrJ   heartbeat_frequencyr   MIN_HEARTBEAT_INTERVAL_server_description_pool	_settingsZ_pool_optionsZ_event_listeners
_listenersZenabled_for_server_heartbeat_publish_cancel_context_RttMonitorZ_create_pool_for_monitoraddress_rtt_monitorZserver_monitoring_mode_streamr   )rH   rT   r2   rU   rV   	__class__r&   r'   rJ   }   s,    


zMonitor.__init__r   r7   c                 C  s   | j }|r|  dS )zCancel any concurrent hello check.

        Note: this is called from a weakref.proxy callback and MUST NOT take
        any locks.
        N)ra   cancel)rH   contextr&   r&   r'   cancel_check   s    zMonitor.cancel_checkc                 C  s    | j   | jjr| j   dS )z1Start an _RttMonitor that periodically runs ping.N)rd   rK   rA   _stoppedrD   rL   r&   r&   r'   _start_rtt_monitor   s    
zMonitor._start_rtt_monitorc                 C  s    | j   | j  |   d S r=   )rA   rD   rd   r?   rj   rL   r&   r&   r'   r?      s    

zMonitor.gc_safe_closec                 C  s   |    | j  |   d S r=   )r?   rd   rD   _reset_connectionrL   r&   r&   r'   rD      s    
zMonitor.closec                 C  s   | j   d S r=   )r]   resetrL   r&   r&   r'   rm      s    zMonitor._reset_connectionc              
   C  s   z| j }z|  | _ W nT tk
rj } z6t| t| j j|d| _ |jrR| j  W Y W d S d }~X Y nX | j	j
| j | j jt| j jtd | jr| j jr| j jr|   | j  | j jr|jr| j  W n tk
r   |   Y nX d S )Nr%   )Z
reset_poolZinterrupt_connections)r\   _check_serverr   r(   r   rc   is_server_type_knownrA   Z
skip_sleeprF   Z	on_changer    
isinstancer
   re   topology_versionrl   ReferenceErrorrD   )rH   Zprev_sdexcr&   r&   r'   r8      s:     

zMonitor._runc           	      C  sf  t  }z`z|  W W S  ttfk
rd } z,ttttf |j	}| j
|d  W 5 d}~X Y nX W n tk
r~    Y n tk
r` } zt| | j}|j}t|}t| jo|jo|j}| jr| jdk	st| j|||| ttjr tt| j
j |d |d ||d |t!j"d | #  t$|t%r6 | j&'  t(||d W Y S d}~X Y nX dS )z^Call hello or read the next streaming response.

        Returns a ServerDescription.
        z$clusterTimeNr        )
topologyId
serverHost
serverPortawaited
durationMSZfailuremessager%   ))r,   r-   _check_oncer   r   r   r   r0   r   detailsrF   Zreceive_cluster_timegetrs   r   r(   r\   rc   r.   r6   re   rp   rr   r`   r_   AssertionErrorZpublish_server_heartbeat_failedr   isEnabledForloggingDEBUGr   _topology_idr   ZHEARTBEAT_FAILrm   rq   r   rd   rn   r   )	rH   r*   rt   r~   r    sdrc   durationrz   r&   r&   r'   ro      sF    

zMonitor._check_serverc           	      C  sv  | j j}| j }t| jjo(| jo(|jo(|j}| jrN| j	dk	s@t
| j	|| | jrd| jjrd|   | j  }ttjrtt| jj|j|j|d |d |tjd |j| _| |\}}|js| j| | j  \}}t!||||d}| jr| j	dk	st
| j	"||||j ttjr\tt| jj|j|j|d |d ||d |j#tj$d
 |W  5 Q R  S Q R X dS )zfA single attempt to call hello.

        Returns a ServerDescription, or raises an exception.
        Nr   ru   )rw   driverConnectionIdserverConnectionIdrx   ry   rz   r|   )Zmin_round_trip_timerv   )	rw   r   r   rx   ry   rz   r{   Zreplyr|   )%r\   rc   r6   r]   Zconnsre   rp   rr   r`   r_   r   Z publish_server_heartbeat_startedra   Z	cancelledrm   checkoutr   r   r   r   r   rF   r   idZserver_connection_idr   ZHEARTBEAT_STARTZcancel_context_check_with_socket	awaitablerd   
add_sampler   r   Z"publish_server_heartbeat_succeededdocumentZHEARTBEAT_SUCCESS)	rH   rc   r   rz   connresponseZround_trip_timeZavg_rttZmin_rttr&   r&   r'   r}     sd       zMonitor._check_oncer   ztuple[Hello, float])r   r!   c                 C  st   | j  }t }|jr*t| dd}n:| jrV|jrV| j	j
rV||| j	j
| jj}n||dd}t|}||fS )zcReturn (Hello, round_trip_time).

        Can raise ConnectionFailure or OperationFailure.
        T)r   N)rF   Zmax_cluster_timer,   r-   Zmore_to_comer   Z_next_replyre   Zperformed_handshaker\   rr   Z_hellor^   rZ   r.   )rH   r   Zcluster_timer*   r   r   r&   r&   r'   r   W  s$    
zMonitor._check_with_socket)rP   rQ   rR   rJ   rj   rl   r?   rD   rm   r8   ro   r}   r   __classcell__r&   r&   rf   r'   rS   |   s   '	'+;rS   c                      s@   e Zd Zddd fddZdddd	Zd
dddZ  ZS )
SrvMonitorr   r   )r2   rV   c                   sP   t  |dtj|j || _| jj| _t| jj	t
s8t| jj	| _t | _dS )zClass to poll SRV records on a background thread.

        Pass a Topology and a TopologySettings.

        The Topology is weakly referenced.
        Zpymongo_srv_polling_threadN)rY   rJ   r   MIN_SRV_RESCAN_INTERVALrZ   r^   Z_seeds	_seedlistrq   Zfqdnr0   r   _fqdnr,   r-   _startup_time)rH   r2   rV   rf   r&   r'   rJ   r  s    

zSrvMonitor.__init__r   r7   c                 C  s^   t  | jtj k rd S |  }|rZ|| _z| j| j W n t	k
rX   | 
  Y nX d S r=   )r,   r-   r   r   r   _get_seedlistr   rF   Zon_srv_updaters   rD   )rH   seedlistr&   r&   r'   r8     s    zSrvMonitor._runzOptional[list[tuple[str, Any]]]c                 C  st   z8t | j| jjj| jj}| \}}t|dkr6tW n tk
rV   | 	  Y dS X | j
t|tj |S dS )zXPoll SRV records for a seedlist.

        Returns a list of ServerDescriptions.
        r   N)r   r   r^   Zpool_optionsconnect_timeoutZsrv_service_nameZget_hosts_and_min_ttllenr   rO   rA   Zupdate_intervalr+   r   r   )rH   resolverr   Zttlr&   r&   r'   r     s    zSrvMonitor._get_seedlist)rP   rQ   rR   rJ   r8   r   r   r&   r&   rf   r'   r   q  s   r   c                      s|   e Zd Zdddd fddZddd	d
ZdddddZddddZddddZddddZddddZ	  Z
S )rb   r   r   r   )r2   rV   rU   c                   s8   t  |d|jtj || _t | _t | _	t
 | _dS )z\Maintain round trip times for a server.

        The Topology is weakly referenced.
        Zpymongo_server_rtt_threadN)rY   rJ   rZ   r   r[   r]   r   _moving_averager	   _moving_minr   _lock)rH   r2   rV   rU   rf   r&   r'   rJ     s    z_RttMonitor.__init__r   r7   c                 C  s   |    | j  d S r=   )r?   r]   rn   rL   r&   r&   r'   rD     s    z_RttMonitor.closer)   )sampler!   c              	   C  s.   | j  | j| | j| W 5 Q R X dS )zAdd a RTT sample.N)r   r   r   r   )rH   r   r&   r&   r'   r     s    z_RttMonitor.add_sampleztuple[Optional[float], float]c              
   C  s2   | j " | j | j fW  5 Q R  S Q R X dS )zBGet the calculated average, or None if no samples yet and the min.N)r   r   r   r   rL   r&   r&   r'   r     s    z_RttMonitor.getc              	   C  s*   | j  | j  | j  W 5 Q R X dS )zReset the average RTT.N)r   r   rn   r   rL   r&   r&   r'   rn     s    
z_RttMonitor.resetc                 C  sT   z|   }| | W n8 tk
r2   |   Y n tk
rN   | j  Y nX d S r=   )_pingr   rs   rD   r   r]   rn   )rH   Zrttr&   r&   r'   r8     s    z_RttMonitor._runc              
   C  sJ   | j  6}| jjrtdt }|  t|W  5 Q R  S Q R X dS )z)Run a "hello" command and return the RTT.z_RttMonitor closedN)	r]   r   rA   rk   r   r,   r-   Zhellor.   )rH   r   r*   r&   r&   r'   r     s    z_RttMonitor._ping)rP   rQ   rR   rJ   rD   r   r   rn   r8   r   r   r&   r&   rf   r'   rb     s   rb   )r9   r!   c                 C  s   t | t}t| d S r=   )rB   rC   _unregister	_MONITORSadd)r9   rC   r&   r&   r'   rG     s    rG   z"weakref.ReferenceType[MonitorBase])monitor_refr!   c                 C  s   t |  d S r=   )r   remove)r   r&   r&   r'   r     s    r   r7   c                  C  s8   t d krd S tt } | D ]}| }|r|  qd }d S r=   )r   listr?   )ZmonitorsrC   r9   r&   r&   r'   _shutdown_monitors  s    
r   c                  C  s    t } | r|   t} | r|   d S r=   )r   r   )shutdownr&   r&   r'   _shutdown_resources  s    r   )@__doc__
__future__r   atexitr   r,   rB   typingr   r   r   r   r   Zpymongor   Zpymongo._csotr	   Zpymongo.errorsr
   r   r   r   Zpymongo.hellor   Zpymongo.lockr   Zpymongo.loggerr   r   r   Zpymongo.pool_optionsr   Zpymongo.read_preferencesr   Zpymongo.server_descriptionr   Zpymongo.srv_resolverr   Zpymongo.synchronousr   Z%pymongo.synchronous.periodic_executorr   Zpymongo.synchronous.poolr   r   r   Zpymongo.synchronous.settingsr   Zpymongo.synchronous.topologyr   Z_IS_SYNCr(   r.   r/   rS   r   rb   setr   rG   r   r   r   registerr&   r&   r&   r'   <module>   sH   	> v<C
