
[PySpark] Improve validation performance by enabling cache()/unpersist() toggles #1414

Merged

Conversation

filipeo2-mck
Contributor

@filipeo2-mck filipeo2-mck commented Nov 9, 2023

This PR relates to the solutions discussed in issue #1409 about the currently low PySpark validation performance on complex dataframes/pipelines.

It adds the capability to cache the dataframe-to-be-validated before the validation process starts, so Spark does not reprocess the dataframe's DAG every time a new schema/data check is executed.

Formal documentation explaining the usage and the reasoning behind this improvement was added; please take a look at docs/source/pyspark_sql.rst.


In my internal tests, which involve 4 differently sized input dataframes that are transformed into a final dataframe (which is also validated and written to disk), enabling the new cache flag (export PANDERA_CACHE_DATAFRAME=True) decreased the processing time from 80 minutes to 17 minutes (21% of the original processing time).

Additionally, disabling the new "keep the persisted cache after validation ends" toggle (export PANDERA_KEEP_CACHED_DATAFRAME=False) saved one more minute (16 minutes, 20% of the original time). It gives the user finer control over the cluster's cache.

Each test with the flags mentioned above was run 3 times, and the timings were consistent across runs.
The improvements from PR #1403 were not applied to the tests above. Having both will give us a big performance boost (at least when nullables are being checked). A usage sketch follows below.
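For reference, a minimal sketch of how these toggles can be used from Python. The schema and dataframe here are illustrative, not from this PR; only the two environment variables are the feature being added:

import os

# Set the toggles before importing pandera, since the config may be read at
# import time. Both names are the final ones adopted later in this PR.
os.environ["PANDERA_CACHE_DATAFRAME"] = "True"         # cache check_obj before validating
os.environ["PANDERA_KEEP_CACHED_DATAFRAME"] = "False"  # unpersist once validation ends

import pandera.pyspark as pa
import pyspark.sql.types as T
from pyspark.sql import SparkSession


class MySchema(pa.DataFrameModel):
    """Illustrative schema; any pandera.pyspark schema benefits the same way."""

    id: T.IntegerType() = pa.Field(ge=0)
    name: T.StringType() = pa.Field()


spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])

# Without caching, each schema/data check re-triggers the dataframe's DAG;
# with the flag on, Spark computes it once and reuses the cached result.
validated = MySchema.validate(df)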

@filipeo2-mck filipeo2-mck changed the title [PySpark] Improve performance by enabling cache()/unpersist() toggles [PySpark] Improve validation performance by enabling cache()/unpersist() toggles Nov 9, 2023

codecov bot commented Nov 9, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison: base (af0e5c0) at 94.23% vs. head (eec060b) at 94.26%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1414      +/-   ##
==========================================
+ Coverage   94.23%   94.26%   +0.02%     
==========================================
  Files          91       91              
  Lines        6976     7009      +33     
==========================================
+ Hits         6574     6607      +33     
  Misses        402      402              


@filipeo2-mck filipeo2-mck marked this pull request as ready for review November 10, 2023 13:46
@filipeo2-mck
Contributor Author

@NeerajMalhotra-QB, for your evaluation, please.

…park_performance_cache

    Signed-off-by: Filipe Oliveira <[email protected]>
docs/.DS_Store (outdated, resolved)


def cache_check_obj():
    """This decorator evaluates if `check_obj` can be cached before validation.


Noting that this isn't true for the other decorators in this file either, but would it make sense to clarify in the docstring that this is a decorator factory and that functions should be decorated with @cache_check_obj()?


Something similar to

    """
    A decorator factory that creates a decorator to evaluate if `check_obj` can be cached before validation.

    As each new data check added to the Pandera schema by the user triggers 
    a new Spark action, Spark reprocesses the `check_obj` DataFrame multiple times.
    To prevent this waste of processing resources and to reduce validation times 
    in complex scenarios, the decorator created by this factory caches the 
    `check_obj` DataFrame before validation and unpersists it afterwards.

    The behavior of the resulting decorator depends on the `PANDERA_PYSPARK_CACHING` 
    environment variable.

    Usage:
        @cache_check_obj()
        def your_function(...):
            ...

    Note: This is not a direct decorator but a factory that returns a decorator.
    """

Contributor Author


I like the new explanation (I'll make use of it, hehe), but I'm not sure explaining this common design pattern is valuable here. We would need to add the explanation to the other decorators too, to keep the standard, and we would end up bloating the docstrings with repeated information.


Indeed!


@kasperjanehag kasperjanehag left a comment


Great work! Left a few comments. :)

@@ -3,6 +3,7 @@
dask-worker-space
spark-warehouse
docs/source/_contents
**.DS_Store
Contributor Author


Ignoring macOS-specific files.

Comment on lines +34 to +42
@cache_check_obj()
def func_w_check_obj_args(self, check_obj: DataFrame, /):
"""Right function to use this decorator, check_obj as arg."""
return check_obj.columns

@cache_check_obj()
def func_w_check_obj_kwargs(self, *, check_obj: DataFrame = None):
"""Right function to use this decorator, check_obj as kwarg."""
return check_obj.columns
Contributor Author

@filipeo2-mck filipeo2-mck Nov 13, 2023


check_obj can now be passed as an arg or a kwarg. Unit tests were added too.
Thank you for noticing that, @maxispeicher 👍


@kasperjanehag kasperjanehag left a comment


LGTM!

@cosmicBboy
Collaborator

This is awesome, @filipeo2-mck!

Would recommend renaming PANDERA_PYSPARK_UNPERSIST to PANDERA_PYSPARK_PERSIST_CACHE so that the flag is "positive" (True) in order to enable it.

Also, a quick question for this feature: do the PANDERA_PYSPARK_CACHE and PANDERA_PYSPARK_UNPERSIST abstractions make sense for other dataframe libraries? If there's a chance this could apply elsewhere, I think it could be more generic, e.g. PANDERA_PRECACHE_DATAFRAME.

@filipeo2-mck
Contributor Author

filipeo2-mck commented Nov 16, 2023

Hi Niels!

Would recommend renaming PANDERA_PYSPARK_UNPERSIST to PANDERA_PYSPARK_PERSIST_CACHE so that the flag is "positive" (True) in order to enable it.

Done! The suggested rename makes sense. I've opted to keep it as PANDERA_PYSPARK_KEEP_CACHE because PySpark has a separate .persist() method that does something similar (with its differences), and I want to avoid confusing users.
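As a side note on that naming concern: in PySpark, DataFrame.cache() is shorthand for persist() with the default storage level, while persist() also accepts explicit storage levels, so a flag name containing "persist" could suggest the wrong method. A quick illustration:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# cache() == persist() with the default storage level
# (MEMORY_AND_DISK for DataFrames).
df.cache()
df.unpersist()

# persist() additionally accepts an explicit storage level, which is the
# behavior a "PERSIST"-named flag might wrongly be read as controlling.
df.persist(StorageLevel.DISK_ONLY)
df.unpersist()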


Also, a quick question for this feature: do the PANDERA_PYSPARK_CACHE and PANDERA_PYSPARK_UNPERSIST abstractions make sense for other dataframe libraries? If there's a chance this could apply elsewhere, I think it could be more generic, e.g. PANDERA_PRECACHE_DATAFRAME.

I don't think we have anything similar in the other currently supported libraries. I searched for caching capabilities in pandas and couldn't find anything close. The closest option would be using .to_pickle() on a dataframe, but that is closer to PySpark's .checkpoint(), which is the approach from #1409 that we opted not to take.

@filipeo2-mck
Contributor Author

I was evaluating Polars and it has a .cache() method too.
As Polars support is expected, I'm going to rename the settings to be generic, as you mentioned.

Signed-off-by: Filipe Oliveira <[email protected]>
@filipeo2-mck
Contributor Author

filipeo2-mck commented Nov 16, 2023

@cosmicBboy, renaming done. The configs are now generic (ready for other integrations): PANDERA_CACHE_DATAFRAME and PANDERA_KEEP_CACHED_DATAFRAME.

@cosmicBboy cosmicBboy merged commit bc5e37a into unionai-oss:main Nov 20, 2023
56 checks passed
cosmicBboy added a commit that referenced this pull request Dec 5, 2023
This PR renames the pandera config arguments introduced in this PR:
#1414 and makes the names
more generic. Fixes tests that were broken by the config changes.

Signed-off-by: Niels Bantilan <[email protected]>