class documentation

This generic base worker class is a building block: inherit from it when you create a server worker.

Override any method whose behaviour you need to change.
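
The inheritance pattern described above can be sketched as follows. This is a minimal illustration, not the real base class: BaseWorkerStandIn, EmailWorker, and the "Started" and "EmailReady" message types are hypothetical names, and the stand-in only records messages instead of sending them to a broker.

```python
import asyncio


class BaseWorkerStandIn:
    """Hypothetical stand-in for the documented base class (not the real one)."""

    def __init__(self, ini, program: str, send_prefixes: list):
        self.ini = ini
        self.program = program
        self.send_prefixes = send_prefixes
        self.sent = []  # Records messages instead of publishing them.

    async def notify(self, msg: dict):
        # The real method sends msgType messages to the broker.
        self.sent.append(msg)

    async def start(self):
        # "Started" is an invented message type, used only for this demo.
        await self.notify({"msgType": "Started", "program": self.program})


class EmailWorker(BaseWorkerStandIn):
    """Example worker that overrides only the behaviour it needs to change."""

    async def start(self):
        # Extend the inherited start-up instead of replacing it.
        await super().start()
        await self.notify({"msgType": "EmailReady"})


async def demo() -> list:
    worker = EmailWorker(ini=None, program="EmailWorker", send_prefixes=["Email"])
    await worker.start()
    return [msg["msgType"] for msg in worker.sent]


message_types = asyncio.run(demo())
```

Calling super().start() keeps the inherited start-up behaviour, so the subclass only adds what is specific to it.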

Method __init__ The class constructor.
Async Method notify Send msgType messages to the broker.
Async Method start Start the used resources in a controlled way.
Async Method stop Stop the used resources in a controlled way.
Instance Variable health_report Keeps track of health status when a request arrives.
Instance Variable ini Ini file configuration parameters.
Instance Variable mq_buffer Handle the storing and retrieving of messages when the external communication goes up and down.
Instance Variable mq_mgr Handle messages being sent to or received from RabbitMQ.
Instance Variable program Current program name, used by logging and RabbitMQ.
Instance Variable send_prefixes Current program's send message prefixes.
Instance Variable state_mgr Handles archiving and restoring state data that needs to survive a program restart.
Instance Variable states_to_archive Contains state(s) that need archiving.
Instance Variable work_queue Used for transferring a message between interested parties.
Async Method _archive_active_state Archive active state(s).
Async Method _create_health_response Create the program health response and send it.
Async Method _handle_send_response Handle send response, good or bad.
Async Method _message_broker Broker messages between interested parties using a queue.
Async Method _process_health_request Process HealthRequest message.
Async Method _process_linkup_message Send pending offline messages and start subscription(s).
Async Method _process_status_response Process StatusResponse message.
Async Method _report_error Log error with context and failure data, then send it to RabbitMQ.
Async Method _restore_active_state Restore active state(s).
Async Method _send_message Send a msgType message to RabbitMQ.
Async Method _send_offline_messages Send pending offline messages to the RabbitMQ server.
def __init__(self, ini: AsyncIniFileParser, program: str, send_prefixes: list): (source)

The class constructor.

Parameters
  • ini (AsyncIniFileParser): Ini file configuration parameters.
  • program (str): Program name, used by logging and RabbitMQ.
  • send_prefixes (list): Current program's send prefixes.
async def notify(self, msg: dict): (source)

Send msgType messages to the broker.

Parameters
  • msg (dict): A msgType message.
async def start(self): (source)

Start the used resources in a controlled way.

async def stop(self): (source)

Stop the used resources in a controlled way.

health_report: dict = (source)

Keeps track of health status when a request arrives.

Ini file configuration parameters.

Handle the storing and retrieving of messages when the external communication goes up and down.

Handle messages being sent to or received from RabbitMQ.

Current program name, used by logging and RabbitMQ.

send_prefixes: list = (source)

Current program's send message prefixes.

Handles archiving and restoring state data that needs to survive a program restart.

states_to_archive: list = (source)

Contains state(s) that need archiving.

Used for transferring a message between interested parties.

async def _archive_active_state(self, dump: bool = False): (source)

Archive active state(s).

When dump=True, the archive file is saved with a "dump_" filename prefix.

Note that you can currently only archive data types that YAML handles.

Parameters
  • dump (bool): Dump status (default: False).
async def _create_health_response(self): (source)

Create the program health response and send it.

async def _handle_send_response(self, success: bool, msg: dict, topic: str): (source)

Handle send response, good or bad.

If the communication is down, the message is stored offline.

Parameters
  • success (bool): Message transmission status.
  • msg (dict): A msgType message.
  • topic (str): A subscription topic.
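
A rough sketch of the store-on-failure behaviour described above, under the assumption that success=False is how a communication outage is signalled. OfflineBufferSketch and its push method are hypothetical stand-ins for whatever mq_buffer really provides, and the topic strings are invented for the demo.

```python
import asyncio
from collections import deque


class OfflineBufferSketch:
    """Hypothetical stand-in for the role mq_buffer plays."""

    def __init__(self):
        self.pending = deque()

    async def push(self, msg: dict, topic: str) -> None:
        # Keep the topic with the message so it can be resent later.
        self.pending.append((topic, msg))


async def handle_send_response(buffer, success: bool, msg: dict, topic: str) -> None:
    # Assumption: a failed send means the link is down, so keep the message.
    if not success:
        await buffer.push(msg, topic)


async def demo() -> list:
    buffer = OfflineBufferSketch()
    # Topic names below are made up for illustration.
    await handle_send_response(buffer, True, {"msgType": "HealthResponse"}, "Health.Response")
    await handle_send_response(buffer, False, {"msgType": "ErrorMessage"}, "Error.Message")
    return list(buffer.pending)


stored = asyncio.run(demo())
```

Only the failed send ends up in the buffer; the successful one needs no further handling in this sketch.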
async def _message_broker(self): (source)

Broker messages between interested parties using a queue.

This is the minimum setup for the main message broker. If you need more routes, you have to override this method in your own code (and copy the content as a base for your work).

Handled received message types are:
  • Stop
  • LinkUp
  • HealthRequest
  • StatusResponse
Handled sent message types are:
  • ErrorMessage
  • HealthResponse
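
The routing of received message types listed above could look roughly like the loop below. This is a sketch under assumptions, not the library's actual implementation: BrokerSketch is a hypothetical class, and its handlers only record which message type they saw. The handler names and signatures follow the ones documented on this page.

```python
import asyncio


class BrokerSketch:
    """Hypothetical worker that models only the routing loop."""

    def __init__(self):
        self.work_queue = asyncio.Queue()
        self.handled = []  # Records handled message types for the demo.

    async def _process_linkup_message(self):
        self.handled.append("LinkUp")

    async def _process_health_request(self):
        self.handled.append("HealthRequest")

    async def _process_status_response(self, msg: dict):
        self.handled.append(msg["msgType"])

    async def _message_broker(self):
        while True:
            msg = await self.work_queue.get()
            msg_type = msg.get("msgType")

            if msg_type == "Stop":
                break  # Leave the loop; later messages stay queued.
            elif msg_type == "LinkUp":
                await self._process_linkup_message()
            elif msg_type == "HealthRequest":
                await self._process_health_request()
            elif msg_type == "StatusResponse":
                await self._process_status_response(msg)


async def demo() -> list:
    # Create the worker inside the running loop so the queue binds to it.
    worker = BrokerSketch()
    for msg_type in ("LinkUp", "HealthRequest", "StatusResponse", "Stop", "LinkUp"):
        await worker.work_queue.put({"msgType": msg_type})
    await worker._message_broker()
    return worker.handled


handled = asyncio.run(demo())
```

An overriding subclass would copy this loop and add one more branch per extra route, as the description of the method suggests.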
@abstractmethod
async def _process_health_request(self): (source)

Process HealthRequest message.

Abstract method that needs to be overridden.
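
A minimal sketch of what overriding this method involves, assuming the base class is built on abc.ABC (or ABCMeta) so that the decorator is enforced. WorkerSketch, MyWorker, and the "status" field are hypothetical; only the method name comes from this page.

```python
import asyncio
from abc import ABC, abstractmethod


class WorkerSketch(ABC):
    """Hypothetical stand-in; only the abstract hook is modelled."""

    @abstractmethod
    async def _process_health_request(self):
        """Process HealthRequest message."""


class MyWorker(WorkerSketch):
    def __init__(self):
        self.health_report = {}

    async def _process_health_request(self):
        # The report content here is invented for illustration.
        self.health_report = {"msgType": "HealthResponse", "status": "healthy"}


# A class with an unimplemented abstract method cannot be instantiated.
try:
    WorkerSketch()
    instantiated = True
except TypeError:
    instantiated = False

worker = MyWorker()
asyncio.run(worker._process_health_request())
```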

async def _process_linkup_message(self): (source)

Send pending offline messages and start subscription(s).

async def _process_status_response(self, msg: dict): (source)

Process StatusResponse message.

Example msg data:

{"msgType": "StatusResponse", "resources": {...}}
async def _report_error(self, error: Exception, state: str = 'DUMP', extra: Optional[str] = None): (source)

Log error with context and failure data, then send it to RabbitMQ.

Parameters
  • error (Exception): Current exception.
  • state (str): Error message state.
  • extra (Optional[str]): Additional error text.
async def _restore_active_state(self): (source)

Restore active state(s).

Currently handled active state types:
  • dict
  • list

Note: this method will restore the saved state to the class attribute that was specified when it was archived.

Raises
  • ValueError: When the archived state name is not found.
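
The restore-by-attribute-name behaviour and the ValueError case might be sketched like this. Everything here is an assumption made for illustration: RestoreSketch, its attributes, and the shape of the archived data are hypothetical, the method is synchronous for brevity (the documented one is async), and the sketch does not model how the dict and list types are treated differently.

```python
class RestoreSketch:
    """Hypothetical worker holding two archivable states."""

    def __init__(self):
        self.pending_jobs = []  # list state
        self.counters = {}      # dict state

    def restore(self, archived: dict) -> None:
        # archived maps an attribute name to its saved dict or list value.
        for name, value in archived.items():
            if not hasattr(self, name):
                raise ValueError(f"Archived state name not found: {name}")
            setattr(self, name, value)


worker = RestoreSketch()
worker.restore({"pending_jobs": ["a", "b"], "counters": {"sent": 3}})

# An unknown state name is rejected instead of creating a new attribute.
try:
    worker.restore({"missing_state": []})
    raised = False
except ValueError:
    raised = True
```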
async def _send_message(self, msg: dict): (source)

Send a msgType message to RabbitMQ.

The message topic is automatically created here with the help of the PREFIXES constant.

You only have to change msg_type when you are using more than the two standardized topic patterns.

An example of that might look like this:

# Send message prefixes.
PREFIXES = ['Health', 'Error', 'Email', 'Phoenix', 'File']

if msg['msgType'] == 'PhoenixRegisterFile':
    msg_type = 'PhoenixRegister.File'

elif msg['msgType'] in config.watchTopics:
    msg_type = f"{msg['msgType']}.{self.program}"

else:
    msg_type = msg['msgType']
Parameters
  • msg (dict): A msgType message.
async def _send_offline_messages(self): (source)

Send pending offline messages to the RabbitMQ server.