diff --git a/xgboost_ray/matrix.py b/xgboost_ray/matrix.py index bc1410c4..10b26fa0 100644 --- a/xgboost_ray/matrix.py +++ b/xgboost_ray/matrix.py @@ -48,6 +48,31 @@ def concat_dataframes(dfs: List[Optional[pd.DataFrame]]): return pd.concat(filtered, ignore_index=True, copy=False) +def ensure_sorted_by_qid(df: pd.DataFrame, qid: Data + ) -> Tuple[Union[np.array, str], pd.DataFrame]: + _qid: pd.Series = None + if isinstance(qid, str): + _qid = df[qid] + elif isinstance(qid, np.ndarray): + _qid = pd.Series(qid) + elif isinstance(qid, pd.DataFrame): + if len(df.shape) != 2 and df.shape[1] != 1: + raise ValueError(f"qid argument of type pd.DataFrame is expected" + "to contains only 1 column of data " + f"but the qid passed in is of shape {df.shape}.") + _qid = qid.iloc[:, 0] + elif isinstance(qid, pd.Series): + _qid = qid + if _qid.is_monotonic: + return _qid, df + else: + if isinstance(qid, str): + return qid, df.sort_values([qid]) + else: # case when qid is not part of df + return _qid.sort_values(), \ + df.set_index(_qid).sort_index().reset_index(drop=True) + + @PublicAPI(stability="beta") class RayShardingMode(Enum): """Enum for different modes of sharding the data. @@ -227,6 +252,12 @@ def _split_dataframe( `label_upper_bound` """ + # sort dataframe by qid if exists (required by DMatrix) + if self.qid is not None: + _qid, local_data = ensure_sorted_by_qid(local_data, self.qid) + if not isinstance(self.qid, str): + self.qid = _qid + exclude_cols: Set[str] = set() # Exclude these columns from `x` label, exclude = data_source.get_column(local_data, self.label) diff --git a/xgboost_ray/tests/test_matrix.py b/xgboost_ray/tests/test_matrix.py index 04a59b28..e67e3fbe 100644 --- a/xgboost_ray/tests/test_matrix.py +++ b/xgboost_ray/tests/test_matrix.py @@ -1,6 +1,8 @@ +import inspect import os import tempfile import unittest +import xgboost as xgb import numpy as np import pandas as pd @@ -27,7 +29,7 @@ def setUp(self): @classmethod def setUpClass(cls): - ray.init(num_cpus=1, local_mode=True) + ray.init(local_mode=True) @classmethod def tearDownClass(cls): @@ -356,6 +358,56 @@ def testLegacyParams(self): label_lower_bound=label_lower_bound, label_upper_bound=label_upper_bound) + @unittest.skipIf("qid" not in inspect.signature(xgb.DMatrix).parameters, + f"not supported in xgb version {xgb.__version__}") + def testQidSortedBehaviorXGBoost(self): + """Test that data with unsorted qid is sorted in RayDMatrix""" + in_x = self.x + in_y = self.y + unsorted_qid = np.array([1, 2] * 16) + + from xgboost import DMatrix + with self.assertRaises(ValueError): + DMatrix(**{"data": in_x, "label": in_y, "qid": unsorted_qid}) + DMatrix(**{ + "data": in_x, + "label": in_y, + "qid": np.sort(unsorted_qid) + }) # no exception + # test RayDMatrix handles sorting automatically + mat = RayDMatrix(in_x, in_y, qid=unsorted_qid) + params = mat.get_data(rank=0, num_actors=1) + DMatrix(**params) + + @unittest.skipIf("qid" not in inspect.signature(xgb.DMatrix).parameters, + f"not supported in xgb version {xgb.__version__}") + def testQidSortedParquet(self): + from xgboost import DMatrix + with tempfile.TemporaryDirectory() as dir: + parquet_file1 = os.path.join(dir, "file1.parquet") + parquet_file2 = os.path.join(dir, "file2.parquet") + + unsorted_qid1 = np.array([2, 4] * 16) + unsorted_qid2 = np.array([1, 3] * 16) + + # parquet 1 + data_df = pd.DataFrame(self.x, columns=["a", "b", "c", "d"]) + data_df["label"] = pd.Series(self.y) + data_df["group"] = pd.Series(unsorted_qid1) + data_df.to_parquet(parquet_file1) + # parquet 2 + data_df = pd.DataFrame(self.x, columns=["a", "b", "c", "d"]) + data_df["label"] = pd.Series(self.y) + data_df["group"] = pd.Series(unsorted_qid2) + data_df.to_parquet(parquet_file2) + mat = RayDMatrix( + [parquet_file1, parquet_file2], + columns=["a", "b", "c", "d", "label", "group"], + label="label", + qid="group") + params = mat.get_data(rank=0, num_actors=1) + DMatrix(**params) + if __name__ == "__main__": import pytest