Skip to content

Commit

Permalink
Merge branch 'main' into renovate/actions-checkout-4.x
Browse files (browse the repository at this point in the history)
  • Loading branch information
sharpener6 authored Nov 20, 2024
2 parents e35bc67 + fa65c0c commit 87258bc
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scorecard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:

steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
uses: step-security/harden-runner@0080882f6c36860b6ba35c610c98ce87d4e2f26f # v2.10.2
with:
egress-policy: audit

Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<a href="https://pypi.org/project/pargraph/">
<img alt="PyPI - Version" src="https://img.shields.io/pypi/v/pargraph?colorA=0f1632&colorB=255be3">
</a>
<img src="https://api.securityscorecards.dev/projects/github.com/Citi/pargraph/badge">
</p>
</div>

Expand Down Expand Up @@ -126,14 +127,14 @@ map_reduce_sort_recursive.to_graph(partition_counts=4).to_dot().write_png("map_r

![Map-Reduce Sort Recursive](docs/_static/map_reduce_sort_recursive.png)

Use the `to_dask` method to convert the generated graph to a Dask task graph.
Use the `to_dict` method to convert the generated graph to a dict graph.

```python
import numpy as np
from distributed import Client

with Client() as client:
client.get(map_reduce_sort.to_graph(partition_count=4).to_dask(array=np.random.rand(20)))[0]
client.get(map_reduce_sort.to_graph(partition_count=4).to_dict(array=np.random.rand(20)))[0]

# [0.06253707 0.06795382 0.11492823 0.14512393 0.20183152 0.41109117
# 0.42613798 0.45156214 0.4714821 0.54000373 0.54902451 0.62671881
Expand Down
2 changes: 1 addition & 1 deletion pargraph/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.8.4"
__version__ = "0.9.0"
28 changes: 14 additions & 14 deletions pargraph/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ def set_parallel_backend(self, backend: Backend) -> None:
"""
self.backend = backend

def get(self, dsk: Dict, keys: Any, **kwargs) -> Any:
def get(self, graph: Dict, keys: Any, **kwargs) -> Any:
"""
Compute task graph
Compute dict graph
:param dsk: dask-compatible task graph
:param graph: dict graph
:param keys: keys to compute (e.g. ``"x"``, ``["x", "y", "z"]``, etc)
:param kwargs: keyword arguments to forward to the parallel backend
:return: results in the same structure as keys
"""
keyset = set(self._flatten_iter([keys]))

# cull graph to remove any unnecessary dependencies
graphlib_graph = self._cull_graph(self._convert_dsk_to_graph(dsk), keyset)
graphlib_graph = self._cull_graph(self._get_graph_dependencies(graph), keyset)
ref_count_graph = self._create_ref_count_graph(graphlib_graph)

graph = TopologicalSorter(graphlib_graph)
graph.prepare()
topological_sorter = TopologicalSorter(graphlib_graph)
topological_sorter.prepare()

results: Dict[Hashable, Any] = {}
future_to_key: Dict[Future[Any], Hashable] = {}
Expand Down Expand Up @@ -95,14 +95,14 @@ def wait_for_completed_futures():
future_to_key.pop(done_future, None)
done_keys.append(key)

graph.done(*done_keys)
topological_sorter.done(*done_keys)
for done_key in done_keys:
dereference_key(done_key)

# while there are still unscheduled tasks
while graph.is_active():
while topological_sorter.is_active():
# get the in-vertices: the keys the topological sorter reports as ready to run
in_keys = graph.get_ready()
in_keys = topological_sorter.get_ready()

# if there are no in-vertices, wait for a future to resolve
# IMPORTANT: we make the assumption that the graph is acyclic
Expand All @@ -111,15 +111,15 @@ def wait_for_completed_futures():
continue

for in_key in in_keys:
computation = dsk[in_key]
computation = graph[in_key]

if self._is_submittable_function_computation(computation):
future = self._submit_function_computation(computation, results, **kwargs)
future_to_key[future] = in_key
else:
result = self._evaluate_computation(computation, results)
results[in_key] = result
graph.done(in_key)
topological_sorter.done(in_key)
dereference_key(in_key)

# resolve all pending futures
Expand Down Expand Up @@ -183,8 +183,8 @@ def _evaluate_computation(cls, computation: Any, results: Dict) -> Optional[Any]
return computation

@staticmethod
def _convert_dsk_to_graph(dsk: Dict) -> Dict:
keys = set(dsk.keys())
def _get_graph_dependencies(graph: Dict) -> Dict:
keys = set(graph.keys())

def flatten(value: Any) -> Set[Any]:
# handle tasks as tuples
Expand All @@ -209,7 +209,7 @@ def flatten(value: Any) -> Set[Any]:

return set()

return {key: flatten(value) for key, value in dsk.items()}
return {key: flatten(value) for key, value in graph.items()}

@staticmethod
def _create_ref_count_graph(graph: Dict) -> Dict:
Expand Down
Loading

0 comments on commit 87258bc

Please sign in to comment.