Message Broker Modules

action_triggers.message_broker.broker.get_broker_class(broker_name: str) Type[ActionTriggerActionBase][source]

Get the broker class based on the broker name.

Parameters:

broker_name – The name of the broker.

Returns:

The broker class.

Raises:

ValueError – If the broker name is invalid.

Enums specific to Action Trigger message brokers.

class action_triggers.message_broker.enums.BrokerType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Represents the types of brokers supported by the application.

AWS_SNS = 'aws_sns'
AWS_SQS = 'aws_sqs'
GCP_PUBSUB = 'gcp_pubsub'
KAFKA = 'kafka'
RABBITMQ = 'rabbitmq'
REDIS = 'redis'

Contains the error class for generic message broker errors relating to the connection and parameters.

class action_triggers.message_broker.error.MessageBrokerError[source]

Bases: ErrorBase

A class for storing errors for a message broker.

add_connection_params_error(key: str, message: str) None
add_params_error(key: str, message: str) None
connection_params

Descriptor for storing error messages for a field.

error_class

alias of ConnectionValidationError

params

Descriptor for storing error messages for a field.

Module to support sending messages to Kafka.

class action_triggers.message_broker.kafka.KafkaBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]

Bases: ActionTriggerActionBase

Broker class for Kafka.

action_trigger_type = 'brokers'
conn_class

alias of KafkaConnection

class action_triggers.message_broker.kafka.KafkaConnection(*args, **kwargs)[source]

Bases: ConnectionCore

Connection class for Kafka.

async close() None[source]

Close the connection to the message broker.

async connect() None[source]

Establish a connection to the message broker.

error_class

alias of MessageBrokerError

required_conn_detail_fields: _t.Sequence[RequiredFieldBase] = (HasField('bootstrap_servers'),)
required_params_fields: _t.Sequence[RequiredFieldBase] = (HasField('topic'),)

Module to support sending messages to RabbitMQ.

class action_triggers.message_broker.rabbitmq.RabbitMQBroker(key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]

Bases: ActionTriggerActionBase

Broker class for RabbitMQ.

Parameters:
  • hey – The key for the broker (must existing in settings.ACTION_TRIGGERS[“brokers”]).

  • conn_params – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

action_trigger_type = 'brokers'
conn_class

alias of RabbitMQConnection

class action_triggers.message_broker.rabbitmq.RabbitMQConnection(*args, **kwargs)[source]

Bases: ConnectionCore

Connection class for RabbitMQ.

async close() None[source]

Close the connection to the message broker.

async connect() None[source]

Establish a connection to the message broker.

error_class

alias of MessageBrokerError

required_conn_detail_fields: _t.Sequence[RequiredFieldBase] = ()
required_params_fields: _t.Sequence[RequiredFieldBase] = (HasField('queue'),)
validate() None[source]

Validate the configuration.

Module to support sending messages to Redis.

class action_triggers.message_broker.redis.RedisBroker(key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: ActionTriggerActionBase

Broker class for Redis.

action_trigger_type = 'brokers'
conn_class

alias of RedisConnection

class action_triggers.message_broker.redis.RedisConnection(*args, **kwargs)[source]

Bases: ConnectionCore

Connection class for Redis.

async close() None[source]

Close the connection to the Redis server.

async connect() None[source]

Connect to the Redis server.

error_class

alias of MessageBrokerError

required_conn_detail_fields: _t.Sequence[RequiredFieldBase] = (HasAtLeastOneOffField('url, host'),)
required_params_fields: _t.Sequence[RequiredFieldBase] = (HasField('channel'),)

Module to support sending messages to AWS SQS.

class action_triggers.message_broker.aws_sqs.AwsSqsBroker(key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: ActionTriggerActionBase

Broker class for AWS SQS.

action_trigger_type = 'brokers'
conn_class

alias of AwsSqsConnection

class action_triggers.message_broker.aws_sqs.AwsSqsConnection(*args, **kwargs)[source]

Bases: ConnectionCore

Connection class for AWS SQS.

async close() None[source]

Close the connection to the AWS SQS service.

async connect() None[source]

Connect to the AWS SQS service.

error_class

alias of MessageBrokerError

get_queue_url() str[source]

Get the queue URL from the parameters or fetch it from AWS using the queue name.

Returns:

The queue URL.

required_conn_detail_fields: _t.Sequence[RequiredFieldBase] = (HasField('endpoint_url'),)
required_params_fields: _t.Sequence[RequiredFieldBase] = (HasAtLeastOneOffField('queue_url, queue_name'),)

Module to support sending messages to AWS SNS.

class action_triggers.message_broker.aws_sns.AwsSnsBroker(key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: ActionTriggerActionBase

Broker class for AWS SQS.

action_trigger_type = 'brokers'
conn_class

alias of AwsSnsConnection

class action_triggers.message_broker.aws_sns.AwsSnsConnection(*args, **kwargs)[source]

Bases: ConnectionCore

Connection class for AWS SNS.

async close() None[source]

Close the connection to the AWS SNS service.

async connect() None[source]

Connect to the AWS SQS service.

error_class

alias of MessageBrokerError

required_conn_detail_fields: _t.Sequence[RequiredFieldBase] = (HasField('endpoint_url'),)
required_params_fields: _t.Sequence[RequiredFieldBase] = (HasField('topic_arn'),)

Module to support sending messages to GCP Pub/Sub.

Developer Notes:

Make sure you use the Google Pub/Sub emulator for development and testing. The docs for the emulator can be found at https://cloud.google.com/pubsub/docs/emulator.

class action_triggers.message_broker.gcp_pubsub.GCPPubSubBroker(key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: ActionTriggerActionBase

Broker class for GCP Pub/Sub

action_trigger_type = 'brokers'
conn_class

alias of GCPPubSubConnection

class action_triggers.message_broker.gcp_pubsub.GCPPubSubConnection(*args, **kwargs)[source]

Bases: ConnectionCore

Connection class for GCP Pub/Sub

async close()[source]

Close the connection to the GCP Pub/Sub service

async connect()[source]

Connect to the GCP Pub/Sub service

error_class

alias of MessageBrokerError

required_conn_detail_fields: _t.Sequence[RequiredFieldBase] = (HasField('project'), HasField('topic'))
required_params_fields: _t.Sequence[RequiredFieldBase] = ()
async action_triggers.message_broker.queue.process_msg_broker_queue(msg_broker_queue: MessageBrokerQueue, payload: str | dict) None[source]

Process the action for the message broker queue.

Parameters:
  • msg_broker_queue – The message broker queue object to process.

  • payload – The payload to send to the message broker queue.

Returns:

None