pfBlockerNG: use asynchronous I/O #1307

Closed · wants to merge 1 commit
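The change in outline: blocking log and SQLite writes are handed to a single background thread through concurrent.futures.ThreadPoolExecutor, so the Unbound callback path no longer waits on disk I/O. A minimal, self-contained sketch of that pattern follows (illustration only, not code from this diff; the file path is made up and log_entry here stands in for the module's own logging helper):

from concurrent.futures import ThreadPoolExecutor

# One worker: submitted tasks run one at a time, in submission order.
executor = ThreadPoolExecutor(max_workers=1)

def log_entry(line, logfile):
    # The blocking append that gets moved off the caller's thread.
    with open(logfile, 'a') as f:
        f.write(line + '\n')

def log_entry_async(line, logfile):
    # Fire-and-forget: submit() returns immediately, the worker does the write.
    executor.submit(log_entry, line, logfile)

log_entry_async('example,entry', '/tmp/example.log')
executor.shutdown()  # wait=True by default, so the queued write finishes first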
@@ -72,6 +72,14 @@
pfb['mod_sqlite3_e'] = e
pass

try:
from concurrent.futures import ThreadPoolExecutor
pfb['async_io'] = True
pfb['async_io_executor'] = ThreadPoolExecutor(max_workers=1)
except Exception as e:
pfb['async_io'] = False
pfb['async_io_executor_e'] = e


def init_standard(id, env):
global pfb, rcodeDB, dataDB, zoneDB, regexDB, hstsDB, whiteDB, excludeDB, excludeAAAADB, excludeSS, dnsblDB, noAAAADB, gpListDB, safeSearchDB, feedGroupIndexDB, maxmindReader
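Note on the hunk above: concurrent.futures ships with the Python 3 standard library, so the try/except mainly guards against a stripped-down embedded interpreter, and with max_workers=1 all submitted tasks run on one worker thread, so writes keep their submission order. A quick illustration of that ordering (not part of the diff):

from concurrent.futures import ThreadPoolExecutor
ex = ThreadPoolExecutor(max_workers=1)
for i in range(3):
    ex.submit(print, i)  # a single worker runs these in order: 0, 1, 2
ex.shutdown()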
@@ -100,12 +108,23 @@ class log_stderr(object):
def __init__(self, logger):
self.logger = logger
self.linebuf = ''
if pfb['async_io']:
self.executor = pfb['async_io_executor']
else:
self.executor = None

def write(self, msg):
def _write(self, msg):
if msg != pfb['p_err']:
self.logger.log(logging.ERROR, msg.rstrip())
pfb['p_err'] = msg

def write(self, msg):
if self.executor is not None:
self.executor.submit(self._write, msg)
else:
self._write(msg)
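One consequence of submitting without keeping the returned Future: an exception raised inside _write() is stored on the discarded Future instead of reaching the caller. Illustration (not part of the diff):

from concurrent.futures import ThreadPoolExecutor
ex = ThreadPoolExecutor(max_workers=1)
fut = ex.submit(lambda: 1 / 0)
print(fut.exception())  # ZeroDivisionError is captured here, not raised in the submitter
ex.shutdown()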


# Create python error logfile
logfile = '/var/log/pfblockerng/py_error.log'

@@ -146,6 +165,9 @@ def write(self, msg):
if not pfb['mod_sqlite3']:
sys.stderr.write("[pfBlockerNG]: Failed to load python module 'sqlite3': {}" .format(pfb['mod_sqlite3_e']))

if not pfb['async_io']:
sys.stderr.write("[pfBlockerNG]: Failed to create I/O Thread Pool Executor: {}" .format(pfb['async_io_executor_e']))

# Initialize default settings
pfb['dnsbl_ipv4'] = ''
pfb['dnsbl_ipv6'] = ''
@@ -745,6 +767,13 @@ def write_sqlite(db, groupname, update):
return True


def write_sqlite_async(db, groupname, update):
if pfb['async_io']:
pfb['async_io_executor'].submit(write_sqlite, db, groupname, update)
else:
write_sqlite(db, groupname, update)
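The wrapper keeps each call site a one-line change, but the Future from submit() is dropped, so write_sqlite's return value and any sqlite error are no longer visible to the caller. A hypothetical variant (not in this diff) that keeps them would be:

fut = pfb['async_io_executor'].submit(write_sqlite, db, groupname, update)
fut.result()  # re-raises any error from write_sqlite, but blocks the caller again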


def get_details_dnsbl(m_type, qinfo, qstate, rep, kwargs):
global pfb, rcodeDB, dnsblDB, noAAAADB, maxmindReader

@@ -757,7 +786,7 @@ def get_details_dnsbl(m_type, qinfo, qstate, rep, kwargs):

# Increment totalqueries counter
if pfb['sqlite3_resolver_con']:
write_sqlite(1, '', True)
write_sqlite_async(1, '', True)

# Determine if event is a 'reply' or DNSBL block
isDNSBL = dnsblDB.get(q_name)
@@ -769,7 +798,7 @@ def get_details_dnsbl(m_type, qinfo, qstate, rep, kwargs):

# Increment dnsblgroup counter
if pfb['sqlite3_dnsbl_con'] and isDNSBL['group'] != '':
write_sqlite(2, isDNSBL['group'], True)
write_sqlite_async(2, isDNSBL['group'], True)

dupEntry = '+'
lastEvent = dnsblDB.get('last-event')
@@ -800,8 +829,13 @@ def get_details_dnsbl(m_type, qinfo, qstate, rep, kwargs):
break

csv_line = ','.join('{}'.format(v) for v in ('DNSBL-python', timestamp, q_name, q_ip, isDNSBL['p_type'], isDNSBL['b_type'], isDNSBL['group'], isDNSBL['b_eval'], isDNSBL['feed'], dupEntry))
log_entry(csv_line, '/var/log/pfblockerng/dnsbl.log')
log_entry(csv_line, '/var/log/pfblockerng/unified.log')
if pfb['async_io']:
executor = pfb['async_io_executor']
executor.submit(log_entry, csv_line, '/var/log/pfblockerng/dnsbl.log')
executor.submit(log_entry, csv_line, '/var/log/pfblockerng/unified.log')
else:
log_entry(csv_line, '/var/log/pfblockerng/dnsbl.log')
log_entry(csv_line, '/var/log/pfblockerng/unified.log')

return True
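Since both submissions go to the same single-worker executor, the dnsbl.log and unified.log appends still happen in the order they were queued; only the calling thread stops waiting on the disk.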

@@ -855,7 +889,7 @@ def get_details_reply(m_type, qinfo, qstate, rep, kwargs):

# Increment totalqueries counter (Don't include the Resolver DNS requests)
if pfb['sqlite3_resolver_con'] and q_ip != '127.0.0.1':
write_sqlite(1, '', True)
write_sqlite_async(1, '', True)

# Do not log Replies, if disabled
if not pfb['python_reply']:
@@ -997,8 +1031,13 @@ def get_details_reply(m_type, qinfo, qstate, rep, kwargs):
break

csv_line = ','.join('{}'.format(v) for v in ('DNS-reply', timestamp, m_type, o_type, q_type, ttl, q_name, q_ip, r_addr, iso_code))
log_entry(csv_line, '/var/log/pfblockerng/dns_reply.log')
log_entry(csv_line, '/var/log/pfblockerng/unified.log')
if pfb['async_io']:
executor = pfb['async_io_executor']
executor.submit(log_entry, csv_line, '/var/log/pfblockerng/dns_reply.log')
executor.submit(log_entry, csv_line, '/var/log/pfblockerng/unified.log')
else:
log_entry(csv_line, '/var/log/pfblockerng/dns_reply.log')
log_entry(csv_line, '/var/log/pfblockerng/unified.log')

return True

@@ -1093,6 +1132,9 @@ def deinit(id):

if pfb['python_maxmind']:
maxmindReader.close()

if pfb['async_io']:
pfb['async_io_executor'].shutdown()

log_info('[pfBlockerNG]: pfb_unbound.py script exiting')
return True
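shutdown() defaults to wait=True, so deinit() blocks here until any queued log and SQLite writes have drained before the module exits.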
@@ -1595,7 +1637,7 @@ def operate(id, event, qstate, qdata):
dnsblDB[q_name_original] = {'qname': q_name_original, 'b_type': b_type, 'p_type': p_type, 'b_ip': b_ip, 'log': log_type, 'feed': feed, 'group': group, 'b_eval': b_eval }

# Add domain data to DNSBL cache for Reports tab
write_sqlite(3, '', [b_type, q_name, group, b_eval, feed])
write_sqlite_async(3, '', [b_type, q_name, group, b_eval, feed])

# Use previously blocked domain details
else: