opsml.data.interfaces._base
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import joblib
import pandas as pd
import polars as pl
from pydantic import BaseModel, ConfigDict, field_validator

from opsml.data.splitter import Data, DataSplit, DataSplitter
from opsml.helpers.logging import ArtifactLogger
from opsml.helpers.utils import FileUtils
from opsml.types import CommonKwargs, Feature, Suffix

logger = ArtifactLogger.get_logger()

try:
    from ydata_profiling import ProfileReport
except ModuleNotFoundError:
    # ydata-profiling is an optional dependency. Fall back to `Any` so the
    # pydantic annotation on `data_profile` still resolves; methods that need
    # the real class import it locally and fail with a clear error instead.
    ProfileReport = Any


class DataInterface(BaseModel):
    """Base data interface for all data types

    Args:
        data:
            Data. Can be a pyarrow table, pandas dataframe, polars dataframe
            or numpy array
        dependent_vars:
            List of dependent variables. Can be string or index if using numpy
        data_splits:
            Optional list of `DataSplit`
        data_profile:
            Optional ydata-profiling `ProfileReport`
        feature_map:
            Dictionary of features -> automatically generated
        feature_descriptions:
            Dictionary of feature descriptions
        sql_logic:
            Sql logic used to generate data

    """

    data: Optional[Any] = None
    data_splits: List[DataSplit] = []
    dependent_vars: List[Union[int, str]] = []
    data_profile: Optional[ProfileReport] = None
    feature_map: Dict[str, Feature] = {}
    feature_descriptions: Dict[str, str] = {}
    sql_logic: Dict[str, str] = {}

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        validate_assignment=False,
        validate_default=True,
    )

    @property
    def data_type(self) -> str:
        """Data type identifier. Subclasses override with a concrete type."""
        return CommonKwargs.UNDEFINED.value

    @field_validator("sql_logic", mode="before")
    @classmethod
    def _load_sql(cls, sql_logic: Dict[str, str]) -> Dict[str, str]:
        """Resolves sql_logic entries that reference ``.sql`` files.

        Any value containing ``.sql`` is treated as a filename: the file is
        located via `FileUtils.find_filepath` and its contents replace the
        value. Inline queries are passed through unchanged.

        Raises:
            ValueError: If a referenced sql file cannot be found or read.
        """
        if not sql_logic:
            return sql_logic

        for name, query in sql_logic.items():
            # value looks like a filename rather than an inline query
            if ".sql" in query:
                try:
                    sql_path = FileUtils.find_filepath(name=query)
                    with open(sql_path, "r", encoding="utf-8") as file_:
                        sql_logic[name] = file_.read()

                except Exception as error:
                    raise ValueError(f"Could not load sql file {query}. {error}") from error

        return sql_logic

    def add_sql(
        self,
        name: str,
        query: Optional[str] = None,
        filename: Optional[str] = None,
    ) -> None:
        """
        Adds a query or query from file to the sql_logic dictionary. Either a query or
        a filename pointing to a sql file are required in addition to a name.

        Args:
            name:
                Name for sql query
            query:
                SQL query
            filename: Filename of sql query

        Raises:
            ValueError: If neither `query` nor `filename` is provided.
        """
        if query is not None:
            self.sql_logic[name] = query

        elif filename is not None:
            sql_path = str(FileUtils.find_filepath(name=filename))
            with open(sql_path, "r", encoding="utf-8") as file_:
                query = file_.read()
            self.sql_logic[name] = query

        else:
            raise ValueError("SQL Query or Filename must be provided")

    @field_validator("data_profile", mode="before")
    @classmethod
    def _check_profile(cls, profile: Optional[ProfileReport]) -> Optional[ProfileReport]:
        """Validates that a supplied profile is a real ydata-profiling report."""
        if profile is not None:
            # import locally so the check only requires ydata-profiling when a
            # profile is actually supplied
            from ydata_profiling import ProfileReport as ydata_profile

            assert isinstance(profile, ydata_profile)
        return profile

    def save_data(self, path: Path) -> None:
        """Saves data to path. Base implementation uses joblib.

        Args:
            path:
                Pathlib object
        """
        assert self.data is not None, "No data detected in interface"
        joblib.dump(self.data, path)

        # base implementation cannot infer a real shape; record the type only
        self.feature_map = {
            "features": Feature(
                feature_type=str(type(self.data)),
                shape=CommonKwargs.UNDEFINED.value,
            )
        }

    def load_data(self, path: Path) -> None:
        """Load data from pathlib object

        Args:
            path:
                Pathlib object
        """

        self.data = joblib.load(path)

    def load_data_profile(self, path: Path) -> None:
        """Load data profile from pathlib object

        Args:
            path:
                Pathlib object

        Raises:
            ModuleNotFoundError: If ydata-profiling is not installed.
        """
        # Import the real class locally: the module-level `ProfileReport` may be
        # the `Any` fallback, and `Any()` raises a cryptic TypeError. This way a
        # missing optional dependency surfaces as a clear ModuleNotFoundError.
        from ydata_profiling import ProfileReport as ydata_profile

        self.data_profile = ydata_profile().loads(
            joblib.load(path),
        )

    def save_data_profile(self, path: Path) -> None:
        """Saves data profile to path. Data profiles are saved with joblib,
        or as HTML when the path has an ``.html`` suffix.

        Args:
            path:
                Pathlib object
        """
        assert self.data_profile is not None, "No data profile detected in interface"

        if path.suffix == Suffix.HTML.value:
            # human-readable report
            profile_artifact = self.data_profile.to_html()
            path.write_text(profile_artifact, encoding="utf-8")
        else:
            # serialized report that can be re-loaded via `load_data_profile`
            profile_artifact = self.data_profile.dumps()
            joblib.dump(profile_artifact, path)

    def create_data_profile(self, sample_perc: float = 1, name: str = "data_profile") -> ProfileReport:
        """Creates a data profile report

        Args:
            sample_perc:
                Percentage of data to use when creating a profile. Sampling is recommended for large dataframes.
                Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.)
            name:
                Name of data profile

        Returns:
            `ProfileReport` for the interface data (existing profile is reused).

        Raises:
            ValueError: If data is not a pandas or polars dataframe.
        """
        from opsml.profile.profile_data import DataProfiler

        if isinstance(self.data, (pl.DataFrame, pd.DataFrame)):
            if self.data_profile is None:
                self.data_profile = DataProfiler.create_profile_report(
                    data=self.data,
                    name=name,
                    sample_perc=min(sample_perc, 1),  # max of 1
                )
                return self.data_profile

            logger.info("Data profile already exists")
            return self.data_profile

        raise ValueError("A pandas dataframe type is required to create a data profile")

    def split_data(self) -> Dict[str, Data]:
        """
        Loops through data splits and splits data either by indexing or
        column values

        Example:

            ```python
            card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com")
            data_card = DataCard(
                info=card_info,
                data=data,
                dependent_vars=["Pulse"],
                # define splits
                data_splits=[
                    DataSplit(label="train", indices=train_idx),
                    DataSplit(label="test", indices=test_idx),
                ],

            )

            splits = data_card.split_data()
            print(splits["train"].X.head())

               Chins  Situps  Jumps
            0    5.0   162.0   60.0
            1    2.0   110.0   60.0
            2   12.0   101.0  101.0
            3   12.0   105.0   37.0
            4   13.0   155.0   58.0
            ```

        Returns:
            Class containing data splits

        Raises:
            ValueError: If data is None or no data splits are defined.
        """
        if self.data is None:
            raise ValueError("Data must not be None. Either supply data or load data")

        if len(self.data_splits) > 0:
            data_holder: Dict[str, Data] = {}
            for data_split in self.data_splits:
                label, data = DataSplitter.split(
                    split=data_split,
                    dependent_vars=self.dependent_vars,
                    data=self.data,
                    data_type=self.data_type,
                )
                data_holder[label] = data

            return data_holder
        raise ValueError("No data splits provided")

    @property
    def data_suffix(self) -> str:
        """Returns suffix for storage"""
        return Suffix.JOBLIB.value

    @staticmethod
    def name() -> str:
        # subclasses must provide their registry name
        raise NotImplementedError
logger =
<builtins.Logger object>
class
DataInterface(pydantic.main.BaseModel):
23class DataInterface(BaseModel): 24 """Base data interface for all data types 25 26 Args: 27 data: 28 Data. Can be a pyarrow table, pandas dataframe, polars dataframe 29 or numpy array 30 dependent_vars: 31 List of dependent variables. Can be string or index if using numpy 32 data_splits: 33 Optional list of `DataSplit` 34 data_profile: 35 Optional ydata-profiling `ProfileReport` 36 feature_map: 37 Dictionary of features -> automatically generated 38 feature_descriptions: 39 Dictionary or feature descriptions 40 sql_logic: 41 Sql logic used to generate data 42 43 """ 44 45 data: Optional[Any] = None 46 data_splits: List[DataSplit] = [] 47 dependent_vars: List[Union[int, str]] = [] 48 data_profile: Optional[ProfileReport] = None 49 feature_map: Dict[str, Feature] = {} 50 feature_descriptions: Dict[str, str] = {} 51 sql_logic: Dict[str, str] = {} 52 53 model_config = ConfigDict( 54 arbitrary_types_allowed=True, 55 validate_assignment=False, 56 validate_default=True, 57 ) 58 59 @property 60 def data_type(self) -> str: 61 return CommonKwargs.UNDEFINED.value 62 63 @field_validator("sql_logic", mode="before") 64 @classmethod 65 def _load_sql(cls, sql_logic: Dict[str, str]) -> Dict[str, str]: 66 if not bool(sql_logic): 67 return sql_logic 68 69 for name, query in sql_logic.items(): 70 if ".sql" in query: 71 try: 72 sql_path = FileUtils.find_filepath(name=query) 73 with open(sql_path, "r", encoding="utf-8") as file_: 74 query_ = file_.read() 75 sql_logic[name] = query_ 76 77 except Exception as error: 78 raise ValueError(f"Could not load sql file {query}. {error}") from error 79 80 return sql_logic 81 82 def add_sql( 83 self, 84 name: str, 85 query: Optional[str] = None, 86 filename: Optional[str] = None, 87 ) -> None: 88 """ 89 Adds a query or query from file to the sql_logic dictionary. Either a query or 90 a filename pointing to a sql file are required in addition to a name. 
91 92 Args: 93 name: 94 Name for sql query 95 query: 96 SQL query 97 filename: Filename of sql query 98 """ 99 if query is not None: 100 self.sql_logic[name] = query 101 102 elif filename is not None: 103 sql_path = str(FileUtils.find_filepath(name=filename)) 104 with open(sql_path, "r", encoding="utf-8") as file_: 105 query = file_.read() 106 self.sql_logic[name] = query 107 108 else: 109 raise ValueError("SQL Query or Filename must be provided") 110 111 @field_validator("data_profile", mode="before") 112 @classmethod 113 def _check_profile(cls, profile: Optional[ProfileReport]) -> Optional[ProfileReport]: 114 if profile is not None: 115 from ydata_profiling import ProfileReport as ydata_profile 116 117 assert isinstance(profile, ydata_profile) 118 return profile 119 120 def save_data(self, path: Path) -> None: 121 """Saves data to path. Base implementation use Joblib 122 123 Args: 124 path: 125 Pathlib object 126 """ 127 assert self.data is not None, "No data detected in interface" 128 joblib.dump(self.data, path) 129 130 self.feature_map = { 131 "features": Feature( 132 feature_type=str(type(self.data)), 133 shape=CommonKwargs.UNDEFINED.value, 134 ) 135 } 136 137 def load_data(self, path: Path) -> None: 138 """Load data from pathlib object 139 140 Args: 141 path: 142 Pathlib object 143 """ 144 145 self.data = joblib.load(path) 146 147 def load_data_profile(self, path: Path) -> None: 148 """Load data profile from pathlib object 149 150 Args: 151 path: 152 Pathlib object 153 """ 154 self.data_profile = ProfileReport().loads( 155 joblib.load(path), 156 ) 157 158 def save_data_profile(self, path: Path) -> None: 159 """Saves data profile to path. 
Data profiles are saved as joblib 160 joblib 161 162 Args: 163 path: 164 Pathlib object 165 """ 166 assert self.data_profile is not None, "No data profile detected in interface" 167 168 if path.suffix == Suffix.HTML.value: 169 profile_artifact = self.data_profile.to_html() 170 path.write_text(profile_artifact, encoding="utf-8") 171 else: 172 profile_artifact = self.data_profile.dumps() 173 joblib.dump(profile_artifact, path) 174 175 def create_data_profile(self, sample_perc: float = 1, name: str = "data_profile") -> ProfileReport: 176 """Creates a data profile report 177 178 Args: 179 sample_perc: 180 Percentage of data to use when creating a profile. Sampling is recommended for large dataframes. 181 Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.) 182 name: 183 Name of data profile 184 185 """ 186 from opsml.profile.profile_data import DataProfiler 187 188 if isinstance(self.data, (pl.DataFrame, pd.DataFrame)): 189 if self.data_profile is None: 190 self.data_profile = DataProfiler.create_profile_report( 191 data=self.data, 192 name=name, 193 sample_perc=min(sample_perc, 1), # max of 1 194 ) 195 return self.data_profile 196 197 logger.info("Data profile already exists") 198 return self.data_profile 199 200 raise ValueError("A pandas dataframe type is required to create a data profile") 201 202 def split_data(self) -> Dict[str, Data]: 203 """ 204 Loops through data splits and splits data either by indexing or 205 column values 206 207 Example: 208 209 ```python 210 card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com") 211 data_card = DataCard( 212 info=card_info, 213 data=data, 214 dependent_vars=["Pulse"], 215 # define splits 216 data_splits=[ 217 DataSplit(label="train", indices=train_idx), 218 DataSplit(label="test", indices=test_idx), 219 ], 220 221 ) 222 223 splits = data_card.split_data() 224 print(splits["train"].X.head()) 225 226 Chins Situps Jumps 227 0 5.0 162.0 60.0 228 1 2.0 110.0 60.0 229 2 12.0 
101.0 101.0 230 3 12.0 105.0 37.0 231 4 13.0 155.0 58.0 232 ``` 233 234 Returns 235 Class containing data splits 236 """ 237 if self.data is None: 238 raise ValueError("Data must not be None. Either supply data or load data") 239 240 if len(self.data_splits) > 0: 241 data_holder: Dict[str, Data] = {} 242 for data_split in self.data_splits: 243 label, data = DataSplitter.split( 244 split=data_split, 245 dependent_vars=self.dependent_vars, 246 data=self.data, 247 data_type=self.data_type, 248 ) 249 data_holder[label] = data 250 251 return data_holder 252 raise ValueError("No data splits provided") 253 254 @property 255 def data_suffix(self) -> str: 256 """Returns suffix for storage""" 257 return Suffix.JOBLIB.value 258 259 @staticmethod 260 def name() -> str: 261 raise NotImplementedError
Base data interface for all data types
Arguments:
- data: Data. Can be a pyarrow table, pandas dataframe, polars dataframe or numpy array
- dependent_vars: List of dependent variables. Can be string or index if using numpy
- data_splits: Optional list of
DataSplit
- data_profile: Optional ydata-profiling
ProfileReport
- feature_map: Dictionary of features -> automatically generated
- feature_descriptions: Dictionary of feature descriptions
- sql_logic: Sql logic used to generate data
data_splits: List[opsml.data.splitter.DataSplit]
model_config =
{'arbitrary_types_allowed': True, 'validate_assignment': False, 'validate_default': True}
def
add_sql( self, name: str, query: Optional[str] = None, filename: Optional[str] = None) -> None:
82 def add_sql( 83 self, 84 name: str, 85 query: Optional[str] = None, 86 filename: Optional[str] = None, 87 ) -> None: 88 """ 89 Adds a query or query from file to the sql_logic dictionary. Either a query or 90 a filename pointing to a sql file are required in addition to a name. 91 92 Args: 93 name: 94 Name for sql query 95 query: 96 SQL query 97 filename: Filename of sql query 98 """ 99 if query is not None: 100 self.sql_logic[name] = query 101 102 elif filename is not None: 103 sql_path = str(FileUtils.find_filepath(name=filename)) 104 with open(sql_path, "r", encoding="utf-8") as file_: 105 query = file_.read() 106 self.sql_logic[name] = query 107 108 else: 109 raise ValueError("SQL Query or Filename must be provided")
Adds a query or query from file to the sql_logic dictionary. Either a query or a filename pointing to a sql file are required in addition to a name.
Arguments:
- name: Name for sql query
- query: SQL query
- filename: Filename of sql query
def
save_data(self, path: pathlib.Path) -> None:
120 def save_data(self, path: Path) -> None: 121 """Saves data to path. Base implementation use Joblib 122 123 Args: 124 path: 125 Pathlib object 126 """ 127 assert self.data is not None, "No data detected in interface" 128 joblib.dump(self.data, path) 129 130 self.feature_map = { 131 "features": Feature( 132 feature_type=str(type(self.data)), 133 shape=CommonKwargs.UNDEFINED.value, 134 ) 135 }
Saves data to path. Base implementation uses Joblib
Arguments:
- path: Pathlib object
def
load_data(self, path: pathlib.Path) -> None:
137 def load_data(self, path: Path) -> None: 138 """Load data from pathlib object 139 140 Args: 141 path: 142 Pathlib object 143 """ 144 145 self.data = joblib.load(path)
Load data from pathlib object
Arguments:
- path: Pathlib object
def
load_data_profile(self, path: pathlib.Path) -> None:
147 def load_data_profile(self, path: Path) -> None: 148 """Load data profile from pathlib object 149 150 Args: 151 path: 152 Pathlib object 153 """ 154 self.data_profile = ProfileReport().loads( 155 joblib.load(path), 156 )
Load data profile from pathlib object
Arguments:
- path: Pathlib object
def
save_data_profile(self, path: pathlib.Path) -> None:
158 def save_data_profile(self, path: Path) -> None: 159 """Saves data profile to path. Data profiles are saved as joblib 160 joblib 161 162 Args: 163 path: 164 Pathlib object 165 """ 166 assert self.data_profile is not None, "No data profile detected in interface" 167 168 if path.suffix == Suffix.HTML.value: 169 profile_artifact = self.data_profile.to_html() 170 path.write_text(profile_artifact, encoding="utf-8") 171 else: 172 profile_artifact = self.data_profile.dumps() 173 joblib.dump(profile_artifact, path)
Saves data profile to path. Data profiles are saved with joblib
Arguments:
- path: Pathlib object
def
create_data_profile( self, sample_perc: float = 1, name: str = 'data_profile') -> ydata_profiling.profile_report.ProfileReport:
175 def create_data_profile(self, sample_perc: float = 1, name: str = "data_profile") -> ProfileReport: 176 """Creates a data profile report 177 178 Args: 179 sample_perc: 180 Percentage of data to use when creating a profile. Sampling is recommended for large dataframes. 181 Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.) 182 name: 183 Name of data profile 184 185 """ 186 from opsml.profile.profile_data import DataProfiler 187 188 if isinstance(self.data, (pl.DataFrame, pd.DataFrame)): 189 if self.data_profile is None: 190 self.data_profile = DataProfiler.create_profile_report( 191 data=self.data, 192 name=name, 193 sample_perc=min(sample_perc, 1), # max of 1 194 ) 195 return self.data_profile 196 197 logger.info("Data profile already exists") 198 return self.data_profile 199 200 raise ValueError("A pandas dataframe type is required to create a data profile")
Creates a data profile report
Arguments:
- sample_perc: Percentage of data to use when creating a profile. Sampling is recommended for large dataframes. Percentage is expressed as a decimal (e.g. 1 = 100%, 0.5 = 50%, etc.)
- name: Name of data profile
202 def split_data(self) -> Dict[str, Data]: 203 """ 204 Loops through data splits and splits data either by indexing or 205 column values 206 207 Example: 208 209 ```python 210 card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com") 211 data_card = DataCard( 212 info=card_info, 213 data=data, 214 dependent_vars=["Pulse"], 215 # define splits 216 data_splits=[ 217 DataSplit(label="train", indices=train_idx), 218 DataSplit(label="test", indices=test_idx), 219 ], 220 221 ) 222 223 splits = data_card.split_data() 224 print(splits["train"].X.head()) 225 226 Chins Situps Jumps 227 0 5.0 162.0 60.0 228 1 2.0 110.0 60.0 229 2 12.0 101.0 101.0 230 3 12.0 105.0 37.0 231 4 13.0 155.0 58.0 232 ``` 233 234 Returns 235 Class containing data splits 236 """ 237 if self.data is None: 238 raise ValueError("Data must not be None. Either supply data or load data") 239 240 if len(self.data_splits) > 0: 241 data_holder: Dict[str, Data] = {} 242 for data_split in self.data_splits: 243 label, data = DataSplitter.split( 244 split=data_split, 245 dependent_vars=self.dependent_vars, 246 data=self.data, 247 data_type=self.data_type, 248 ) 249 data_holder[label] = data 250 251 return data_holder 252 raise ValueError("No data splits provided")
Loops through data splits and splits data either by indexing or column values
Example:
card_info = CardInfo(name="linnerrud", repository="tutorial", contact="user@email.com") data_card = DataCard( info=card_info, data=data, dependent_vars=["Pulse"], # define splits data_splits=[ DataSplit(label="train", indices=train_idx), DataSplit(label="test", indices=test_idx), ], ) splits = data_card.split_data() print(splits["train"].X.head()) Chins Situps Jumps 0 5.0 162.0 60.0 1 2.0 110.0 60.0 2 12.0 101.0 101.0 3 12.0 105.0 37.0 4 13.0 155.0 58.0
Returns: Class containing data splits
data_suffix: str
254 @property 255 def data_suffix(self) -> str: 256 """Returns suffix for storage""" 257 return Suffix.JOBLIB.value
Returns suffix for storage
model_fields =
{'data': FieldInfo(annotation=Union[Any, NoneType], required=False), 'data_splits': FieldInfo(annotation=List[DataSplit], required=False, default=[]), 'dependent_vars': FieldInfo(annotation=List[Union[int, str]], required=False, default=[]), 'data_profile': FieldInfo(annotation=Union[ProfileReport, NoneType], required=False), 'feature_map': FieldInfo(annotation=Dict[str, Feature], required=False, default={}), 'feature_descriptions': FieldInfo(annotation=Dict[str, str], required=False, default={}), 'sql_logic': FieldInfo(annotation=Dict[str, str], required=False, default={})}
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs