Source code for action_triggers.base.config

"""Base module for handling the configuration for a given Action Trigger.

Note: This is not used in the current implementation. It is a work in progress
and a placeholder for future work.
"""

import typing as _t
from abc import ABC, abstractmethod, abstractproperty

from django.conf import settings

from action_triggers.base.error import ErrorBase
from action_triggers.config_required_fields import RequiredFieldBase
from action_triggers.dynamic_loading import replace_dict_values_with_results
from action_triggers.enums import ActionTriggerType


[docs] class ConnectionBase(ABC): """Base class for establishing a connection. This class provides the capability to marry the configuration provided in the settings with the connection details and parameters provided by the user. However, this enforces a one-sided relationship where the user cannot overwrite the base configuration as defined in the settings as the base configuration always takes precedence. :param config: The configuration as defined in `settings.ACTION_TRIGGERS` for a given action trigger type. :param conn_details: Additional connection parameters to use for establishing the connection provided by the user. :param params: Additional parameters to use for the action trigger provided by the user. """ @abstractproperty def required_conn_detail_fields(self) -> _t.Sequence[RequiredFieldBase]: """The required connection detail fields that must be provided by the user. """ @abstractproperty def required_params_fields(self) -> _t.Sequence[RequiredFieldBase]: """The required parameters fields that must be provided by the user.""" @abstractproperty def error_class(self) -> _t.Type[ErrorBase]: """The error class to use for storing errors."""
[docs] @abstractmethod async def connect(self) -> None: """Establish a connection to the message broker."""
[docs] @abstractmethod async def close(self) -> None: """Close the connection to the message broker."""
def __init__(self, config: dict, conn_details: dict, params: dict): self.config = config self._user_conn_details = conn_details self._user_params = params self._conn_details: _t.Optional[dict] = None self._params: _t.Optional[dict] = None self._errors = self.error_class() self.conn: _t.Any = None async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() @property def conn_details(self) -> dict: """Lazy load the the merged connection details. When merging the connection details, the user provided base connection details take precedence over the base configuration. :return: The merged connection details. """ if self._conn_details is None: self._conn_details = { **self.config.get("conn_details", {}), **self._user_conn_details, } return self._conn_details @property def params(self) -> dict: """Lazy load the the merged parameters. When merging the parameters, the user provided base parameters take precedence over the base configuration. :return: The merged parameters. """ if self._params is None: self._params = { **self.config.get("params", {}), **self._user_params, } return self._params
[docs] class ActionTriggerActionBase(ABC): """Base class for triggering an action. This class should be subclassed to implement the specific action for an action trigger. :param key: The key for the action trigger (must exist in the `settings.ACTION_TRIGGERS[self.action_trigger_type]` dictionary). :param conn_details: The connection parameters to use for establishing the connection. :param params: Additional parameters to use for the action trigger. :param kwargs: Additional keyword arguments to pass to the subclass. """ @abstractproperty def action_trigger_type(self) -> ActionTriggerType: """The type of action trigger.""" @abstractproperty def conn_class(self) -> _t.Type[ConnectionBase]: """The connection class to use for establishing a connection to the service which the action will interact with. """ def __init__( self, key: str, conn_details: _t.Union[dict, None], params: _t.Union[dict, None], **kwargs, ): self.key = key self.config = _t.cast( dict, settings.ACTION_TRIGGERS[self.action_trigger_type.value] )[key] self.conn_details = self._build_and_merge_config( conn_details, self.config.get("conn_details"), ) self.params = self._build_and_merge_config( params, self.config.get("params"), ) self.kwargs = kwargs self._conn = self.conn_class( self.config, self.conn_details, self.params, ) def _build_and_merge_config( self, user_config: _t.Union[dict, None], base_config: _t.Union[dict, None], ) -> dict: """Builds and merges the user provided configuration with the base configuration. The building process involves computing certain values in the merged configuration. Note: The base configuration always takes precedence over the user provided configuration. :param user_config: The user provided configuration. :param base_config: The base configuration. :return: The merged configuration. """ return replace_dict_values_with_results( { **(user_config or {}), **(base_config or {}), } )
[docs] async def send_message(self, message: str) -> None: """Starts a connection with the message broker and sends a message. :param message: The message to send to the message broker. """ async with self._conn as conn: await self._send_message_impl(conn, message)
@abstractmethod async def _send_message_impl(self, conn: _t.Any, message: str) -> None: """Implementation of sending a message to the message broken given an established connection. :param conn: The established connection to the message broker. :param message: The message to send to the message broker. """