From 9f29dc28632bb4e8ab2eded5387d6c27429c2b34 Mon Sep 17 00:00:00 2001 From: Jonas Winkler Date: Sun, 1 Nov 2020 23:07:54 +0100 Subject: [PATCH] updated consumer: now using watchdog --- Pipfile | 19 +- Pipfile.lock | 162 ++++++++++-------- src/documents/consumer.py | 44 ----- .../management/commands/document_consumer.py | 133 ++++---------- 4 files changed, 132 insertions(+), 226 deletions(-) diff --git a/Pipfile b/Pipfile index f9c23beb9..e8f862578 100644 --- a/Pipfile +++ b/Pipfile @@ -4,29 +4,28 @@ verify_ssl = true name = "pypi" [packages] -django = "*" +django = "~=3.1" pillow = "*" -dateparser = "*" +dateparser = "~=0.7" django-cors-headers = "*" -djangorestframework = "*" -inotify-simple = "*" +djangorestframework = "~=3.12" python-gnupg = "*" python-dotenv = "*" filemagic = "*" -pyocr = "*" +pyocr = "~=0.7" langdetect = "*" pdftotext = "*" -django-filter = "*" +django-filter = "~=2.4" python-dateutil = "*" psycopg2-binary = "*" -scikit-learn="*" -whoosh="*" +scikit-learn="~=0.23" +whoosh="~=2.7" gunicorn = "*" whitenoise = "*" fuzzywuzzy = "*" python-Levenshtein = "*" - -django-extensions = "*" +django-extensions = "" +watchdog = "*" [dev-packages] coveralls = "*" diff --git a/Pipfile.lock b/Pipfile.lock index b66f886e7..8b3bf705a 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "48343a032c1becd5f1a3ae46c2ade70c14c251591c5f9cb49dd2cab26b0e0bea" + "sha256": "2c1558fe7df0aee1ee20b095c2102f802470bf4a4ae09a7749ac487f8bfab8b6" }, "pipfile-spec": 6, "requires": {}, @@ -52,6 +52,7 @@ "sha256:dc663652ac9460fd06580a973576820430c6d428720e874ae46b041fa63e0efa" ], "index": "pypi", + "markers": "python_version >= '3.5'", "version": "==3.0.9" }, "django-filter": { @@ -93,13 +94,6 @@ "index": "pypi", "version": "==20.0.4" }, - "inotify-simple": { - "hashes": [ - "sha256:8440ffe49c4ae81a8df57c1ae1eb4b6bfa7acb830099bfb3e305b383005cc128" - ], - "index": "pypi", - "version": "==1.3.5" - }, "joblib": { "hashes": [ "sha256:698c311779f347cf6b7e6b8a39bb682277b8ee4aba8cf9507bc0cf4cd4737b72", @@ -118,35 +112,49 @@ }, "numpy": { "hashes": [ - "sha256:04c7d4ebc5ff93d9822075ddb1751ff392a4375e5885299445fcebf877f179d5", - "sha256:0bfd85053d1e9f60234f28f63d4a5147ada7f432943c113a11afcf3e65d9d4c8", - "sha256:0c66da1d202c52051625e55a249da35b31f65a81cb56e4c69af0dfb8fb0125bf", - "sha256:0d310730e1e793527065ad7dde736197b705d0e4c9999775f212b03c44a8484c", - "sha256:1669ec8e42f169ff715a904c9b2105b6640f3f2a4c4c2cb4920ae8b2785dac65", - "sha256:2117536e968abb7357d34d754e3733b0d7113d4c9f1d921f21a3d96dec5ff716", - "sha256:3733640466733441295b0d6d3dcbf8e1ffa7e897d4d82903169529fd3386919a", - "sha256:4339741994c775396e1a274dba3609c69ab0f16056c1077f18979bec2a2c2e6e", - "sha256:51ee93e1fac3fe08ef54ff1c7f329db64d8a9c5557e6c8e908be9497ac76374b", - "sha256:54045b198aebf41bf6bf4088012777c1d11703bf74461d70cd350c0af2182e45", - "sha256:58d66a6b3b55178a1f8a5fe98df26ace76260a70de694d99577ddeab7eaa9a9d", - "sha256:59f3d687faea7a4f7f93bd9665e5b102f32f3fa28514f15b126f099b7997203d", - "sha256:62139af94728d22350a571b7c82795b9d59be77fc162414ada6c8b6a10ef5d02", - "sha256:7118f0a9f2f617f921ec7d278d981244ba83c85eea197be7c5a4f84af80a9c3c", - "sha256:7c6646314291d8f5ea900a7ea9c4261f834b5b62159ba2abe3836f4fa6705526", - "sha256:967c92435f0b3ba37a4257c48b8715b76741410467e2bdb1097e8391fccfae15", - "sha256:9a3001248b9231ed73894c773142658bab914645261275f675d86c290c37f66d", - "sha256:aba1d5daf1144b956bc87ffb87966791f5e9f3e1f6fab3d7f581db1f5b598f7a", - "sha256:addaa551b298052c16885fc70408d3848d4e2e7352de4e7a1e13e691abc734c1", - "sha256:b594f76771bc7fc8a044c5ba303427ee67c17a09b36e1fa32bde82f5c419d17a", - "sha256:c35a01777f81e7333bcf276b605f39c872e28295441c265cd0c860f4b40148c1", - "sha256:cebd4f4e64cfe87f2039e4725781f6326a61f095bc77b3716502bed812b385a9", - "sha256:d526fa58ae4aead839161535d59ea9565863bb0b0bdb3cc63214613fb16aced4", - "sha256:d7ac33585e1f09e7345aa902c281bd777fdb792432d27fca857f39b70e5dd31c", - "sha256:e6ddbdc5113628f15de7e4911c02aed74a4ccff531842c583e5032f6e5a179bd", - "sha256:eb25c381d168daf351147713f49c626030dcff7a393d5caa62515d415a6071d8" + "sha256:0ee77786eebbfa37f2141fd106b549d37c89207a0d01d8852fde1c82e9bfc0e7", + "sha256:199bebc296bd8a5fc31c16f256ac873dd4d5b4928dfd50e6c4995570fc71a8f3", + "sha256:1a307bdd3dd444b1d0daa356b5f4c7de2e24d63bdc33ea13ff718b8ec4c6a268", + "sha256:1ea7e859f16e72ab81ef20aae69216cfea870676347510da9244805ff9670170", + "sha256:271139653e8b7a046d11a78c0d33bafbddd5c443a5b9119618d0652a4eb3a09f", + "sha256:35bf5316af8dc7c7db1ad45bec603e5fb28671beb98ebd1d65e8059efcfd3b72", + "sha256:463792a249a81b9eb2b63676347f996d3f0082c2666fd0604f4180d2e5445996", + "sha256:50d3513469acf5b2c0406e822d3f314d7ac5788c2b438c24e5dd54d5a81ef522", + "sha256:50f68ebc439821b826823a8da6caa79cd080dee2a6d5ab9f1163465a060495ed", + "sha256:51e8d2ae7c7e985c7bebf218e56f72fa93c900ad0c8a7d9fbbbf362f45710f69", + "sha256:522053b731e11329dd52d258ddf7de5288cae7418b55e4b7d32f0b7e31787e9d", + "sha256:5ea4401ada0d3988c263df85feb33818dc995abc85b8125f6ccb762009e7bc68", + "sha256:604d2e5a31482a3ad2c88206efd43d6fcf666ada1f3188fd779b4917e49b7a98", + "sha256:6ff88bcf1872b79002569c63fe26cd2cda614e573c553c4d5b814fb5eb3d2822", + "sha256:7197ee0a25629ed782c7bd01871ee40702ffeef35bc48004bc2fdcc71e29ba9d", + "sha256:741d95eb2b505bb7a99fbf4be05fa69f466e240c2b4f2d3ddead4f1b5f82a5a5", + "sha256:83af653bb92d1e248ccf5fdb05ccc934c14b936bcfe9b917dc180d3f00250ac6", + "sha256:8802d23e4895e0c65e418abe67cdf518aa5cbb976d97f42fd591f921d6dffad0", + "sha256:8edc4d687a74d0a5f8b9b26532e860f4f85f56c400b3a98899fc44acb5e27add", + "sha256:942d2cdcb362739908c26ce8dd88db6e139d3fa829dd7452dd9ff02cba6b58b2", + "sha256:9a0669787ba8c9d3bb5de5d9429208882fb47764aa79123af25c5edc4f5966b9", + "sha256:9d08d84bb4128abb9fbd9f073e5c69f70e5dab991a9c42e5b4081ea5b01b5db0", + "sha256:9f7f56b5e85b08774939622b7d45a5d00ff511466522c44fc0756ac7692c00f2", + "sha256:a2daea1cba83210c620e359de2861316f49cc7aea8e9a6979d6cb2ddab6dda8c", + "sha256:b9074d062d30c2779d8af587924f178a539edde5285d961d2dfbecbac9c4c931", + "sha256:c4aa79993f5d856765819a3651117520e41ac3f89c3fc1cb6dee11aa562df6da", + "sha256:d78294f1c20f366cde8a75167f822538a7252b6e8b9d6dbfb3bdab34e7c1929e", + "sha256:dfdc8b53aa9838b9d44ed785431ca47aa3efaa51d0d5dd9c412ab5247151a7c4", + "sha256:dffed17848e8b968d8d3692604e61881aa6ef1f8074c99e81647ac84f6038535", + "sha256:e080087148fd70469aade2abfeadee194357defd759f9b59b349c6192aba994c", + "sha256:e983cbabe10a8989333684c98fdc5dd2f28b236216981e0c26ed359aaa676772", + "sha256:ea6171d2d8d648dee717457d0f75db49ad8c2f13100680e284d7becf3dc311a6", + "sha256:eefc13863bf01583a85e8c1121a901cc7cb8f059b960c4eba30901e2e6aba95f", + "sha256:efd656893171bbf1331beca4ec9f2e74358fc732a2084f664fd149cc4b3441d2" ], "markers": "python_version >= '3.6'", - "version": "==1.19.2" + "version": "==1.19.3" + }, + "pathtools": { + "hashes": [ + "sha256:7c35c5421a39bb82e58018febd90e3b6e5db34c5443aaaf742b3f33d4655f1c0" + ], + "version": "==0.1.2" }, "pdftotext": { "hashes": [ @@ -245,11 +253,11 @@ }, "python-dotenv": { "hashes": [ - "sha256:8c10c99a1b25d9a68058a1ad6f90381a62ba68230ca93966882a4dbc3bc9c33d", - "sha256:c10863aee750ad720f4f43436565e4c1698798d763b63234fb5021b6c616e423" + "sha256:0c8d1b80d1a1e91717ea7d526178e3882732420b03f08afea0406db6402e220e", + "sha256:587825ed60b1711daea4832cf37524dfd404325b7db5e25ebe88c495c9f807a0" ], "index": "pypi", - "version": "==0.14.0" + "version": "==0.15.0" }, "python-gnupg": { "hashes": [ @@ -275,35 +283,35 @@ }, "regex": { "hashes": [ - "sha256:0cb23ed0e327c18fb7eac61ebbb3180ebafed5b9b86ca2e15438201e5903b5dd", - "sha256:1a065e7a6a1b4aa851a0efa1a2579eabc765246b8b3a5fd74000aaa3134b8b4e", - "sha256:1a511470db3aa97432ac8c1bf014fcc6c9fbfd0f4b1313024d342549cf86bcd6", - "sha256:1c447b0d108cddc69036b1b3910fac159f2b51fdeec7f13872e059b7bc932be1", - "sha256:2278453c6a76280b38855a263198961938108ea2333ee145c5168c36b8e2b376", - "sha256:240509721a663836b611fa13ca1843079fc52d0b91ef3f92d9bba8da12e768a0", - "sha256:4e21340c07090ddc8c16deebfd82eb9c9e1ec5e62f57bb86194a2595fd7b46e0", - "sha256:570e916a44a361d4e85f355aacd90e9113319c78ce3c2d098d2ddf9631b34505", - "sha256:59d5c6302d22c16d59611a9fd53556554010db1d47e9df5df37be05007bebe75", - "sha256:6a46eba253cedcbe8a6469f881f014f0a98819d99d341461630885139850e281", - "sha256:6f567df0601e9c7434958143aebea47a9c4b45434ea0ae0286a4ec19e9877169", - "sha256:781906e45ef1d10a0ed9ec8ab83a09b5e0d742de70e627b20d61ccb1b1d3964d", - "sha256:8469377a437dbc31e480993399fd1fd15fe26f382dc04c51c9cb73e42965cc06", - "sha256:8cd0d587aaac74194ad3e68029124c06245acaeddaae14cb45844e5c9bebeea4", - "sha256:97a023f97cddf00831ba04886d1596ef10f59b93df7f855856f037190936e868", - "sha256:a973d5a7a324e2a5230ad7c43f5e1383cac51ef4903bf274936a5634b724b531", - "sha256:af360e62a9790e0a96bc9ac845d87bfa0e4ee0ee68547ae8b5a9c1030517dbef", - "sha256:b706c70070eea03411b1761fff3a2675da28d042a1ab7d0863b3efe1faa125c9", - "sha256:bfd7a9fddd11d116a58b62ee6c502fd24cfe22a4792261f258f886aa41c2a899", - "sha256:c30d8766a055c22e39dd7e1a4f98f6266169f2de05db737efe509c2fb9c8a3c8", - "sha256:c53dc8ee3bb7b7e28ee9feb996a0c999137be6c1d3b02cb6b3c4cba4f9e5ed09", - "sha256:c95d514093b80e5309bdca5dd99e51bcf82c44043b57c34594d9d7556bd04d05", - "sha256:d43cf21df524283daa80ecad551c306b7f52881c8d0fe4e3e76a96b626b6d8d8", - "sha256:d62205f00f461fe8b24ade07499454a3b7adf3def1225e258b994e2215fd15c5", - "sha256:e289a857dca3b35d3615c3a6a438622e20d1bf0abcb82c57d866c8d0be3f44c4", - "sha256:e5f6aa56dda92472e9d6f7b1e6331f4e2d51a67caafff4d4c5121cadac03941e", - "sha256:f4b1c65ee86bfbf7d0c3dfd90592a9e3d6e9ecd36c367c884094c050d4c35d04" - ], - "version": "==2020.10.23" + "sha256:03855ee22980c3e4863dc84c42d6d2901133362db5daf4c36b710dd895d78f0a", + "sha256:06b52815d4ad38d6524666e0d50fe9173533c9cc145a5779b89733284e6f688f", + "sha256:11116d424734fe356d8777f89d625f0df783251ada95d6261b4c36ad27a394bb", + "sha256:119e0355dbdd4cf593b17f2fc5dbd4aec2b8899d0057e4957ba92f941f704bf5", + "sha256:1ec66700a10e3c75f1f92cbde36cca0d3aaee4c73dfa26699495a3a30b09093c", + "sha256:2dc522e25e57e88b4980d2bdd334825dbf6fa55f28a922fc3bfa60cc09e5ef53", + "sha256:3a5f08039eee9ea195a89e180c5762bfb55258bfb9abb61a20d3abee3b37fd12", + "sha256:49461446b783945597c4076aea3f49aee4b4ce922bd241e4fcf62a3e7c61794c", + "sha256:4afa350f162551cf402bfa3cd8302165c8e03e689c897d185f16a167328cc6dd", + "sha256:4b5a9bcb56cc146c3932c648603b24514447eafa6ce9295234767bf92f69b504", + "sha256:625116aca6c4b57c56ea3d70369cacc4d62fead4930f8329d242e4fe7a58ce4b", + "sha256:654c1635f2313d0843028487db2191530bca45af61ca85d0b16555c399625b0e", + "sha256:8092a5a06ad9a7a247f2a76ace121183dc4e1a84c259cf9c2ce3bbb69fac3582", + "sha256:832339223b9ce56b7b15168e691ae654d345ac1635eeb367ade9ecfe0e66bee0", + "sha256:8ca9dca965bd86ea3631b975d63b0693566d3cc347e55786d5514988b6f5b84c", + "sha256:a62162be05edf64f819925ea88d09d18b09bebf20971b363ce0c24e8b4aa14c0", + "sha256:b88fa3b8a3469f22b4f13d045d9bd3eda797aa4e406fde0a2644bc92bbdd4bdd", + "sha256:c13d311a4c4a8d671f5860317eb5f09591fbe8259676b86a85769423b544451e", + "sha256:c2c6c56ee97485a127555c9595c069201b5161de9d05495fbe2132b5ac104786", + "sha256:c3466a84fce42c2016113101018a9981804097bacbab029c2d5b4fcb224b89de", + "sha256:c8a2b7ccff330ae4c460aff36626f911f918555660cc28163417cb84ffb25789", + "sha256:cb905f3d2e290a8b8f1579d3984f2cfa7c3a29cc7cba608540ceeed18513f520", + "sha256:cfcf28ed4ce9ced47b9b9670a4f0d3d3c0e4d4779ad4dadb1ad468b097f808aa", + "sha256:dd3e6547ecf842a29cf25123fbf8d2461c53c8d37aa20d87ecee130c89b7079b", + "sha256:ea37320877d56a7f0a1e6a625d892cf963aa7f570013499f5b8d5ab8402b5625", + "sha256:f1fce1e4929157b2afeb4bb7069204d4370bab9f4fc03ca1fbec8bd601f8c87d", + "sha256:f43109822df2d3faac7aad79613f5f02e4eab0fc8ad7932d2e70e2a83bd49c26" + ], + "version": "==2020.10.28" }, "scikit-learn": { "hashes": [ @@ -383,6 +391,13 @@ ], "version": "==2.1" }, + "watchdog": { + "hashes": [ + "sha256:4214e1379d128b0588021880ccaf40317ee156d4603ac388b9adcf29165e0c04" + ], + "index": "pypi", + "version": "==0.10.3" + }, "whitenoise": { "hashes": [ "sha256:05ce0be39ad85740a78750c86a93485c40f08ad8c62a6006de0233765996e5c7", @@ -674,11 +689,11 @@ }, "pytest": { "hashes": [ - "sha256:7a8190790c17d79a11f847fba0b004ee9a8122582ebff4729a082c109e81a4c9", - "sha256:8f593023c1a0f916110285b6efd7f99db07d59546e3d8c36fc60e2ab05d3be92" + "sha256:4288fed0d9153d9646bfcdf0c0428197dba1ecb27a33bb6e031d002fa88653fe", + "sha256:c0a7e94a8cdbc5422a51ccdad8e6f1024795939cc89159a0ae7f0b316ad3823e" ], "index": "pypi", - "version": "==6.1.1" + "version": "==6.1.2" }, "pytest-cov": { "hashes": [ @@ -835,10 +850,11 @@ }, "toml": { "hashes": [ - "sha256:926b612be1e5ce0634a2ca03470f95169cf16f939018233a670519cb4ac58b0f", - "sha256:bda89d5935c2eac546d648028b9901107a595863cb36bae0c73ac804a9b4ce88" + "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", + "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f" ], - "version": "==0.10.1" + "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==0.10.2" }, "tox": { "hashes": [ diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 401ef0ff0..1229af680 100755 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -4,10 +4,8 @@ import logging import os import re -import time import uuid -from operator import itemgetter from django.conf import settings from django.utils import timezone from paperless.db import GnuPG @@ -36,17 +34,12 @@ class Consumer: 5. Delete the document and image(s) """ - # Files are considered ready for consumption if they have been unmodified - # for this duration - FILES_MIN_UNMODIFIED_DURATION = 0.5 - def __init__(self, consume=settings.CONSUMPTION_DIR, scratch=settings.SCRATCH_DIR): self.logger = logging.getLogger(__name__) self.logging_group = None - self._ignore = [] self.consume = consume self.scratch = scratch @@ -83,43 +76,6 @@ def log(self, level, message): "group": self.logging_group }) - def consume_new_files(self): - """ - Find non-ignored files in consumption dir and consume them if they have - been unmodified for FILES_MIN_UNMODIFIED_DURATION. - """ - ignored_files = [] - files = [] - for entry in os.scandir(self.consume): - if entry.is_file(): - file = (entry.path, entry.stat().st_mtime) - if file in self._ignore: - ignored_files.append(file) - else: - files.append(file) - else: - self.logger.warning( - "Skipping %s as it is not a file", - entry.path - ) - - if not files: - return - - # Set _ignore to only include files that still exist. - # This keeps it from growing indefinitely. - self._ignore[:] = ignored_files - - files_old_to_new = sorted(files, key=itemgetter(1)) - - time.sleep(self.FILES_MIN_UNMODIFIED_DURATION) - - for file, mtime in files_old_to_new: - if mtime == os.path.getmtime(file): - # File has not been modified and can be consumed - if not self.try_consume_file(file): - self._ignore.append((file, mtime)) - @transaction.atomic def try_consume_file(self, file): """ diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index 4a3d24bf5..93ad6947c 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -1,12 +1,13 @@ import logging import os -import time from django.conf import settings -from django.core.management.base import BaseCommand, CommandError +from django.core.management.base import BaseCommand -from ...consumer import Consumer, ConsumerError -from ...mail import MailFetcher, MailFetcherError +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + +from documents.consumer import Consumer try: from inotify_simple import INotify, flags @@ -14,6 +15,15 @@ INotify = flags = None +class Handler(FileSystemEventHandler): + + def __init__(self, consumer): + self.consumer = consumer + + def on_created(self, event): + self.consumer.try_consume_file(event.src_path) + + class Command(BaseCommand): """ On every iteration of an infinite loop, consume what we can from the @@ -29,6 +39,8 @@ def __init__(self, *args, **kwargs): self.mail_fetcher = None self.first_iteration = True + self.consumer = Consumer() + BaseCommand.__init__(self, *args, **kwargs) def add_arguments(self, parser): @@ -38,111 +50,34 @@ def add_arguments(self, parser): nargs="?", help="The consumption directory." ) - parser.add_argument( - "--loop-time", - default=settings.CONSUMER_LOOP_TIME, - type=int, - help="Wait time between each loop (in seconds)." - ) - parser.add_argument( - "--mail-delta", - default=10, - type=int, - help="Wait time between each mail fetch (in minutes)." - ) - parser.add_argument( - "--oneshot", - action="store_true", - help="Run only once." - ) - parser.add_argument( - "--no-inotify", - action="store_true", - help="Don't use inotify, even if it's available.", - default=False - ) def handle(self, *args, **options): self.verbosity = options["verbosity"] directory = options["directory"] - loop_time = options["loop_time"] - mail_delta = options["mail_delta"] * 60 - use_inotify = INotify is not None and options["no_inotify"] is False - - try: - self.file_consumer = Consumer(consume=directory) - self.mail_fetcher = MailFetcher(consume=directory) - except (ConsumerError, MailFetcherError) as e: - raise CommandError(e) for d in (settings.ORIGINALS_DIR, settings.THUMBNAIL_DIR): os.makedirs(d, exist_ok=True) logging.getLogger(__name__).info( - "Starting document consumer at {}{}".format( - directory, - " with inotify" if use_inotify else "" + "Starting document consumer at {}".format( + directory ) ) - if options["oneshot"]: - self.loop_step(mail_delta) - else: - try: - if use_inotify: - self.loop_inotify(mail_delta) - else: - self.loop(loop_time, mail_delta) - except KeyboardInterrupt: - print("Exiting") - - def loop(self, loop_time, mail_delta): - while True: - start_time = time.time() - if self.verbosity > 1: - print(".", int(start_time)) - self.loop_step(mail_delta, start_time) - # Sleep until the start of the next loop step - time.sleep(max(0, start_time + loop_time - time.time())) - - def loop_step(self, mail_delta, time_now=None): - - # Occasionally fetch mail and store it to be consumed on the next loop - # We fetch email when we first start up so that it is not necessary to - # wait for 10 minutes after making changes to the config file. - next_mail_time = self.mail_fetcher.last_checked + mail_delta - if self.first_iteration or time_now > next_mail_time: - self.first_iteration = False - self.mail_fetcher.pull() - - self.file_consumer.consume_new_files() - - def loop_inotify(self, mail_delta): - directory = self.file_consumer.consume - inotify = INotify() - inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO) - - # Run initial mail fetch and consume all currently existing documents - self.loop_step(mail_delta) - next_mail_time = self.mail_fetcher.last_checked + mail_delta - - while True: - # Consume documents until next_mail_time - while True: - delta = next_mail_time - time.time() - if delta > 0: - for event in inotify.read(timeout=delta): - file = os.path.join(directory, event.name) - if os.path.isfile(file): - self.file_consumer.try_consume_file(file) - else: - self.logger.warning( - "Skipping %s as it is not a file", - file - ) - else: - break - - self.mail_fetcher.pull() - next_mail_time = self.mail_fetcher.last_checked + mail_delta + # Consume all files as this is not done initially by the watchdog + for entry in os.scandir(directory): + if entry.is_file(): + self.consumer.try_consume_file(entry.path) + + # Start the watchdog. Woof! + observer = Observer() + event_handler = Handler(self.consumer) + observer.schedule(event_handler, directory, recursive=True) + observer.start() + try: + while observer.is_alive(): + observer.join(1) + except KeyboardInterrupt: + observer.stop() + observer.join()