class AsyncBaseWorker:
Constructor: AsyncBaseWorker(ini, program, send_prefixes)
This generic base worker class is intended to be used as a building block, through inheritance, when you create a server worker.
Override any method whose behavior you need to change.
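As a rough illustration of the inheritance pattern described above, the sketch below subclasses a stand-in base class and extends start() and stop(). The stand-in AsyncBaseWorker only mirrors the documented constructor signature; its body, the FileWorker subclass, and the started/watching attributes are hypothetical and not taken from the real implementation.

```python
import asyncio


class AsyncBaseWorker:
    """Hypothetical stand-in; only the constructor signature comes from the docs."""

    def __init__(self, ini, program, send_prefixes):
        self.ini = ini
        self.program = program
        self.send_prefixes = send_prefixes
        self.started = False  # illustrative flag, not a documented attribute

    async def start(self):
        self.started = True

    async def stop(self):
        self.started = False


class FileWorker(AsyncBaseWorker):
    """Hypothetical worker that adds its own setup and teardown."""

    def __init__(self, ini, program, send_prefixes):
        super().__init__(ini, program, send_prefixes)
        self.watching = False

    async def start(self):
        await super().start()   # keep the base class behavior
        self.watching = True    # then add worker-specific setup

    async def stop(self):
        self.watching = False   # undo worker-specific setup first
        await super().stop()


async def demo():
    worker = FileWorker(ini={}, program="FileWorker",
                        send_prefixes=["Health", "Error"])
    await worker.start()
    running = (worker.started, worker.watching)
    await worker.stop()
    return running, (worker.started, worker.watching)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling super() in each override keeps the base class resource handling intact while the subclass adds only what differs.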
| Method | __init__ | The class constructor. |
| Async Method | notify | Send msgType messages to the broker. |
| Async Method | start | Start the used resources in a controlled way. |
| Async Method | stop | Stop the used resources in a controlled way. |
| Instance Variable | health | Keeps track of health status when a request arrives. |
| Instance Variable | ini | Ini file configuration parameters. |
| Instance Variable | mq | Handle the storing and retrieving of messages when the external communication goes up and down. |
| Instance Variable | mq | Handle messages being sent to or received from RabbitMQ. |
| Instance Variable | program | Current program name, used by logging and RabbitMQ. |
| Instance Variable | send | Current program's send message prefixes. |
| Instance Variable | state | Handles archiving and restoring state data that needs to survive a program restart. |
| Instance Variable | states | Contains state(s) that need archiving. |
| Instance Variable | work | Used for transferring a message between interested parties. |
| Async Method | _archive | Archive active state(s). |
| Async Method | _create | Create the program health response and send it. |
| Async Method | _handle | Handle send response, good or bad. |
| Async Method | _message | Broker messages between interested parties using a queue. |
| Async Method | _process | Process HealthRequest message. |
| Async Method | _process | Send pending offline messages and start subscription(s). |
| Async Method | _process | Process StatusResponse message. |
| Async Method | _report | Log error with context and failure data, then send it to RabbitMQ. |
| Async Method | _restore | Restore active state(s). |
| Async Method | _send | Send a msgType message to RabbitMQ. |
| Async Method | _send | Send pending offline messages to the RabbitMQ server. |
Archive active state(s).
When dump=True, the archive file is saved with a "dump_" filename prefix.
Note that the data types you can archive are currently limited to those that YAML can handle.
| Parameters | |
| dump: bool | Dump status (default: False). |
Broker messages between interested parties using a queue.
This is the minimum setup for the main message broker. If you need more routes, override this method in your own code, using its content as the starting point for your work.
- Handled received message types are:
  - Stop
  - LinkUp
  - HealthRequest
  - StatusResponse
- Handled sent message types are:
  - ErrorMessage
  - HealthResponse
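The routing described above can be pictured as a loop that reads messages from a queue and dispatches on msgType. The following is only a minimal sketch under stated assumptions: the function name message_broker, the handled list, and the "unknown:" marker are hypothetical, and the real method presumably calls dedicated handlers instead of recording names.

```python
import asyncio

# Received message types listed in the documentation above.
RECEIVED_TYPES = ("LinkUp", "HealthRequest", "StatusResponse")


async def message_broker(work: asyncio.Queue, handled: list) -> None:
    """Read messages from the work queue until a Stop message arrives."""
    while True:
        msg = await work.get()
        msg_type = msg.get("msgType")

        if msg_type == "Stop":
            handled.append(msg_type)
            break  # leave the loop so the worker can shut down
        elif msg_type in RECEIVED_TYPES:
            # A real worker would call the matching handler here.
            handled.append(msg_type)
        else:
            # Assumption: unknown types are only recorded, not raised.
            handled.append(f"unknown:{msg_type}")


async def demo() -> list:
    work = asyncio.Queue()
    for name in ("LinkUp", "HealthRequest", "Other", "Stop"):
        await work.put({"msgType": name})

    handled = []
    await message_broker(work, handled)
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Adding a route in an overriding method would then amount to adding another branch for the new msgType.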
Process StatusResponse message.
Example msg data:
{"msgType": "StatusResponse", "resources": {...}}
Restore active state(s).
- Currently handled active state types:
  - dict
  - list
Note: this method will restore the saved state to the class attribute that was specified when it was archived.
| Raises | |
| ValueError | When archived state name is not found. |
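To make the restore behavior concrete, here is a small sketch of writing archived data back to the attribute it was saved from and raising ValueError for an unknown name. The Worker class, the restore_active_state function, the layout of states as a list of attribute names, and the TypeError for unsupported types are all assumptions made for illustration; the source only documents the ValueError case.

```python
class Worker:
    """Hypothetical worker holding two archivable attributes."""

    def __init__(self):
        self.jobs = {}
        self.pending = []
        # Assumed layout: names of the attributes that take part in archiving.
        self.states = ["jobs", "pending"]


def restore_active_state(worker, archived: dict) -> None:
    """Copy each archived item back onto the attribute it was saved from."""
    for name, data in archived.items():
        if name not in worker.states:
            raise ValueError(f"Archived state name not found: {name}")
        if not isinstance(data, (dict, list)):
            # Assumption: other types are rejected; the source does not say how.
            raise TypeError(f"Unsupported state type for {name}: {type(data).__name__}")
        setattr(worker, name, data)


if __name__ == "__main__":
    worker = Worker()
    restore_active_state(worker, {"jobs": {"a": 1}, "pending": [1, 2]})
    print(worker.jobs, worker.pending)
```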
Send a msgType message to RabbitMQ.
The message topic is automatically created here with the help of the PREFIXES constant.
You only have to change msg_type when you are using more than the two standardized topic patterns.
An example of that might look like this:
    # Send message prefixes.
    PREFIXES = ['Health', 'Error', 'Email', 'Phoenix', 'File']

    if msg['msgType'] == 'PhoenixRegisterFile':
        msg_type = 'PhoenixRegister.File'
    elif msg['msgType'] in config.watchTopics:
        msg_type = f"{msg['msgType']}.{self.program}"
    else:
        msg_type = msg['msgType']
| Parameters | |
| msg: dict | A msgType message. |
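The msg_type selection from the example above can be tried in isolation as a plain function. This is a sketch only: resolve_msg_type and the watch_topics argument (standing in for config.watchTopics) are hypothetical names, and the step that builds the final topic from PREFIXES is not shown because the source does not describe it.

```python
def resolve_msg_type(msg: dict, program: str, watch_topics: set) -> str:
    """Pick the msg_type used for the topic, following the example's branches."""
    if msg["msgType"] == "PhoenixRegisterFile":
        # Special case: split the name into a dotted topic.
        return "PhoenixRegister.File"
    if msg["msgType"] in watch_topics:
        # Watched topics get the program name appended.
        return f"{msg['msgType']}.{program}"
    # Everything else keeps its msgType unchanged.
    return msg["msgType"]


if __name__ == "__main__":
    watched = {"FileDetected"}  # hypothetical watched topic
    for name in ("PhoenixRegisterFile", "FileDetected", "HealthResponse"):
        print(resolve_msg_type({"msgType": name}, "FileWorker", watched))
```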