class documentation

This class detects files in one or more specified directories.

Directories can be excluded from the search. File patterns to match can be supplied (the default is all files), as can file patterns to ignore. A recursive search can also be enabled.

A file is reported only once it is available, not when it is first detected, since it might still be in the middle of a copy or move.
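How the class decides that a file is available is not described here. As a rough illustration only, one common heuristic is to check that the file size has stopped changing and that the file can be opened. The function name `is_file_available` and the `settle_time` parameter are hypothetical and not part of this class.

```python
import os
import time


def is_file_available(path: str, settle_time: float = 0.1) -> bool:
    """Heuristic sketch (an assumption, not this class's actual check)."""
    try:
        size_before = os.path.getsize(path)
        time.sleep(settle_time)
        size_after = os.path.getsize(path)
        if size_before != size_after:
            return False  # size still changing, probably being copied
        # Opening for append writes nothing; it only proves the file can be
        # opened. It typically fails while another process holds a lock.
        with open(path, "ab"):
            pass
        return True
    except OSError:
        # Missing, locked, or otherwise inaccessible file.
        return False
```

A size check alone can be fooled by a stalled copy, so a real implementation may need retries or a longer settle time.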

Antivirus programs also scan new files, which sometimes causes a conflict: when the file is detected it is not yet available, so it is ignored. A related issue is that files may already exist in the monitored path(s) when the program starts, and they are not detected either. Both problems are handled when the find_undetected_files input parameter keeps its default value (True). When it is True, a check runs every two minutes that touches files that weren't detected earlier.
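The touch step can be pictured with a small sketch. It assumes that the set of already detected files is known and that touching a file (updating its modification time) is enough for a directory watcher to notice it again. The helper `touch_undetected` and its parameters are hypothetical names, not the class's internals.

```python
from pathlib import Path


def touch_undetected(root: Path, detected: set, recursive: bool = False) -> list:
    """Touch every file under root that is not in `detected` (illustrative only)."""
    candidates = root.rglob("*") if recursive else root.glob("*")
    touched = []
    for path in candidates:
        if path.is_file() and path not in detected:
            # Updating the modification time is what a watcher would
            # normally see as a change to the file.
            path.touch()
            touched.append(path)
    return sorted(touched)
```

In the class itself such a check would run periodically (every two minutes according to the text above), so that files missed on the first attempt get another chance to be reported.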

The workflow is as follows:
  1. Start the file detection by calling method start().
  2. Receive detection messages on supplied out_queue.
  3. Stop the detection by calling method stop().
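The three steps above can be sketched as follows. Because the class name and import path are not shown on this page, the sketch uses a hypothetical stand-in, `StubDetector`, that only mimics the start()/stop()/out_queue surface. It also assumes that out_queue is an asyncio.Queue and that messages arrive as dicts, which may differ from the real class.

```python
import asyncio


class StubDetector:
    """Hypothetical stand-in for the documented class (not the real one)."""

    def __init__(self, paths: list, out_queue: asyncio.Queue):
        self.paths = paths
        self.out_queue = out_queue

    async def start(self):
        # The real class would start watching directories; the stub simply
        # reports one made-up file so the workflow can be demonstrated.
        await self.out_queue.put({"msgType": "FileFound", "file": "example.txt"})

    async def stop(self):
        # Nothing to release in the stub.
        pass


async def main() -> list:
    out_queue = asyncio.Queue()
    detector = StubDetector(paths=["."], out_queue=out_queue)
    await detector.start()  # 1. start the file detection
    # 2. receive a detection message from the supplied out_queue
    found = [await asyncio.wait_for(out_queue.get(), timeout=1)]
    await detector.stop()  # 3. stop the detection
    return found
```

Running `asyncio.run(main())` returns the single message produced by the stub. With the real class, step 2 would normally be a loop that keeps reading from the queue until the application shuts down.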

Example data:

{"msgType": "FileFound",
 "file": "D:\prod\kundin\cust3\DDDD.231008.txt"}
Method __init__ The class constructor.
Async Method notify Send msgType messages to the broker.
Async Method start Start the used resources in a controlled way.
Async Method stop Stop the used resources in a controlled way.
Instance Variable file_mgr Put a JSON message on the report queue when a file is detected and available.
Instance Variable find_undetected_files Touch undetected files in search path(s) (default is True).
Instance Variable lock A locking mechanism that protects the suppressed cache object.
Instance Variable observer Observer thread that schedules watching directories and dispatches calls to event handlers.
Instance Variable out_queue File detection report queue.
Instance Variable recursive Search path(s) recursively (default is False).
Instance Variable root_paths List of path(s) to search.
Instance Variable suppress Suppress reporting of already detected files.
Instance Variable watchers Keep track of which path(s) are observed.
Instance Variable work_queue Internal message broker queue.
Async Method _handle_report_suppression Handle multiple detections of "small" files (only report once).
Async Method _handle_undetected_files Touch undetected files to make them detectable again.
Async Method _message_broker Broker messages between interested parties using a queue.
Async Method _process_file_found Process received FileFound message.
Async Method _process_new_search_paths Update the list of paths used in the search.
Async Method _process_status_request Process health status request.
def __init__(self, paths: list, out_queue: Queue, recursive: bool = False, find_undetected_files: bool = True, case_sensitive: bool = False, excluded_paths: Optional[list] = None, patterns: Optional[list] = None, ignored_patterns: Optional[list] = None):

The class constructor.

Parameters
  paths (list): List of path(s) to search.
  out_queue (Queue): Search response queue.
  recursive (bool): Search path(s) recursively (default is False).
  find_undetected_files (bool): Touch undetected files in search path(s) (default is True).
  case_sensitive (bool): Use case-sensitive file matching (default is False).
  excluded_paths (Optional[list]): List of path(s) to exclude from the search.
  patterns (Optional[list]): A list of file search patterns to use (['*'] is used implicitly if None is specified).
  ignored_patterns (Optional[list]): A list of file search patterns to ignore.
Raises
  RuntimeError: When an invalid path is specified.
  AssertionError: When the paths parameter is not a list.
  AssertionError: When the patterns parameter is neither None nor a list.
  AssertionError: When the excluded_paths parameter is neither None nor a list.
  AssertionError: When the ignored_patterns parameter is neither None nor a list.
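The interaction between patterns, ignored_patterns and case_sensitive can be illustrated with the standard library's fnmatch module. This is a sketch of the general technique, not the class's own matching code. The function name `matches` is hypothetical, and matching against bare file names (rather than full paths) is an assumption.

```python
from fnmatch import fnmatchcase
from typing import Optional


def matches(name: str,
            patterns: Optional[list] = None,
            ignored_patterns: Optional[list] = None,
            case_sensitive: bool = False) -> bool:
    """Return True if name matches a pattern and no ignored pattern."""
    patterns = patterns or ["*"]  # None means "all files"
    ignored_patterns = ignored_patterns or []
    if not case_sensitive:
        # Lower-case both sides so the comparison ignores case.
        name = name.lower()
        patterns = [p.lower() for p in patterns]
        ignored_patterns = [p.lower() for p in ignored_patterns]
    if any(fnmatchcase(name, p) for p in ignored_patterns):
        return False  # ignored patterns take precedence in this sketch
    return any(fnmatchcase(name, p) for p in patterns)
```

For example, `matches("report.TXT", patterns=["*.txt"])` is True with the default case-insensitive matching, and False when `case_sensitive=True` is passed.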
async def notify(self, msg: dict):

Send msgType messages to the broker.

Parameters
  msg (dict): A msgType message.
async def start(self):

Start the used resources in a controlled way.

async def stop(self):

Stop the used resources in a controlled way.

file_mgr

Put a JSON message on the report queue when a file is detected and available.

find_undetected_files: bool

Touch undetected files in search path(s) (default is True).

lock

A locking mechanism that protects the suppressed cache object.

observer: watchdog.observers.Observer

Observer thread that schedules watching directories and dispatches calls to event handlers.

out_queue

File detection report queue.

recursive: bool

Search path(s) recursively (default is False).

root_paths: set

List of path(s) to search.

suppress: dict

Suppress reporting of already detected files.

watchers: dict

Keep track of which path(s) are observed.

work_queue

Internal message broker queue.

async def _handle_report_suppression(self):

Handle multiple detections of "small" files (only report once).

Duplicate detections only occur for "small" files, mostly within a few milliseconds of each other, so a 2-second prune timeout should be sufficient.

This method is run by the scheduler every 10 seconds.
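The suppression idea can be sketched as a small timestamp cache: the first detection of a file is reported, repeats are dropped, and entries older than the prune timeout are removed so that a later, genuinely new detection is reported again. The class `SuppressionCache` and its method names are hypothetical. The lock that protects the real cache is omitted to keep the sketch short.

```python
import time
from typing import Optional


class SuppressionCache:
    """Illustrative duplicate-suppression cache (not the class's internals)."""

    def __init__(self, prune_timeout: float = 2.0):
        self.prune_timeout = prune_timeout
        self.seen = {}  # file path -> time of the first report

    def should_report(self, path: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        if path in self.seen:
            return False  # already reported recently, suppress the duplicate
        self.seen[path] = now
        return True

    def prune(self, now: Optional[float] = None) -> None:
        # Meant to be called periodically; drops entries older than the timeout.
        now = time.monotonic() if now is None else now
        self.seen = {p: t for p, t in self.seen.items()
                     if now - t < self.prune_timeout}
```

Passing `now` explicitly is only a convenience for testing. In normal use both methods fall back to the monotonic clock.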

async def _handle_undetected_files(self):

Touch undetected files to make them detectable again.

async def _message_broker(self):

Broker messages between interested parties using a queue.

Handled message types are:
  • Stop
  • FileFound
  • StatusRequest
  • UpdateSearchPaths
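A queue-based broker of this kind is often written as a loop that reads a message, looks at its msgType and dispatches to a handler. The sketch below shows that general pattern with asyncio.Queue. The function `message_broker`, the `handlers` mapping and the `demo` coroutine are hypothetical names, and the real method may treat each message type differently.

```python
import asyncio


async def message_broker(work_queue: asyncio.Queue, handlers: dict) -> None:
    """Dispatch queued messages by msgType until a Stop message arrives."""
    while True:
        msg = await work_queue.get()
        if msg["msgType"] == "Stop":
            break
        handler = handlers.get(msg["msgType"])
        if handler is not None:
            await handler(msg)  # unknown message types are silently skipped


async def demo() -> list:
    seen = []

    async def on_file_found(msg: dict) -> None:
        seen.append(msg["file"])

    work_queue = asyncio.Queue()
    broker = asyncio.create_task(
        message_broker(work_queue, {"FileFound": on_file_found}))
    # Sending a message amounts to putting a dict on the work queue.
    await work_queue.put({"msgType": "FileFound", "file": "a.txt"})
    await work_queue.put({"msgType": "Stop"})
    await broker
    return seen
```

Funnelling every message through a single queue means the handlers run one at a time, in arrival order.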
async def _process_file_found(self, msg: dict):

Process received FileFound message.

Parameters
  msg (dict): A FileFound msgType message.
async def _process_new_search_paths(self, msg: dict):

Update the list of paths used in the search.

Example data:

{'msgType': 'UpdateSearchPaths', 'data': [<root Paths>]}
Parameters
  msg (dict): An UpdateSearchPaths message.
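One way to apply such a message is to compare the requested paths with the current ones and work out which watchers to add and which to remove. The sketch assumes that the message replaces the whole path list, which this page does not confirm. The helper `diff_search_paths` is a hypothetical name.

```python
def diff_search_paths(current: set, msg: dict) -> tuple:
    """Return (paths to start watching, paths to stop watching)."""
    requested = set(msg["data"])
    to_add = requested - current     # new paths that need a watcher
    to_remove = current - requested  # paths no longer wanted
    return to_add, to_remove
```

For example, with current paths {"/in/a", "/in/b"} and a message whose data is ["/in/b", "/in/c"], the result is ({"/in/c"}, {"/in/a"}).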
async def _process_status_request(self):

Process health status request.