class documentation

This class implements RabbitMQ Publish and Subscribe async handling.

The RabbitMQ queue mechanism is used so that we can take advantage of good horizontal message scaling when needed.

Note that you only need to specify the rabbit_url parameter when you are sending messages, params and resp_queue are used for subscriptions.

Functionality is implemented for:
  • Sending work queue messages (on default exchange).
  • Sending topic messages (on 'topic_routing' exchange).
  • Consuming work queue messages using a permanent queue (on default exchange).
  • Consuming topic messages using a temporary queue (on 'topic_routing' exchange).
  • Consuming topic messages using a permanent queue (on 'topic_routing' exchange).
Sends the following messages:
  • {"msgType": "LinkUp", "host": <RabbitMQ-server>}
  • {"msgType": "LinkDown", "host": "<RabbitMQ-server>", "why": "<exception text>"}
  • {"msgType": <any>, ...}
Method __init__ The class initializer.
Async Method publish_message Publish a message on specified work, or topic queue.
Async Method start Start the used resources in a controlled way.
Async Method start_permanent_work_subscription Start a work queue subscription that survives when the subscription ends.
Async Method start_topic_subscription Start topic queue subscription using subscription_type context.
Async Method status_of Send RabbitMQ connection status on the response queue.
Async Method stop Stop the used resources in a controlled way.
Instance Variable channel RabbitMQ's connection channel object instance.
Instance Variable conf RabbitParams object instance for RabbitMQ config parameters.
Instance Variable connection RabbitMQ's connection object instance.
Instance Variable host RabbitMQ servername.
Instance Variable link_down_reported LinkDown reported status.
Instance Variable rabbit_url RabbitMQ's connection URL.
Instance Variable resp_queue Destination queue for consumed messages.
Async Method _consume_incoming_message Consume an incoming subscription message from RabbitMQ.
Async Method _initiate_communication Establish communication with RabbitMQ (connection + channel).
Method _on_connection_closed Handle unexpectedly closed connection events.
Async Method _on_connection_reconnected Send a LinkUp message when the connection is reconnected.
def __init__(self, rabbit_url: str, params: Optional[RabbitParams] = None, resp_queue: Optional[asyncio.Queue] = None): (source)

The class initializer.

Parameters
rabbit_url:strRabbitMQ's connection URL.
params:Optional[RabbitParams]Configuration parameters.
resp_queue:Optional[asyncio.Queue]Destination queue for consumed messages.
async def publish_message(self, message: dict, topic: Optional[str] = None, work_queue_name: Optional[str] = None) -> bool: (source)

Publish a message on specified work, or topic queue.

Send a LinkDown message when communication fails.

Parameters
message:dictMessage to be published.
topic:Optional[str]Used for topic queue messages.
work_queue_name:Optional[str]Used for work queue messages.
Returns
boolPublisher delivery confirmation status.
Raises
AssertErrorWhen a topic and work_queue_name are both empty.
async def start(self): (source)

Start the used resources in a controlled way.

async def start_permanent_work_subscription(self): (source)

Start a work queue subscription that survives when the subscription ends.

Send a LinkDown message when communication fails.

Work queue name is defined in self.conf.program during the class initialization.

async def start_topic_subscription(self, routing_keys: list, permanent: bool = True): (source)

Start topic queue subscription using subscription_type context.

When permanent=True, a permanent subscription is activated, otherwise a temporary subscription is activated.

Send a LinkDown message when communication fails.

Parameters
routing_keys:listSubscription topic(s).
permanent:boolIndicate a wanted subscription type (default is True).
Raises
AssertErrorWhen routing_keys is not a list.
async def status_of(self): (source)

Send RabbitMQ connection status on the response queue.

async def stop(self): (source)

Stop the used resources in a controlled way.

channel: aio_pika.abc.AbstractChannel = (source)

RabbitMQ's connection channel object instance.

RabbitParams object instance for RabbitMQ config parameters.

connection: aio_pika.abc.AbstractRobustConnection = (source)

RabbitMQ's connection object instance.

RabbitMQ servername.

link_down_reported: bool = (source)

LinkDown reported status.

rabbit_url: str = (source)

RabbitMQ's connection URL.

Destination queue for consumed messages.

async def _consume_incoming_message(self, message: AbstractIncomingMessage): (source)

Consume an incoming subscription message from RabbitMQ.

Parameters
message:AbstractIncomingMessageA message that fits the subscription criteria.
async def _initiate_communication(self): (source)

Establish communication with RabbitMQ (connection + channel).

Send a LinkUp message when communication is established.

def _on_connection_closed(self, _: Any, error: AMQPConnectionError): (source)

Handle unexpectedly closed connection events.

Parameters
_:AnyUndocumented
error:AMQPConnectionErrorAMQP connection error instance.
async def _on_connection_reconnected(self, connection: AbstractRobustConnection): (source)

Send a LinkUp message when the connection is reconnected.

Parameters
connection:AbstractRobustConnectionRabbitMQ's robust connection instance.