Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lack of support for new TimestampNTZType in Spark 3.4 datatypes #1385

Merged
merged 3 commits into from
Oct 19, 2023

Conversation

filipeo2-mck
Copy link
Contributor

When loading a parquet file with timestamp fields into a dataframe using Spark 3.4, pyspark can make use of the new pyspark.sql.types.TimestampNTZType() but these news fields were not declared as equivalent to Pandera's Timestamp.

Stack Trace
Data type '<class 'pyspark.sql.types.TimestampNTZType'>' not understood by Engine.

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮

...

│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/model.py:289 in validate     │
│                                                                                                  │
│   286 │   │   """%(validate_doc)s"""                                                             │
│   287 │   │   return cast(                                                                       │
│   288 │   │   │   DataFrameBase[TDataFrameModel],                                                │
│ ❱ 289 │   │   │   cls.to_schema().validate(                                                      │
│   290 │   │   │   │   check_obj, head, tail, sample, random_state, lazy, inplace                 │
│   291 │   │   │   ),                                                                             │
│   292 │   │   )                                                                                  │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/model.py:242 in to_schema    │
│                                                                                                  │
│   239 │   │   )                                                                                  │
│   240 │   │   cls.__root_checks__ = df_custom_checks + df_registered_checks                      │
│   241 │   │                                                                                      │
│ ❱ 242 │   │   columns = cls._build_columns_index(cls.__fields__, cls.__checks__)                 │
│   243 │   │                                                                                      │
│   244 │   │   kwargs = {}                                                                        │
│   245 │   │   if cls.__config__ is not None:                                                     │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/model.py:330 in              │
│ _build_columns_index                                                                             │
│                                                                                                  │
│   327 │   │   │   │   │   │   f"'check_name' is not supported for {field_name}."                 │
│   328 │   │   │   │   │   )                                                                      │
│   329 │   │   │   │                                                                              │
│ ❱ 330 │   │   │   │   columns[field_name] = col_constructor(  # type: ignore                     │
│   331 │   │   │   │   │   dtype,                                                                 │
│   332 │   │   │   │   │   required=not annotation.optional,                                      │
│   333 │   │   │   │   │   checks=field_checks,                                                   │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/model_components.py:61 in    │
│ to_column                                                                                        │
│                                                                                                  │
│    58 │   │   name: str = None,                                                                  │
│    59 │   ) -> Column:                                                                           │
│    60 │   │   """Create a schema_components.Column from a field."""                              │
│ ❱  61 │   │   return self._to_schema_component(                                                  │
│    62 │   │   │   dtype,                                                                         │
│    63 │   │   │   Column,                                                                        │
│    64 │   │   │   nullable=self.nullable,                                                        │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/model_components.py:51 in    │
│ _to_schema_component                                                                             │
│                                                                                                  │
│    48 │   │   if self.dtype_kwargs:                                                              │
│    49 │   │   │   dtype = dtype(**self.dtype_kwargs)  # type: ignore                             │
│    50 │   │   checks = self.checks + to_checklist(checks)                                        │
│ ❱  51 │   │   return component(dtype, checks=checks, **kwargs)  # type: ignore                   │
│    52 │                                                                                          │
│    53 │   def to_column(                                                                         │
│    54 │   │   self,                                                                              │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/components.py:70 in __init__ │
│                                                                                                  │
│    67 │   │                                                                                      │
│    68 │   │   See :ref:`here<column>` for more usage details.                                    │
│    69 │   │   """                                                                                │
│ ❱  70 │   │   super().__init__(                                                                  │
│    71 │   │   │   dtype=dtype,                                                                   │
│    72 │   │   │   checks=checks,                                                                 │
│    73 │   │   │   nullable=nullable,                                                             │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/column_schema.py:52 in       │
│ __init__                                                                                         │
│                                                                                                  │
│    49 │   │   :type nullable: bool                                                               │
│    50 │   │   """                                                                                │
│    51 │   │                                                                                      │
│ ❱  52 │   │   super().__init__(                                                                  │
│    53 │   │   │   dtype=dtype,                                                                   │
│    54 │   │   │   checks=checks,                                                                 │
│    55 │   │   │   coerce=coerce,                                                                 │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/base/schema.py:39 in __init__        │
│                                                                                                  │
│    36 │   │   drop_invalid_rows=False,                                                           │
│    37 │   ):                                                                                     │
│    38 │   │   """Abstract base schema initializer."""                                            │
│ ❱  39 │   │   self.dtype = dtype                                                                 │
│    40 │   │   self.checks = checks                                                               │
│    41 │   │   self.coerce = coerce                                                               │
│    42 │   │   self.name = name                                                                   │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/api/pyspark/column_schema.py:81 in dtype │
│                                                                                                  │
│    78 │   def dtype(self, value: Optional[PySparkDtypeInputTypes]) -> None:                      │
│    79 │   │   """Set the pyspark dtype"""                                                        │
│    80 │   │   self._dtype = (                                                                    │
│ ❱  81 │   │   │   pyspark_engine.Engine.dtype(value) if value else None                          │
│    82 │   │   )  # pylint:disable=no-value-for-parameter                                         │
│    83 │                                                                                          │
│    84 │   def validate(                                                                          │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/engines/pyspark_engine.py:125 in dtype   │
│                                                                                                  │
│   122 │   │   │   │   subst = ""                                                                 │
│   123 │   │   │   │   # You can manually specify the number of replacements by changing the 4t   │
│   124 │   │   │   │   data_type = re.sub(regex, subst, data_type, 0, re.MULTILINE)               │
│ ❱ 125 │   │   │   return engine.Engine.dtype(cls, data_type)                                     │
│   126 │   │   except TypeError:  # pylint:disable=try-except-raise # pragma: no cover            │
│   127 │   │   │   raise                                                                          │
│   128                                                                                            │
│                                                                                                  │
│ /Users/fo2/c1s/worktree/pyspark_optional_fields/pandera/engines/engine.py:271 in dtype           │
│                                                                                                  │
│   268 │   │   try:                                                                               │
│   269 │   │   │   return registry.dispatch(data_type)                                            │
│   270 │   │   except (KeyError, ValueError):                                                     │
│ ❱ 271 │   │   │   raise TypeError(                                                               │
│   272 │   │   │   │   f"Data type '{data_type}' not understood by {cls.__name__}."               │
│   273 │   │   │   ) from None                                                                    │
│   274                                                                                            │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
TypeError: Data type '<class 'pyspark.sql.types.TimestampNTZType'>' not understood by Engine.

Issue

The pyspark_engine.py module was not prepared to process pyspark.sql.types.TimestampNTZType() as a Timestamp.

Proposed solution

As this type is exclusive to Spark 3.4 and newer versions, an IF condition was needed to ensure that it's added only when pyspark >= 3.4 is being used, both in the main code as in tests:

Behavior when using PySpark 3.3, with common types only:
image

Behavior when using PySpark 3.4, with the new type:
image

Additional details

Version in use: 0.17.2
OS: MacOS 13.6 (M1)

@filipeo2-mck filipeo2-mck marked this pull request as ready for review October 18, 2023 13:54
Copy link

@kasperjanehag kasperjanehag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@codecov
Copy link

codecov bot commented Oct 18, 2023

Codecov Report

All modified lines are covered by tests ✅

Comparison is base (4425ad8) 93.92% compared to head (0dce486) 93.92%.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #1385   +/-   ##
=======================================
  Coverage   93.92%   93.92%           
=======================================
  Files          91       91           
  Lines        6781     6787    +6     
=======================================
+ Hits         6369     6375    +6     
  Misses        412      412           
Files Coverage Δ
pandera/engines/pyspark_engine.py 93.78% <100.00%> (+0.24%) ⬆️

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@cosmicBboy
Copy link
Collaborator

Thanks, and congrats on your first PR @filipeo2-mck ! 🚀

@cosmicBboy cosmicBboy merged commit 0c48778 into unionai-oss:main Oct 19, 2023
55 checks passed
Copy link
Collaborator

@NeerajMalhotra-QB NeerajMalhotra-QB left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!!!

@NeerajMalhotra-QB NeerajMalhotra-QB self-requested a review October 19, 2023 20:57
@kasperjanehag
Copy link

Amazing stuff @filipeo2-mck. Thanks for the effort!

noklam pushed a commit to noklam/pandera that referenced this pull request Oct 29, 2023
…nionai-oss#1385)

* add TimestampNTZType as equivalents and add parameters to test case

Signed-off-by: Filipe Oliveira <[email protected]>

* parse version to improve robustness

Signed-off-by: Filipe Oliveira <[email protected]>

---------

Signed-off-by: Filipe Oliveira <[email protected]>
Signed-off-by: Nok <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants