class documentation

class AsyncExampleWorker(AsyncBaseWorker): (source)

Constructor: AsyncExampleWorker(ini, program)

View In Hierarchy

This worker class demonstrates how to use resources like RabbitMQ and Watchdog asynchronously. It also demonstrates the usage of internal message brokers to control the work by using JSON messages as events. It also demonstrates the usage of scheduling reoccurring events.

The following environment variable dependencies exist:
  • ENVIRONMENT
  • HOSTNAME (on Linux servers only - set by OS)
  • COMPUTERNAME (on Windows servers only - set by OS)
The following secret dependencies exist:
  • mongo_pwd
The following jobs are scheduled:
  • _schedule_dump_check(): runs every five seconds.
  • _schedule_state_pruning(): runs at midnight every day.

RabbitMQ is used for receiving and sending messages to external services. Messages that are to be sent to the RabbitMQ will be stored offline if the communication goes down and will be re-sent when the communication is re-established.

Subscribe temporary for the following RabbitMQ message topic(s):
  • Health.Request
Subscribe permanently for the following RabbitMQ message topic(s):
  • File.ReportRequest.<SERVER>
Sends RabbitMQ messages with the following topic(s):
  • File.Report.<server>
  • File.Detected.<server>
  • Error.Message.AsyncExampleProgram.<server>
  • Health.Response.AsyncExampleProgram.<server>
Method __init__ The class constructor.
Async Method start Start the used resources in a controlled way.
Async Method stop Stop the used resources in a controlled way.
Instance Variable detected_files Keeping track of received files during the day.
Instance Variable health_report Keeping track of health status when a request arrives.
Instance Variable scheduler Handles dump checks and pruning of received_files state.
Instance Variable searcher Reports new files detected in specified directories.
Async Method _message_broker Broker messages between interested parties using a queue.
Async Method _process_file_found Trigger a new workflow by sending a FileDetected message.
Async Method _process_health_request Process HealthRequest message..
Async Method _process_linkup_message Send pending offline messages and start subscription(s).
Async Method _process_new_params Update affected resources with the changed Ini parameters.
Async Method _process_report_request Process FileReportRequest message.
Async Method _prune_state_content Make a daily report of received_files that is older than today and remove the reported files from the state.
Async Method _schedule_dump_check Handle DUMP request in a separate task, if needed.
Async Method _schedule_state_pruning Start received_files pruning in a separate task, if needed.
def __init__(self, ini: AsyncExampleProgIni, program: str): (source)

The class constructor.

Parameters
ini:AsyncExampleProgIniIni file configuration parameters.
program:strProgram name, used by logging and RabbitMQ.
async def start(self): (source)

Start the used resources in a controlled way.

You need the statement, appending the state(s) that needs to survive a program restart to be placed first in the method (the self.states_to_archive class attribute is defined in the inherited base class).

It could look something like this:

self.states_to_archive.append(self.detected_files)
async def stop(self): (source)

Stop the used resources in a controlled way.

Note that this method can't be async

detected_files: dict = (source)

Keeping track of received files during the day.

health_report: dict = (source)

Keeping track of health status when a request arrives.

scheduler: apscheduler.schedulers.asyncio.AsyncIOScheduler = (source)

Handles dump checks and pruning of received_files state.

Reports new files detected in specified directories.

async def _message_broker(self): (source)

Broker messages between interested parties using a queue.

Handled received message types are:
  • Stop
  • LinkUp
  • FileFound
  • HealthRequest
  • StatusResponse
  • ChangedIniParams
  • FileReportRequest
Handled sent message types are:
  • FileReport
  • FileDetected
  • ErrorMessage
  • HealthResponse
async def _process_file_found(self, msg: dict): (source)

Trigger a new workflow by sending a FileDetected message.

If a duplicate file is detected, it's considered an error, so it's logged and the file is moved to the error_path directory.

When it's a unique file, it's added to the detection state, a FileDetected message is sent, and the file is moved to the out_path directory.

Example msg data:

{"msgType": "FileFound",
 "file": "D:\prod\kundin\cust3\DDDD.231008.txt"}
Parameters
msg:dictA FileFound message.
async def _process_health_request(self): (source)

Process HealthRequest message..

Example msg data:

{"msgType": "HealthRequest"}
async def _process_linkup_message(self): (source)

Send pending offline messages and start subscription(s).

async def _process_new_params(self): (source)

Update affected resources with the changed Ini parameters.

async def _process_report_request(self): (source)

Process FileReportRequest message.

Handled sent message types are:
  • FileReport
async def _prune_state_content(self): (source)

Make a daily report of received_files that is older than today and remove the reported files from the state.

This method is normally called at midnight since the detected files state should only contain files for the current day.

When a program is started, we also have to make sure that all files older than today are reported and pruned from the current-day file state.

async def _schedule_dump_check(self): (source)

Handle DUMP request in a separate task, if needed.

async def _schedule_state_pruning(self): (source)

Start received_files pruning in a separate task, if needed.