a
    !fP                     @   s|  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dl m!Z! d dl"m#Z$ d dl"mZ% d dl&m'Z( e$j)Z)ej*rBd dl+m,Z, d dlm-Z- d dl.m/Z/ d dl0m1Z2 e3e4Z5e%j67 Z8eej9ej:f Z;G dd de(j<Z=dS )    )absolute_importN)AnyDictOptionalSequenceTupleTypeUnion)gapic_v1)AnonymousCredentials)service_account)types)
exceptions)futures)thread)ordered_sequencer)unordered_sequencer)FlowController)gapic_version)client)	pubsub_v1)_batch)OptionalRetry)pubsubc                	       s  e Zd ZdZd4eejef eejef e	d fddZ
ed5eeejef e	d dddZeZeed	d
dZe fddZeeedddZeeddddZdd	 fddZdejjejjfeeeddeeef ddddZdd	ddZdd	d d!Zdd	d"d#Zdd	d$d%Zdd	d&d'Zdd	d(d)Z d6ed*edd+d,d-Z!e"dd.d/d0Z#d7eeedd1d2d3Z$  Z%S )8Clienta  A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings:
            The settings for batch publishing.
        publisher_options:
            The options for the publisher client. Note that enabling message ordering
            will override the publish retry timeout to be infinite.
        kwargs:
            Any additional arguments provided are sent as keyword arguments to the
            underlying
            :class:`~google.cloud.pubsub_v1.gapic.publisher_client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options`` that
            takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings = pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,   # One second
            ),

            # Optional
            publisher_options = pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options = {
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
     )batch_settingspublisher_optionskwargsc                    s   t |tju s"t|dks"J dt |tju sDt|dksDJ dtjdrndtjdi|d< t |d< tj| | _	| j	d | _
t jf i | | jj| _tj| _tj| | _| j | _i | _d| _d | _t| j	j| _d S )	Nr   zBbatch_settings must be of type BatchSettings or an empty sequence.zHpublisher_options must be of type PublisherOptions or an empty sequence.ZPUBSUB_EMULATOR_HOSTZapi_endpointZclient_optionscredentialsF)typer   BatchSettingslenPublisherOptionsosenvirongetr   r   _enable_message_orderingsuper__init__
_transportZ_host_targetr   ZBatch_batch_classr   Z	make_lock_batch_lock_sequencers_is_stopped_commit_threadr   Zflow_control_flow_controller)selfr   r   r   	__class__r   h/var/www/html/python-backend/venv/lib/python3.9/site-packages/google/cloud/pubsub_v1/publisher/client.pyr)   n   s6    



zClient.__init__)filenamer   r   returnc                 K   s$   t j|}||d< | |fi |S )a  Creates an instance of this client using the provided credentials
        file.

        Args:
            filename:
                The path to the service account private key JSON file.
            batch_settings:
                The settings for batch publishing.
            kwargs:
                Additional arguments to pass to the constructor.

        Returns:
            A Publisher instance that is the constructed client.
        r   )r   Credentialsfrom_service_account_file)clsr6   r   r   r   r   r   r5   r9      s    z Client.from_service_account_file)r7   c                 C   s   | j S )zeReturn the target (where the API is).

        Returns:
            The location of the API.
        )r+   r2   r   r   r5   target   s    zClient.targetc                    s   d}t j|td t S )a  The underlying gapic API client.

        .. versionchanged:: 2.10.0
            Instead of a GAPIC ``PublisherClient`` client instance, this property is a
            proxy object to it with the same interface.

        .. deprecated:: 2.10.0
            Use the GAPIC methods and properties on the client instance directly
            instead of through the :attr:`api` attribute.
        zThe "api" property only exists for backward compatibility, access its attributes directly thorugh the client instance (e.g. "client.foo" instead of "client.api.foo").)category)warningswarnDeprecationWarningr(   )r2   msgr3   r   r5   api   s    z
Client.api)topicordering_keyr7   c                 C   sN   ||f}| j |}|du rJ|dkr2t| |}nt| ||}|| j |< |S )zdGet an existing sequencer or create a new one given the (topic,
        ordering_key) pair.
        N )r.   r&   r   UnorderedSequencerr   OrderedSequencerr2   rC   rD   sequencer_key	sequencerr   r   r5   _get_or_create_sequencer   s    
zClient._get_or_create_sequencerNc                 C   sv   | j \ | jrtd| js$td||f}| j|}|du rLtd n|	  W d   n1 sh0    Y  dS )a*  Resume publish on an ordering key that has had unrecoverable errors.

        Args:
            topic: The topic to publish messages to.
            ordering_key: A string that identifies related messages for which
                publish order should be respected.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            ValueError:
                If the topic/ordering key combination has not been seen before
                by this client.
        z-Cannot resume publish on a stopped publisher.zICannot resume publish on a topic/ordering key if ordering is not enabled.NzCError: The topic/ordering key combination has not been seen before.)
r-   r/   RuntimeErrorr'   
ValueErrorr.   r&   _LOGGERdebugZunpauserH   r   r   r5   resume_publish   s    zClient.resume_publishzpubsub_types.PublishResponsec                    s   t  j|i |S )z#Call the GAPIC public API directly.)r(   publish)r2   argsr   r3   r   r5   _gapic_publish	  s    zClient._gapic_publishrE   r   ztypes.OptionalTimeoutz"pubsub_v1.publisher.futures.Future)rC   datarD   retrytimeoutattrsr7   c              
      s  t |tstdjs(|dkr(tdt| D ]6\}}t |trJq6t |trd|d||< q6tdq6t	|||d}	t
j|	 zj  W n< tjy }
 z"t }||
 |W  Y d}
~
S d}
~
0 0  fdd	}|tjju  rjj}|tjju rjj}j jr(td
jrz|tjju rbj}|j|j j }|!d}d}n|durz|!d}d}"||}|j ||d}|#| $  |W  d   S 1 s0    Y  dS )aN  Publish a single message.

        .. note::
            Messages in Pub/Sub are blobs of bytes. They are *binary* data,
            not text. You must send data as a bytestring
            (``bytes`` in Python 3; ``str`` in Python 2), and this library
            will raise an exception if you send a text string.

            The reason that this is so important (and why we do not try to
            coerce for you) is because Pub/Sub is also platform independent
            and there is no way to know how to decode messages properly on
            the other side; therefore, encoding and decoding is a required
            exercise for the developer.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.
        This method may block if LimitExceededBehavior.BLOCK is used in the
        flow control settings.

        Example:
            >>> from google.cloud import pubsub_v1
            >>> client = pubsub_v1.PublisherClient()
            >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
            >>> data = b'The rain in Wales falls mainly on the snails.'
            >>> response = client.publish(topic, data, username='guido')

        Args:
            topic: The topic to publish messages to.
            data: A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry:
                Designation of what errors, if any, should be retried. If `ordering_key`
                is specified, the total retry deadline will be changed to "infinity".
                If given, it overides any retry passed into the client through
                the ``publisher_options`` argument.
            timeout:
                The timeout for the RPC request. Can be used to override any timeout
                passed in through ``publisher_options`` when instantiating the client.

            attrs: A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class).

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        z=Data being published to Pub/Sub must be sent as a bytestring.rE   zSCannot publish a message with an ordering key when message ordering is not enabled.zutf-8zGAll attributes being published to Pub/Sub must be sent as text strings.)rT   rD   
attributesNc                    s   j   d S N)r1   release)futuremessager2   r   r5   on_publish_done|  s    z'Client.publish.<locals>.on_publish_donez&Cannot publish on a stopped publisher.g      A)rU   rV   )%
isinstancebytes	TypeErrorr'   rM   copyitemsstrdecode_raw_proto_pubbsub_messagegapic_typesPubsubMessagewrapr1   addr   ZFlowControlLimitErrorr   Futureset_exceptionr
   methodDEFAULTr   rU   rV   r-   r/   rL   r*   Z_wrapped_methodsrQ   Z_retryZwith_deadlinerK   add_done_callback!_ensure_commit_timer_runs_no_lock)r2   rC   rT   rD   rU   rV   rW   kvZ
vanilla_pbexcr[   r^   	transportZ
base_retryrJ   r   r\   r5   rQ     sb    G







zClient.publishc                 C   s2   | j  |   W d   n1 s$0    Y  dS )zEnsure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        N)r-   rp   r;   r   r   r5   $ensure_cleanup_and_commit_timer_runs  s    z+Client.ensure_cleanup_and_commit_timer_runsc                 C   s"   | j s| jjtdk r|   dS )zEnsure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method.
        infN)r0   r   max_latencyfloat_start_commit_threadr;   r   r   r5   rp     s    z(Client._ensure_commit_timer_runs_no_lockc                 C   s"   t jd| jdd| _| j  dS )z>Start a new thread to actually wait and commit the sequencers.zThread-PubSubBatchCommitterT)namer<   daemonN)	threadingThread_wait_and_commit_sequencersr0   startr;   r   r   r5   ry     s    zClient._start_commit_threadc                 C   sf   t | jj td | j4 | jr6W d   dS |   d| _	W d   n1 sX0    Y  dS )z;Wait up to the batching timeout, and commit all sequencers.zCommit thread is waking upN)
timesleepr   rw   rN   rO   r-   r/   _commit_sequencersr0   r;   r   r   r5   r~     s    
z"Client._wait_and_commit_sequencersc                 C   sB   dd | j  D }|D ]}| j |= q| j  D ]}|  q0dS )z1Clean up finished sequencers and commit the rest.c                 S   s   g | ]\}}|  r|qS r   )is_finished).0keyrJ   r   r   r5   
<listcomp>  s   z-Client._commit_sequencers.<locals>.<listcomp>N)r.   rc   valuescommit)r2   Zfinished_sequencer_keysrI   rJ   r   r   r5   r     s    
zClient._commit_sequencersc                 C   sV   | j < | jrtdd| _| j D ]}|  q&W d   n1 sH0    Y  dS )a  Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
        z(Cannot stop a publisher already stopped.TN)r-   r/   rL   r.   r   stop)r2   rJ   r   r   r5   r     s    zClient.stopz_batch.thread.Batch)rC   batchrD   r7   c                 C   s   |  ||}|| d S rY   )rK   
_set_batch)r2   rC   r   rD   rJ   r   r   r5   r     s    zClient._set_batch)batch_classr7   c                 C   s
   || _ d S rY   )r,   )r2   r   r   r   r5   _set_batch_class  s    zClient._set_batch_class)rC   rJ   rD   r7   c                 C   s   ||f}|| j |< d S rY   )r.   )r2   rC   rJ   rD   rI   r   r   r5   _set_sequencer  s    zClient._set_sequencer)r   r   )r   )rE   )rE   )&__name__
__module____qualname____doc__r	   r   r!   r   r#   r   r)   classmethodrd   r9   Zfrom_service_account_jsonpropertyr<   rB   SequencerTyperK   rP   rS   r
   rm   rn   r`   rQ   ru   rp   ry   r~   r   r   r   r   r   r   __classcell__r   r   r3   r5   r   <   sl   3  . $
 	  r   )>
__future__r   rb   loggingr$   r|   r   typingr   r   r   r   r   r   r	   r>   Zgoogle.api_corer
   Zgoogle.auth.credentialsr   Zgoogle.oauth2r   Zgoogle.cloud.pubsub_v1r   Z google.cloud.pubsub_v1.publisherr   r   Z'google.cloud.pubsub_v1.publisher._batchr   Z+google.cloud.pubsub_v1.publisher._sequencerr   r   Z0google.cloud.pubsub_v1.publisher.flow_controllerr   Zgoogle.pubsub_v1r   Zpackage_versionrg   Z#google.pubsub_v1.services.publisherr   Zpublisher_client__version__TYPE_CHECKINGZgoogle.cloudr   r   Z*google.pubsub_v1.services.publisher.clientr   Zgoogle.pubsub_v1.typesr   Zpubsub_types	getLoggerr   rN   rh   Zpbrf   rG   rF   r   ZPublisherClientr   r   r   r   r5   <module>   sB   $


