opsml.data.interfaces._base

  1from pathlib import Path
  2from typing import Any, Dict, List, Optional, Union
  3
  4import joblib
  5import pandas as pd
  6import polars as pl
  7from pydantic import BaseModel, ConfigDict, field_validator
  8
  9from opsml.data.splitter import Data, DataSplit, DataSplitter
 10from opsml.helpers.logging import ArtifactLogger
 11from opsml.helpers.utils import FileUtils
 12from opsml.types import CommonKwargs, Feature, Suffix
 13
# Module-level logger shared by all data interfaces in this module.
logger = ArtifactLogger.get_logger()

# ydata-profiling is an optional dependency. Fall back to `Any` so the
# `DataInterface.data_profile` field annotation below stays importable
# when the package is not installed.
try:
    from ydata_profiling import ProfileReport
except ModuleNotFoundError:
    ProfileReport = Any
 21
 22class DataInterface(BaseModel):
 23    """Base data interface for all data types
 24
 25    Args:
 26        data:
 27            Data. Can be a pyarrow table, pandas dataframe, polars dataframe
 28            or numpy array
 29        dependent_vars:
 30            List of dependent variables. Can be string or index if using numpy
 31        data_splits:
 32            Optional list of `DataSplit`
 33        data_profile:
 34            Optional ydata-profiling `ProfileReport`
 35        feature_map:
 36            Dictionary of features -> automatically generated
 37        feature_descriptions:
 38            Dictionary or feature descriptions
 39        sql_logic:
 40            Sql logic used to generate data
 41
 42    """
 43
 44    data: Optional[Any] = None
 45    data_splits: List[DataSplit] = []
 46    dependent_vars: List[Union[int, str]] = []
 47    data_profile: Optional[ProfileReport] = None
 48    feature_map: Dict[str, Feature] = {}
 49    feature_descriptions: Dict[str, str] = {}
 50    sql_logic: Dict[str, str] = {}
 51
 52    model_config = ConfigDict(
 53        arbitrary_types_allowed=True,
 54        validate_assignment=False,
 55        validate_default=True,
 56    )
 57
 58    @property
 59    def data_type(self) -> str:
 60        return CommonKwargs.UNDEFINED.value
 61
 62    @field_validator("sql_logic", mode="before")
 63    @classmethod
 64    def _load_sql(cls, sql_logic: Dict[str, str]) -> Dict[str, str]:
 65        if not bool(sql_logic):
 66            return sql_logic
 67
 68        for name, query in sql_logic.items():
 69            if ".sql" in query:
 70                try:
 71                    sql_path = FileUtils.find_filepath(name=query)
 72                    with open(sql_path, "r", encoding="utf-8") as file_:
 73                        query_ = file_.read()
 74                    sql_logic[name] = query_
 75
 76                except Exception as error:
 77                    raise ValueError(f"Could not load sql file {query}. {error}") from error
 78
 79        return sql_logic
 80
 81    def add_sql(
 82        self,
 83        name: str,
 84        query: Optional[str] = None,
 85        filename: Optional[str] = None,
 86    ) -> None:
 87        """
 88        Adds a query or query from file to the sql_logic dictionary. Either a query or
 89        a filename pointing to a sql file are required in addition to a name.
 90
 91        Args:
 92            name:
 93                Name for sql query
 94            query:
 95                SQL query
 96            filename: Filename of sql query
 97        """
 98        if query is not None:
 99            self.sql_logic[name] = query
100
101        elif filename is not None:
102            sql_path = str(FileUtils.find_filepath(name=filename))
103            with open(sql_path, "r", encoding="utf-8") as file_:
104                query = file_.read()
105            self.sql_logic[name] = query
106
107        else:
108            raise ValueError("SQL Query or Filename must be provided")
109
110    @field_validator("data_profile", mode="before")
111    @classmethod
112    def _check_profile(cls, profile: Optional[ProfileReport]) -> Optional[ProfileReport]:
113        if profile is not None:
114            from ydata_profiling import ProfileReport as ydata_profile
115
116            assert isinstance(profile, ydata_profile)
117        return profile
118
119    def save_data(self, path: Path) -> None:
120        """Saves data to path. Base implementation use Joblib
121
122        Args:
123            path:
124                Pathlib object
125        """
126        assert self.data is not None, "No data detected in interface"
127        joblib.dump(self.data, path)
128
129        self.feature_map = {
130            "features": Feature(
131                feature_type=str(type(self.data)),
132                shape=CommonKwargs.UNDEFINED.value,
133            )
134        }
135
136    def load_data(self, path: Path) -> None:
137        """Load data from pathlib object
138
139        Args:
140            path:
141                Pathlib object
142        """
143
144        self.data = joblib.load(path)
145
146    def load_data_profile(self, path: Path) -> None:
147        """Load data profile from pathlib object
148
149        Args:
150            path:
151                Pathlib object
152        """
153        self.data_profile = ProfileReport().loads(
154            joblib.load(path),
155        )
156
157    def save_data_profile(self, path: Path) -> None:
158        """Saves data profile to path. Data profiles are saved as joblib
159        joblib
160
161        Args:
162            path:
163                Pathlib object
164        """
165        assert self.data_profile is not None, "No data profile detected in interface"
166
167        if path.suffix == Suffix.HTML.value:
168            profile_artifact = self.data_profile.to_html()
169            path.write_text(profile_artifact, encoding="utf-8")
170        else:
171            profile_artifact = self.data_profile.dumps()
172            joblib.dump(profile_artifact, path)
173
174    def create_data_profile(self, sample_perc: float = 1, name: str = "data_profile") -> ProfileReport:
175        """Creates a data profile report
176
177        Args:
178            sample_perc:
179                Percentage of data to use when creating a profile. Sampling is recommended for large dataframes.
180                Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.)
181            name:
182                Name of data profile
183
184        """
185        from opsml.profile.profile_data import DataProfiler
186
187        if isinstance(self.data, (pl.DataFrame, pd.DataFrame)):
188            if self.data_profile is None:
189                self.data_profile = DataProfiler.create_profile_report(
190                    data=self.data,
191                    name=name,
192                    sample_perc=min(sample_perc, 1),  # max of 1
193                )
194                return self.data_profile
195
196            logger.info("Data profile already exists")
197            return self.data_profile
198
199        raise ValueError("A pandas dataframe type is required to create a data profile")
200
201    def split_data(self) -> Dict[str, Data]:
202        """
203        Loops through data splits and splits data either by indexing or
204        column values
205
206        Example:
207
208            ```python
209            card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com")
210            data_card = DataCard(
211                info=card_info,
212                data=data,
213                dependent_vars=["Pulse"],
214                # define splits
215                data_splits=[
216                    DataSplit(label="train", indices=train_idx),
217                    DataSplit(label="test", indices=test_idx),
218                ],
219
220            )
221
222            splits = data_card.split_data()
223            print(splits["train"].X.head())
224
225               Chins  Situps  Jumps
226            0    5.0   162.0   60.0
227            1    2.0   110.0   60.0
228            2   12.0   101.0  101.0
229            3   12.0   105.0   37.0
230            4   13.0   155.0   58.0
231            ```
232
233        Returns
234            Class containing data splits
235        """
236        if self.data is None:
237            raise ValueError("Data must not be None. Either supply data or load data")
238
239        if len(self.data_splits) > 0:
240            data_holder: Dict[str, Data] = {}
241            for data_split in self.data_splits:
242                label, data = DataSplitter.split(
243                    split=data_split,
244                    dependent_vars=self.dependent_vars,
245                    data=self.data,
246                    data_type=self.data_type,
247                )
248                data_holder[label] = data
249
250            return data_holder
251        raise ValueError("No data splits provided")
252
253    @property
254    def data_suffix(self) -> str:
255        """Returns suffix for storage"""
256        return Suffix.JOBLIB.value
257
258    @staticmethod
259    def name() -> str:
260        raise NotImplementedError
logger = <builtins.Logger object>
class DataInterface(pydantic.main.BaseModel):
 23class DataInterface(BaseModel):
 24    """Base data interface for all data types
 25
 26    Args:
 27        data:
 28            Data. Can be a pyarrow table, pandas dataframe, polars dataframe
 29            or numpy array
 30        dependent_vars:
 31            List of dependent variables. Can be string or index if using numpy
 32        data_splits:
 33            Optional list of `DataSplit`
 34        data_profile:
 35            Optional ydata-profiling `ProfileReport`
 36        feature_map:
 37            Dictionary of features -> automatically generated
 38        feature_descriptions:
 39            Dictionary or feature descriptions
 40        sql_logic:
 41            Sql logic used to generate data
 42
 43    """
 44
 45    data: Optional[Any] = None
 46    data_splits: List[DataSplit] = []
 47    dependent_vars: List[Union[int, str]] = []
 48    data_profile: Optional[ProfileReport] = None
 49    feature_map: Dict[str, Feature] = {}
 50    feature_descriptions: Dict[str, str] = {}
 51    sql_logic: Dict[str, str] = {}
 52
 53    model_config = ConfigDict(
 54        arbitrary_types_allowed=True,
 55        validate_assignment=False,
 56        validate_default=True,
 57    )
 58
 59    @property
 60    def data_type(self) -> str:
 61        return CommonKwargs.UNDEFINED.value
 62
 63    @field_validator("sql_logic", mode="before")
 64    @classmethod
 65    def _load_sql(cls, sql_logic: Dict[str, str]) -> Dict[str, str]:
 66        if not bool(sql_logic):
 67            return sql_logic
 68
 69        for name, query in sql_logic.items():
 70            if ".sql" in query:
 71                try:
 72                    sql_path = FileUtils.find_filepath(name=query)
 73                    with open(sql_path, "r", encoding="utf-8") as file_:
 74                        query_ = file_.read()
 75                    sql_logic[name] = query_
 76
 77                except Exception as error:
 78                    raise ValueError(f"Could not load sql file {query}. {error}") from error
 79
 80        return sql_logic
 81
 82    def add_sql(
 83        self,
 84        name: str,
 85        query: Optional[str] = None,
 86        filename: Optional[str] = None,
 87    ) -> None:
 88        """
 89        Adds a query or query from file to the sql_logic dictionary. Either a query or
 90        a filename pointing to a sql file are required in addition to a name.
 91
 92        Args:
 93            name:
 94                Name for sql query
 95            query:
 96                SQL query
 97            filename: Filename of sql query
 98        """
 99        if query is not None:
100            self.sql_logic[name] = query
101
102        elif filename is not None:
103            sql_path = str(FileUtils.find_filepath(name=filename))
104            with open(sql_path, "r", encoding="utf-8") as file_:
105                query = file_.read()
106            self.sql_logic[name] = query
107
108        else:
109            raise ValueError("SQL Query or Filename must be provided")
110
111    @field_validator("data_profile", mode="before")
112    @classmethod
113    def _check_profile(cls, profile: Optional[ProfileReport]) -> Optional[ProfileReport]:
114        if profile is not None:
115            from ydata_profiling import ProfileReport as ydata_profile
116
117            assert isinstance(profile, ydata_profile)
118        return profile
119
120    def save_data(self, path: Path) -> None:
121        """Saves data to path. Base implementation use Joblib
122
123        Args:
124            path:
125                Pathlib object
126        """
127        assert self.data is not None, "No data detected in interface"
128        joblib.dump(self.data, path)
129
130        self.feature_map = {
131            "features": Feature(
132                feature_type=str(type(self.data)),
133                shape=CommonKwargs.UNDEFINED.value,
134            )
135        }
136
137    def load_data(self, path: Path) -> None:
138        """Load data from pathlib object
139
140        Args:
141            path:
142                Pathlib object
143        """
144
145        self.data = joblib.load(path)
146
147    def load_data_profile(self, path: Path) -> None:
148        """Load data profile from pathlib object
149
150        Args:
151            path:
152                Pathlib object
153        """
154        self.data_profile = ProfileReport().loads(
155            joblib.load(path),
156        )
157
158    def save_data_profile(self, path: Path) -> None:
159        """Saves data profile to path. Data profiles are saved as joblib
160        joblib
161
162        Args:
163            path:
164                Pathlib object
165        """
166        assert self.data_profile is not None, "No data profile detected in interface"
167
168        if path.suffix == Suffix.HTML.value:
169            profile_artifact = self.data_profile.to_html()
170            path.write_text(profile_artifact, encoding="utf-8")
171        else:
172            profile_artifact = self.data_profile.dumps()
173            joblib.dump(profile_artifact, path)
174
175    def create_data_profile(self, sample_perc: float = 1, name: str = "data_profile") -> ProfileReport:
176        """Creates a data profile report
177
178        Args:
179            sample_perc:
180                Percentage of data to use when creating a profile. Sampling is recommended for large dataframes.
181                Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.)
182            name:
183                Name of data profile
184
185        """
186        from opsml.profile.profile_data import DataProfiler
187
188        if isinstance(self.data, (pl.DataFrame, pd.DataFrame)):
189            if self.data_profile is None:
190                self.data_profile = DataProfiler.create_profile_report(
191                    data=self.data,
192                    name=name,
193                    sample_perc=min(sample_perc, 1),  # max of 1
194                )
195                return self.data_profile
196
197            logger.info("Data profile already exists")
198            return self.data_profile
199
200        raise ValueError("A pandas dataframe type is required to create a data profile")
201
202    def split_data(self) -> Dict[str, Data]:
203        """
204        Loops through data splits and splits data either by indexing or
205        column values
206
207        Example:
208
209            ```python
210            card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com")
211            data_card = DataCard(
212                info=card_info,
213                data=data,
214                dependent_vars=["Pulse"],
215                # define splits
216                data_splits=[
217                    DataSplit(label="train", indices=train_idx),
218                    DataSplit(label="test", indices=test_idx),
219                ],
220
221            )
222
223            splits = data_card.split_data()
224            print(splits["train"].X.head())
225
226               Chins  Situps  Jumps
227            0    5.0   162.0   60.0
228            1    2.0   110.0   60.0
229            2   12.0   101.0  101.0
230            3   12.0   105.0   37.0
231            4   13.0   155.0   58.0
232            ```
233
234        Returns
235            Class containing data splits
236        """
237        if self.data is None:
238            raise ValueError("Data must not be None. Either supply data or load data")
239
240        if len(self.data_splits) > 0:
241            data_holder: Dict[str, Data] = {}
242            for data_split in self.data_splits:
243                label, data = DataSplitter.split(
244                    split=data_split,
245                    dependent_vars=self.dependent_vars,
246                    data=self.data,
247                    data_type=self.data_type,
248                )
249                data_holder[label] = data
250
251            return data_holder
252        raise ValueError("No data splits provided")
253
254    @property
255    def data_suffix(self) -> str:
256        """Returns suffix for storage"""
257        return Suffix.JOBLIB.value
258
259    @staticmethod
260    def name() -> str:
261        raise NotImplementedError

Base data interface for all data types

Arguments:
  • data: Data. Can be a pyarrow table, pandas dataframe, polars dataframe or numpy array
  • dependent_vars: List of dependent variables. Can be string or index if using numpy
  • data_splits: Optional list of DataSplit
  • data_profile: Optional ydata-profiling ProfileReport
  • feature_map: Dictionary of features -> automatically generated
  • feature_descriptions: Dictionary of feature descriptions
  • sql_logic: Sql logic used to generate data
data: Optional[Any]
data_splits: List[opsml.data.splitter.DataSplit]
dependent_vars: List[Union[int, str]]
data_profile: Optional[ydata_profiling.profile_report.ProfileReport]
feature_map: Dict[str, opsml.types.model.Feature]
feature_descriptions: Dict[str, str]
sql_logic: Dict[str, str]
model_config = {'arbitrary_types_allowed': True, 'validate_assignment': False, 'validate_default': True}
data_type: str
59    @property
60    def data_type(self) -> str:
61        return CommonKwargs.UNDEFINED.value
def add_sql( self, name: str, query: Optional[str] = None, filename: Optional[str] = None) -> None:
 82    def add_sql(
 83        self,
 84        name: str,
 85        query: Optional[str] = None,
 86        filename: Optional[str] = None,
 87    ) -> None:
 88        """
 89        Adds a query or query from file to the sql_logic dictionary. Either a query or
 90        a filename pointing to a sql file are required in addition to a name.
 91
 92        Args:
 93            name:
 94                Name for sql query
 95            query:
 96                SQL query
 97            filename: Filename of sql query
 98        """
 99        if query is not None:
100            self.sql_logic[name] = query
101
102        elif filename is not None:
103            sql_path = str(FileUtils.find_filepath(name=filename))
104            with open(sql_path, "r", encoding="utf-8") as file_:
105                query = file_.read()
106            self.sql_logic[name] = query
107
108        else:
109            raise ValueError("SQL Query or Filename must be provided")

Adds a query or query from file to the sql_logic dictionary. Either a query or a filename pointing to a sql file are required in addition to a name.

Arguments:
  • name: Name for sql query
  • query: SQL query
  • filename: Filename of sql query
def save_data(self, path: pathlib.Path) -> None:
120    def save_data(self, path: Path) -> None:
121        """Saves data to path. Base implementation use Joblib
122
123        Args:
124            path:
125                Pathlib object
126        """
127        assert self.data is not None, "No data detected in interface"
128        joblib.dump(self.data, path)
129
130        self.feature_map = {
131            "features": Feature(
132                feature_type=str(type(self.data)),
133                shape=CommonKwargs.UNDEFINED.value,
134            )
135        }

Saves data to path. Base implementation uses Joblib

Arguments:
  • path: Pathlib object
def load_data(self, path: pathlib.Path) -> None:
137    def load_data(self, path: Path) -> None:
138        """Load data from pathlib object
139
140        Args:
141            path:
142                Pathlib object
143        """
144
145        self.data = joblib.load(path)

Load data from pathlib object

Arguments:
  • path: Pathlib object
def load_data_profile(self, path: pathlib.Path) -> None:
147    def load_data_profile(self, path: Path) -> None:
148        """Load data profile from pathlib object
149
150        Args:
151            path:
152                Pathlib object
153        """
154        self.data_profile = ProfileReport().loads(
155            joblib.load(path),
156        )

Load data profile from pathlib object

Arguments:
  • path: Pathlib object
def save_data_profile(self, path: pathlib.Path) -> None:
158    def save_data_profile(self, path: Path) -> None:
159        """Saves data profile to path. Data profiles are saved as joblib
160        joblib
161
162        Args:
163            path:
164                Pathlib object
165        """
166        assert self.data_profile is not None, "No data profile detected in interface"
167
168        if path.suffix == Suffix.HTML.value:
169            profile_artifact = self.data_profile.to_html()
170            path.write_text(profile_artifact, encoding="utf-8")
171        else:
172            profile_artifact = self.data_profile.dumps()
173            joblib.dump(profile_artifact, path)

Saves data profile to path. Data profiles are saved as joblib

Arguments:
  • path: Pathlib object
def create_data_profile( self, sample_perc: float = 1, name: str = 'data_profile') -> ydata_profiling.profile_report.ProfileReport:
175    def create_data_profile(self, sample_perc: float = 1, name: str = "data_profile") -> ProfileReport:
176        """Creates a data profile report
177
178        Args:
179            sample_perc:
180                Percentage of data to use when creating a profile. Sampling is recommended for large dataframes.
181                Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.)
182            name:
183                Name of data profile
184
185        """
186        from opsml.profile.profile_data import DataProfiler
187
188        if isinstance(self.data, (pl.DataFrame, pd.DataFrame)):
189            if self.data_profile is None:
190                self.data_profile = DataProfiler.create_profile_report(
191                    data=self.data,
192                    name=name,
193                    sample_perc=min(sample_perc, 1),  # max of 1
194                )
195                return self.data_profile
196
197            logger.info("Data profile already exists")
198            return self.data_profile
199
200        raise ValueError("A pandas dataframe type is required to create a data profile")

Creates a data profile report

Arguments:
  • sample_perc: Percentage of data to use when creating a profile. Sampling is recommended for large dataframes. Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.)
  • name: Name of data profile
def split_data(self) -> Dict[str, opsml.data.splitter.Data]:
202    def split_data(self) -> Dict[str, Data]:
203        """
204        Loops through data splits and splits data either by indexing or
205        column values
206
207        Example:
208
209            ```python
210            card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com")
211            data_card = DataCard(
212                info=card_info,
213                data=data,
214                dependent_vars=["Pulse"],
215                # define splits
216                data_splits=[
217                    DataSplit(label="train", indices=train_idx),
218                    DataSplit(label="test", indices=test_idx),
219                ],
220
221            )
222
223            splits = data_card.split_data()
224            print(splits["train"].X.head())
225
226               Chins  Situps  Jumps
227            0    5.0   162.0   60.0
228            1    2.0   110.0   60.0
229            2   12.0   101.0  101.0
230            3   12.0   105.0   37.0
231            4   13.0   155.0   58.0
232            ```
233
234        Returns
235            Class containing data splits
236        """
237        if self.data is None:
238            raise ValueError("Data must not be None. Either supply data or load data")
239
240        if len(self.data_splits) > 0:
241            data_holder: Dict[str, Data] = {}
242            for data_split in self.data_splits:
243                label, data = DataSplitter.split(
244                    split=data_split,
245                    dependent_vars=self.dependent_vars,
246                    data=self.data,
247                    data_type=self.data_type,
248                )
249                data_holder[label] = data
250
251            return data_holder
252        raise ValueError("No data splits provided")

Loops through data splits and splits data either by indexing or column values

Example:
card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com")
data_card = DataCard(
    info=card_info,
    data=data,
    dependent_vars=["Pulse"],
    # define splits
    data_splits=[
        DataSplit(label="train", indices=train_idx),
        DataSplit(label="test", indices=test_idx),
    ],

)

splits = data_card.split_data()
print(splits["train"].X.head())

   Chins  Situps  Jumps
0    5.0   162.0   60.0
1    2.0   110.0   60.0
2   12.0   101.0  101.0
3   12.0   105.0   37.0
4   13.0   155.0   58.0

Returns: Class containing data splits

data_suffix: str
254    @property
255    def data_suffix(self) -> str:
256        """Returns suffix for storage"""
257        return Suffix.JOBLIB.value

Returns suffix for storage

@staticmethod
def name() -> str:
259    @staticmethod
260    def name() -> str:
261        raise NotImplementedError
model_fields = {'data': FieldInfo(annotation=Union[Any, NoneType], required=False), 'data_splits': FieldInfo(annotation=List[DataSplit], required=False, default=[]), 'dependent_vars': FieldInfo(annotation=List[Union[int, str]], required=False, default=[]), 'data_profile': FieldInfo(annotation=Union[ProfileReport, NoneType], required=False), 'feature_map': FieldInfo(annotation=Dict[str, Feature], required=False, default={}), 'feature_descriptions': FieldInfo(annotation=Dict[str, str], required=False, default={}), 'sql_logic': FieldInfo(annotation=Dict[str, str], required=False, default={})}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs