Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ParentProcessChecker Thread #2808

Merged
merged 5 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ruff-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ jobs:
- uses: actions/checkout@v4
- uses: chartboost/ruff-action@v1
with:
version: 0.7.4
version: 0.8.2
2 changes: 1 addition & 1 deletion .github/workflows/ruff-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ jobs:
- uses: chartboost/ruff-action@v1
with:
args: "format --check"
version: 0.7.4
version: 0.8.2
4,634 changes: 2,324 additions & 2,310 deletions apiary.apib

Large diffs are not rendered by default.

45 changes: 23 additions & 22 deletions counterparty-core/counterpartycore/lib/api/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@
database.update_version(state_db)


def run_api_server(args, server_ready_value, stop_event):
def run_api_server(args, server_ready_value, stop_event, parent_pid):
logger.info("Starting API Server process...")

def handle_interrupt_signal(signum, frame):
Expand Down Expand Up @@ -513,7 +513,7 @@
wsgi_server = wsgi.WSGIApplication(app, args=args)

logger.info("Starting Parent Process Checker thread...")
parent_checker = ParentProcessChecker(wsgi_server)
parent_checker = ParentProcessChecker(wsgi_server, stop_event, parent_pid)
parent_checker.start()

app.app_context().push()
Expand All @@ -539,52 +539,53 @@
watcher.stop()
watcher.join()

if parent_checker is not None:
logger.trace("Stopping Parent Process Checker thread...")
parent_checker.stop()
parent_checker.join()

logger.info("API Server stopped.")


def is_process_alive(pid):
"""Check For the existence of a unix pid."""
try:

Check warning

Code scanning / pylint

Unnecessary "else" after "return", remove the "else" and de-indent the code inside it. Warning

Unnecessary "else" after "return", remove the "else" and de-indent the code inside it.
os.kill(pid, 0)
except OSError:
return False
else:
return True


# This thread is used for the following two reasons:
# 1. `docker-compose stop` does not send a SIGTERM to the child processes (in this case the API v2 process)
# 2. `process.terminate()` does not trigger a `KeyboardInterrupt` or execute the `finally` block.
class ParentProcessChecker(threading.Thread):
def __init__(self, wsgi_server):
def __init__(self, wsgi_server, stop_event, parent_pid):
super().__init__(name="ParentProcessChecker")
self.daemon = True
self.wsgi_server = wsgi_server
self.stop_event = threading.Event()
self.stop_event = stop_event
self.parent_pid = parent_pid

def run(self):
parent_pid = os.getppid()
try:
while not self.stop_event.is_set():
if os.getppid() != parent_pid:
logger.debug("Parent process is dead. Exiting...")
if self.wsgi_server is not None:
self.wsgi_server.stop()
break
while not self.stop_event.is_set() and is_process_alive(self.parent_pid):
time.sleep(1)
logger.debug("Parent process stopped. Exiting...")
if self.wsgi_server is not None:
self.wsgi_server.stop()
except KeyboardInterrupt:
pass

def stop(self):
self.stop_event.set()


class APIServer(object):
def __init__(self):
def __init__(self, stop_event):
self.process = None
self.server_ready_value = Value("I", 0)
self.stop_event = multiprocessing.Event()
self.stop_event = stop_event

def start(self, args):
if self.process is not None:
raise Exception("API Server is already running")
self.process = Process(
target=run_api_server, args=(vars(args), self.server_ready_value, self.stop_event)
target=run_api_server,
args=(vars(args), self.server_ready_value, self.stop_event, os.getpid()),
)
self.process.start()
return self.process
Expand Down
6 changes: 2 additions & 4 deletions counterparty-core/counterpartycore/lib/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ def extract_bitcoincore_config():
"rpcssl": "backend-ssl",
}

for bitcoind_key in config_keys:
for bitcoind_key, counterparty_key in config_keys.items():
if bitcoind_key in conf:
counterparty_key = config_keys[bitcoind_key]
bitcoincore_config[counterparty_key] = conf[bitcoind_key]

return bitcoincore_config
Expand Down Expand Up @@ -144,9 +143,8 @@ def server_to_client_config(server_config):
"rpc-password": "counterparty-rpc-password",
}

for server_key in config_keys:
for server_key, client_key in config_keys.items():
if server_key in server_config:
client_key = config_keys[server_key]
client_config[client_key] = server_config[server_key]

return client_config
Expand Down
7 changes: 6 additions & 1 deletion counterparty-core/counterpartycore/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import binascii
import decimal
import logging
import multiprocessing
import os
import sys
import tarfile
Expand Down Expand Up @@ -739,6 +740,7 @@ def start_all(args):
follower_daemon = None
asset_conservation_checker = None
db = None
api_stop_event = None

# Log all config parameters, sorted by key
# Filter out default values #TODO: these should be set in a different way
Expand Down Expand Up @@ -766,7 +768,8 @@ def start_all(args):
check.software_version()

# API Server v2
api_server_v2 = api_v2.APIServer()
api_stop_event = multiprocessing.Event()
api_server_v2 = api_v2.APIServer(api_stop_event)
api_server_v2.start(args)
while not api_server_v2.is_ready() and not api_server_v2.has_stopped():
logger.trace("Waiting for API server to start...")
Expand Down Expand Up @@ -812,6 +815,8 @@ def start_all(args):
logger.error("Exception caught!", exc_info=e)
finally:
# Ensure all threads are stopped
if api_stop_event:
api_stop_event.set()
if api_status_poller:
api_status_poller.stop()
if api_server_v1:
Expand Down
2 changes: 1 addition & 1 deletion counterparty-core/counterpartycore/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def is_server_ready():
api_v2.is_server_ready = is_server_ready

args = argparse.Namespace(**server_config)
api_server = api_v2.APIServer()
api_server = api_v2.APIServer(None)
api_server.start(args)

# wait for server to be ready
Expand Down
4 changes: 2 additions & 2 deletions counterparty-core/counterpartycore/test/fixtures/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,8 @@ def generate_standard_scenario(address1, address2, order_matches):
"parseblock_unittest_fixture": (PARSEBLOCKS_FIXTURE, "parseblock_unittest_fixture"),
}
# Generate special tests for simplesig, multisig2 and multisig3 using standard scenario.
for scenario_name in standard_scenarios_params:
for scenario_name, params in standard_scenarios_params.items():
INTEGRATION_SCENARIOS[scenario_name] = (
generate_standard_scenario(**standard_scenarios_params[scenario_name]),
generate_standard_scenario(**params),
scenario_name,
)
37 changes: 24 additions & 13 deletions counterparty-core/counterpartycore/test/p2sh_encoding_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ def test_p2sh_encoding_composed(server_db):
source = ADDR[0] # noqa: F841
destination = ADDR[1] # noqa: F841

with util_test.ConfigContext(
DISABLE_ARC4_MOCKING=True, OLD_STYLE_API=True
), util_test.MockProtocolChangesContext(enhanced_sends=True, p2sh_encoding=True):
with (
util_test.ConfigContext(DISABLE_ARC4_MOCKING=True, OLD_STYLE_API=True),
util_test.MockProtocolChangesContext(enhanced_sends=True, p2sh_encoding=True),
):
# BTC Mainnet tx d90dc8637fd2ab9ae39b7c2929c793c5d28d7dea672afb02fb4001637085e9a1
datatxhex = "010000000102d2b137e49e930ef3e436b342713d8d07bd378e773c915a5938993d81dc7e6000000000fdab0147304402207848293e88563750f647e949cb594cdbec0beb4070faac73040d77d479420f8302201e0ac32788e98bd984279102b7382576d7ddb4b125d1d507725cbd12d97a2908014d60014d1401434e5452505254590300010042276049e5518791be2ffe2c301f5dfe9ef85dd0400001720034b0410000000000000001500000006a79811e000000000000000054000079cec1665f4800000000000000050000000ca91f2d660000000000000005402736c8de6e34d54000000000000001500c5e4c71e081ceb00000000000000054000000045dc03ec4000000000000000500004af1271cf5fc00000000000000054001e71f8464432780000000000000015000002e1e4191f0d0000000000000005400012bc4aaac2a54000000000000001500079c7e774e411c00000000000000054000000045dc0a6f00000000000000015000002e1e486f661000000000000000540001c807abe13908000000000000000475410426156245525daa71f2e84a40797bcf28099a2c508662a8a33324a703597b9aa2661a79a82ffb4caaa9b15f4094622fbfa85f8b9dc7381f991f5a265421391cc3ad0075740087ffffffff0100000000000000000e6a0c31d52bf3b404aefaf596cfd000000000"
config.PREFIX = b"CNTRPRTY"
Expand All @@ -66,10 +67,11 @@ def test_p2sh_encoding(server_db):
source = ADDR[0]
destination = ADDR[1]

with util_test.ConfigContext(
DISABLE_ARC4_MOCKING=True, OLD_STYLE_API=True
), util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
with (
util_test.ConfigContext(DISABLE_ARC4_MOCKING=True, OLD_STYLE_API=True),
util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
),
):
utxos = dict(
((utxo["txid"], utxo["vout"]), utxo)
Expand Down Expand Up @@ -248,8 +250,11 @@ def test_p2sh_encoding_long_data(server_db):
source = ADDR[0]
destination = ADDR[1] # noqa: F841

with util_test.ConfigContext(OLD_STYLE_API=True), util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
with (
util_test.ConfigContext(OLD_STYLE_API=True),
util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
),
):
utxos = dict(
((utxo["txid"], utxo["vout"]), utxo)
Expand Down Expand Up @@ -434,8 +439,11 @@ def test_p2sh_encoding_p2sh_source_not_supported(server_db):
source = P2SH_ADDR[0]
destination = ADDR[1]

with util_test.ConfigContext(OLD_STYLE_API=True), util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
with (
util_test.ConfigContext(OLD_STYLE_API=True),
util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
),
):
fee = 20000
fee_per_kb = 50000
Expand All @@ -459,8 +467,11 @@ def test_p2sh_encoding_manual_multisig_transaction(server_db):
source = P2SH_ADDR[0]
destination = ADDR[1]

with util_test.ConfigContext(OLD_STYLE_API=True), util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
with (
util_test.ConfigContext(OLD_STYLE_API=True),
util_test.MockProtocolChangesContext(
enhanced_sends=True, p2sh_encoding=True, short_tx_type_id=False
),
):
p2sh_source_multisig_pubkeys_binary = [
binascii.unhexlify(p)
Expand Down
Loading
Loading