Skip to content

Commit

Permalink
Add sort dataframe logic on qid (#239)
Browse files Browse the repository at this point in the history
* add sort dataframe logic on qid for centralized

* move sorting logic to _split_dataframe

* Update xgboost_ray/matrix.py

Co-authored-by: Antoni Baum <[email protected]>
Signed-off-by: atomic <[email protected]>

* refactor sorting logic to method

* logic for more cases of qid type and add integration test

- add logic to include more case of qid data type (array, dataframe)
- add 2 integration tests to cover behavior for sorting qid

* raise exception for the case when qid DataFrame is using unexpected shape

* fix lint

* more lint fix

* add unittest skip for xgboost 0.9

* Apply suggestions from code review

Signed-off-by: Antoni Baum <[email protected]>

* Fix

Signed-off-by: Antoni Baum <[email protected]>

* Test tweak

Signed-off-by: Antoni Baum <[email protected]>

Signed-off-by: atomic <[email protected]>
Signed-off-by: Antoni Baum <[email protected]>
Co-authored-by: Antoni Baum <[email protected]>
  • Loading branch information
atomic and Yard1 authored Oct 6, 2022
1 parent 536b702 commit ac30013
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
31 changes: 31 additions & 0 deletions xgboost_ray/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ def concat_dataframes(dfs: List[Optional[pd.DataFrame]]):
return pd.concat(filtered, ignore_index=True, copy=False)


def ensure_sorted_by_qid(df: pd.DataFrame, qid: Data
) -> Tuple[Union[np.array, str], pd.DataFrame]:
_qid: pd.Series = None
if isinstance(qid, str):
_qid = df[qid]
elif isinstance(qid, np.ndarray):
_qid = pd.Series(qid)
elif isinstance(qid, pd.DataFrame):
if len(df.shape) != 2 and df.shape[1] != 1:
raise ValueError(f"qid argument of type pd.DataFrame is expected"
"to contains only 1 column of data "
f"but the qid passed in is of shape {df.shape}.")
_qid = qid.iloc[:, 0]
elif isinstance(qid, pd.Series):
_qid = qid
if _qid.is_monotonic:
return _qid, df
else:
if isinstance(qid, str):
return qid, df.sort_values([qid])
else: # case when qid is not part of df
return _qid.sort_values(), \
df.set_index(_qid).sort_index().reset_index(drop=True)


@PublicAPI(stability="beta")
class RayShardingMode(Enum):
"""Enum for different modes of sharding the data.
Expand Down Expand Up @@ -227,6 +252,12 @@ def _split_dataframe(
`label_upper_bound`
"""
# sort dataframe by qid if exists (required by DMatrix)
if self.qid is not None:
_qid, local_data = ensure_sorted_by_qid(local_data, self.qid)
if not isinstance(self.qid, str):
self.qid = _qid

exclude_cols: Set[str] = set() # Exclude these columns from `x`

label, exclude = data_source.get_column(local_data, self.label)
Expand Down
54 changes: 53 additions & 1 deletion xgboost_ray/tests/test_matrix.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import inspect
import os
import tempfile
import unittest
import xgboost as xgb

import numpy as np
import pandas as pd
Expand All @@ -27,7 +29,7 @@ def setUp(self):

@classmethod
def setUpClass(cls):
ray.init(num_cpus=1, local_mode=True)
ray.init(local_mode=True)

@classmethod
def tearDownClass(cls):
Expand Down Expand Up @@ -356,6 +358,56 @@ def testLegacyParams(self):
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)

@unittest.skipIf("qid" not in inspect.signature(xgb.DMatrix).parameters,
f"not supported in xgb version {xgb.__version__}")
def testQidSortedBehaviorXGBoost(self):
"""Test that data with unsorted qid is sorted in RayDMatrix"""
in_x = self.x
in_y = self.y
unsorted_qid = np.array([1, 2] * 16)

from xgboost import DMatrix
with self.assertRaises(ValueError):
DMatrix(**{"data": in_x, "label": in_y, "qid": unsorted_qid})
DMatrix(**{
"data": in_x,
"label": in_y,
"qid": np.sort(unsorted_qid)
}) # no exception
# test RayDMatrix handles sorting automatically
mat = RayDMatrix(in_x, in_y, qid=unsorted_qid)
params = mat.get_data(rank=0, num_actors=1)
DMatrix(**params)

@unittest.skipIf("qid" not in inspect.signature(xgb.DMatrix).parameters,
f"not supported in xgb version {xgb.__version__}")
def testQidSortedParquet(self):
from xgboost import DMatrix
with tempfile.TemporaryDirectory() as dir:
parquet_file1 = os.path.join(dir, "file1.parquet")
parquet_file2 = os.path.join(dir, "file2.parquet")

unsorted_qid1 = np.array([2, 4] * 16)
unsorted_qid2 = np.array([1, 3] * 16)

# parquet 1
data_df = pd.DataFrame(self.x, columns=["a", "b", "c", "d"])
data_df["label"] = pd.Series(self.y)
data_df["group"] = pd.Series(unsorted_qid1)
data_df.to_parquet(parquet_file1)
# parquet 2
data_df = pd.DataFrame(self.x, columns=["a", "b", "c", "d"])
data_df["label"] = pd.Series(self.y)
data_df["group"] = pd.Series(unsorted_qid2)
data_df.to_parquet(parquet_file2)
mat = RayDMatrix(
[parquet_file1, parquet_file2],
columns=["a", "b", "c", "d", "label", "group"],
label="label",
qid="group")
params = mat.get_data(rank=0, num_actors=1)
DMatrix(**params)


if __name__ == "__main__":
import pytest
Expand Down

0 comments on commit ac30013

Please sign in to comment.