# Copyright 2017 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for applying Google Cloud Firestore changes in a transaction."""

from google.api_core import retry as retries

from google.cloud.firestore_v1 import types
from typing import Any, Coroutine, NoReturn, Optional, Union

_CANT_BEGIN: str
_CANT_COMMIT: str
_CANT_RETRY_READ_ONLY: str
_CANT_ROLLBACK: str
_EXCEED_ATTEMPTS_TEMPLATE: str
_INITIAL_SLEEP: float
_MAX_SLEEP: float
_MISSING_ID_TEMPLATE: str
_MULTIPLIER: float
_WRITE_READ_ONLY: str


MAX_ATTEMPTS = 5
"""int: Default number of transaction attempts (with retries)."""
_CANT_BEGIN: str = "The transaction has already begun. Current transaction ID: {!r}."
_MISSING_ID_TEMPLATE: str = "The transaction has no transaction ID, so it cannot be {}."
_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed")
_WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction."
_INITIAL_SLEEP: float = 1.0
"""float: Initial "max" for sleep interval. To be used in :func:`_sleep`."""
_MAX_SLEEP: float = 30.0
"""float: Eventual "max" sleep time. To be used in :func:`_sleep`."""
_MULTIPLIER: float = 2.0
"""float: Multiplier for exponential backoff. To be used in :func:`_sleep`."""
_EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts."
_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried."


class BaseTransaction(object):
    """Accumulate read-and-write operations to be sent in a transaction.

    Args:
        max_attempts (Optional[int]): The maximum number of attempts for
            the transaction (i.e. allowing retries). Defaults to
            :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
        read_only (Optional[bool]): Flag indicating if the transaction
            should be read-only or should allow writes. Defaults to
            :data:`False`.
    """

    def __init__(self, max_attempts=MAX_ATTEMPTS, read_only=False) -> None:
        self._max_attempts = max_attempts
        self._read_only = read_only
        self._id = None

    def _add_write_pbs(self, write_pbs) -> NoReturn:
        raise NotImplementedError

    def _options_protobuf(
        self, retry_id: Union[bytes, None]
    ) -> Optional[types.common.TransactionOptions]:
        """Convert the current object to protobuf.

        The ``retry_id`` value is used when retrying a transaction that
        failed (e.g. due to contention). It is intended to be the "first"
        transaction that failed (i.e. if multiple retries are needed).

        Args:
            retry_id (Union[bytes, NoneType]): Transaction ID of a transaction
                to be retried.

        Returns:
            Optional[google.cloud.firestore_v1.types.TransactionOptions]:
            The protobuf ``TransactionOptions`` if ``read_only==True`` or if
            there is a transaction ID to be retried, else :data:`None`.

        Raises:
            ValueError: If ``retry_id`` is not :data:`None` but the
                transaction is read-only.
        """
        if retry_id is not None:
            if self._read_only:
                raise ValueError(_CANT_RETRY_READ_ONLY)

            return types.TransactionOptions(
                read_write=types.TransactionOptions.ReadWrite(
                    retry_transaction=retry_id
                )
            )
        elif self._read_only:
            return types.TransactionOptions(
                read_only=types.TransactionOptions.ReadOnly()
            )
        else:
            return None

    @property
    def in_progress(self):
        """Determine if this transaction has already begun.

        Returns:
            bool: Indicates if the transaction has started.
        """
        return self._id is not None

    @property
    def id(self):
        """Get the current transaction ID.

        Returns:
            Optional[bytes]: The transaction ID (or :data:`None` if the
            current transaction is not in progress).
        """
        return self._id

    def _clean_up(self) -> None:
        """Clean up the instance after :meth:`_rollback`` or :meth:`_commit``.

        This intended to occur on success or failure of the associated RPCs.
        """
        self._write_pbs = []
        self._id = None

    def _begin(self, retry_id=None) -> NoReturn:
        raise NotImplementedError

    def _rollback(self) -> NoReturn:
        raise NotImplementedError

    def _commit(self) -> Union[list, Coroutine[Any, Any, list]]:
        raise NotImplementedError

    def get_all(
        self,
        references: list,
        retry: retries.Retry = None,
        timeout: float = None,
    ) -> NoReturn:
        raise NotImplementedError

    def get(
        self,
        ref_or_query,
        retry: retries.Retry = None,
        timeout: float = None,
    ) -> NoReturn:
        raise NotImplementedError


class _BaseTransactional(object):
    """Provide a callable object to use as a transactional decorater.

    This is surfaced via
    :func:`~google.cloud.firestore_v1.transaction.transactional`.

    Args:
        to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
            A callable that should be run (and retried) in a transaction.
    """

    def __init__(self, to_wrap) -> None:
        self.to_wrap = to_wrap
        self.current_id = None
        """Optional[bytes]: The current transaction ID."""
        self.retry_id = None
        """Optional[bytes]: The ID of the first attempted transaction."""

    def _reset(self) -> None:
        """Unset the transaction IDs."""
        self.current_id = None
        self.retry_id = None

    def _pre_commit(self, transaction, *args, **kwargs) -> NoReturn:
        raise NotImplementedError

    def _maybe_commit(self, transaction) -> NoReturn:
        raise NotImplementedError

    def __call__(self, transaction, *args, **kwargs):
        raise NotImplementedError
