class AsyncExampleWorker(AsyncBaseWorker):
Constructor: AsyncExampleWorker(ini, program)
This worker class demonstrates how to use resources like RabbitMQ and Watchdog asynchronously. It also shows how to use internal message brokers to control the work with JSON messages as events, and how to schedule recurring events.
- The following environment variable dependencies exist:
- ENVIRONMENT
- HOSTNAME (on Linux servers only - set by OS)
- COMPUTERNAME (on Windows servers only - set by OS)
- The following secret dependencies exist:
- mongo_pwd
- The following jobs are scheduled:
- _schedule_dump_check(): runs every five seconds.
- _schedule_state_pruning(): runs at midnight every day.
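The two timing patterns above (a fixed interval and a daily midnight run) can be sketched with the standard library alone. This is a minimal illustration, not the worker's actual scheduler: `seconds_until_midnight` and `run_every` are hypothetical helper names, and the `repeats` parameter exists only so that the loop terminates in an example.

```python
import asyncio
from datetime import datetime, timedelta


def seconds_until_midnight(now: datetime) -> float:
    """Return the number of seconds from `now` until the next midnight."""
    tomorrow = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0)
    return (tomorrow - now).total_seconds()


async def run_every(interval: float, job, repeats: int) -> None:
    """Await `job()` `repeats` times, sleeping `interval` seconds before each run."""
    for _ in range(repeats):
        await asyncio.sleep(interval)
        await job()
```

A five-second job would use `interval=5`; a midnight job would first sleep for `seconds_until_midnight(datetime.now())` and then repeat daily.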
RabbitMQ is used for receiving messages from and sending messages to external services. Messages destined for RabbitMQ are stored offline if the connection goes down and are re-sent when the connection is re-established.
- Subscribes temporarily to the following RabbitMQ message topic(s):
- Health.Request
- Subscribes permanently to the following RabbitMQ message topic(s):
- File.ReportRequest.<SERVER>
- Sends RabbitMQ messages with the following topic(s):
- File.Report.<server>
- File.Detected.<server>
- Error.Message.AsyncExampleProgram.<server>
- Health.Response.AsyncExampleProgram.<server>
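The store-and-resend behaviour for outgoing messages can be sketched roughly as follows. This is an assumption-laden illustration: `BufferedSender` and the `publish` coroutine are hypothetical names, the buffer lives in memory (the real worker stores messages offline), and a `ConnectionError` stands in for whatever signals a lost RabbitMQ connection.

```python
import asyncio
from collections import deque


class BufferedSender:
    """Queue messages while the link is down and flush them on reconnect."""

    def __init__(self, publish):
        self._publish = publish      # hypothetical coroutine: publish(topic, body)
        self._pending = deque()      # messages waiting for the link to return

    async def send(self, topic: str, body: dict) -> None:
        try:
            await self._publish(topic, body)
        except ConnectionError:
            self._pending.append((topic, body))

    async def flush(self) -> int:
        """Re-send pending messages in order; return how many were sent."""
        sent = 0
        while self._pending:
            topic, body = self._pending[0]
            try:
                await self._publish(topic, body)
            except ConnectionError:
                break                # still down; keep the remainder queued
            self._pending.popleft()
            sent += 1
        return sent
```

Calling `flush()` when the link comes back preserves the original send order, because a message is only removed from the queue after it has been published successfully.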
| Method | __init__ | The class constructor. |
| Async Method | start | Start the used resources in a controlled way. |
| Async Method | stop | Stop the used resources in a controlled way. |
| Instance Variable | detected | Keeps track of received files during the day. |
| Instance Variable | health | Keeps track of health status when a request arrives. |
| Instance Variable | scheduler | Handles dump checks and pruning of received_files state. |
| Instance Variable | searcher | Reports new files detected in specified directories. |
| Async Method | _message | Broker messages between interested parties using a queue. |
| Async Method | _process | Trigger a new workflow by sending a FileDetected message. |
| Async Method | _process | Process HealthRequest message. |
| Async Method | _process | Send pending offline messages and start subscription(s). |
| Async Method | _process | Update affected resources with the changed Ini parameters. |
| Async Method | _process | Process FileReportRequest message. |
| Async Method | _prune | Make a daily report of received_files that are older than today and remove the reported files from the state. |
| Async Method | _schedule | Handle DUMP request in a separate task, if needed. |
| Async Method | _schedule | Start received_files pruning in a separate task, if needed. |
The class constructor.
| Parameters | |
| ini:AsyncExampleProgIni | Ini file configuration parameters. |
| program:str | Program name, used by logging and RabbitMQ. |
Start the used resources in a controlled way.
The statement that appends the state(s) that need to survive a program restart must be placed first in the method (the self.states_to_archive class attribute is defined in the inherited base class).
It could look something like this:
self.states_to_archive.append(self.detected_files)
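In context, the placement rule might look like the sketch below. `BaseWorkerStandIn` is a hypothetical stand-in for the inherited base class, written only so that the example runs; what the real base class does with `states_to_archive`, and whether the worker delegates to `super().start()`, are assumptions here.

```python
import asyncio


class BaseWorkerStandIn:
    """Hypothetical stand-in for the inherited base class, for illustration only."""

    def __init__(self):
        self.states_to_archive = []   # states registered for archiving

    async def start(self):
        # Assumption: the base class acts on whatever is registered by now.
        self.started_with = list(self.states_to_archive)


class WorkerSketch(BaseWorkerStandIn):
    def __init__(self):
        super().__init__()
        self.detected_files = {}

    async def start(self):
        # Register the state first, before anything else in the method.
        self.states_to_archive.append(self.detected_files)
        await super().start()
```

If the append came after the call into the base class, the stand-in would start with an empty list, which illustrates why the ordering matters.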
Broker messages between interested parties using a queue.
- Handled received message types are:
- Stop
- LinkUp
- FileFound
- HealthRequest
- StatusResponse
- ChangedIniParams
- FileReportRequest
- Handled sent message types are:
- FileReport
- FileDetected
- ErrorMessage
- HealthResponse
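A queue-based broker of this kind can be sketched as a loop that dispatches on the message type. This is a minimal sketch under stated assumptions: the `msgType` key is taken from the example message data in this documentation, while the `broker` function, the `handlers` mapping, and the treatment of unknown types are hypothetical.

```python
import asyncio


async def broker(queue: asyncio.Queue, handlers: dict) -> list:
    """Dispatch queued messages by their "msgType" until a Stop message arrives."""
    unhandled = []
    while True:
        msg = await queue.get()
        msg_type = msg.get("msgType")
        if msg_type == "Stop":
            break
        handler = handlers.get(msg_type)
        if handler is None:
            unhandled.append(msg)     # unknown type: collect instead of failing
        else:
            await handler(msg)
    return unhandled
```

Each handler is a coroutine that takes the message dict, so adding a new message type only requires a new entry in `handlers`.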
Trigger a new workflow by sending a FileDetected message.
If a duplicate file is detected, it's considered an error, so it's logged and the file is moved to the error_path directory.
When it's a unique file, it's added to the detection state, a FileDetected message is sent, and the file is moved to the out_path directory.
Example msg data:
{"msgType": "FileFound",
"file": "D:\\prod\\kundin\\cust3\\DDDD.231008.txt"}
| Parameters | |
| msg:dict | A FileFound message. |
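The duplicate and unique branches described above can be sketched as a plain function. Several details are assumptions made for illustration: `handle_file_found` is a hypothetical name, duplicates are detected by file name, the detection state is a set, and the shape of the returned FileDetected message is invented. Sending the message is represented by the return value.

```python
import shutil
from pathlib import Path


def handle_file_found(msg: dict, detected: set, out_path: Path, error_path: Path):
    """Move the file and return a FileDetected message, or None for a duplicate."""
    source = Path(msg["file"])
    if source.name in detected:
        # Duplicate: the real worker also logs an error at this point.
        shutil.move(str(source), str(error_path / source.name))
        return None
    detected.add(source.name)
    target = out_path / source.name
    shutil.move(str(source), str(target))
    # Hypothetical message layout; only the "msgType" key follows the docs.
    return {"msgType": "FileDetected", "file": str(target)}
```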
Make a daily report of received_files that are older than today and remove the reported files from the state.
This method is normally called at midnight since the detected files state should only contain files for the current day.
When a program is started, we also have to make sure that all files older than today are reported and pruned from the current-day file state.
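The pruning step can be sketched as follows, assuming (hypothetically) that the state maps each file name to the date it was detected. The actual state layout and report format are not shown in this documentation, so `prune_older_than_today` and its return value are illustrative only.

```python
from datetime import date


def prune_older_than_today(received: dict, today: date) -> dict:
    """Remove entries dated before `today` from `received` and return them."""
    report = {name: day for name, day in received.items() if day < today}
    for name in report:
        del received[name]
    return report
```

Because the function only compares dates, the same call works both for the midnight run and at program start, when entries from earlier days may still be present in the restored state.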