opsml.registry.registry
# pylint: disable=protected-access
# Copyright (c) Shipt, Inc.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import textwrap
from typing import Any, Dict, List, Optional, Type, Union

from opsml.cards import ArtifactCard, CardInfo
from opsml.data import DataInterface
from opsml.helpers.logging import ArtifactLogger
from opsml.helpers.utils import clean_string
from opsml.model import ModelInterface
from opsml.registry.backend import _set_registry
from opsml.registry.semver import VersionType
from opsml.storage.card_loader import CardLoader
from opsml.types import CommonKwargs, RegistryType

logger = ArtifactLogger.get_logger()


class CardRegistry:
    def __init__(self, registry_type: Union[RegistryType, str]):
        """
        Interface for connecting to any of the ArtifactCard registries

        Args:
            registry_type:
                Registry to connect to, given either as a `RegistryType`
                member or as a string accepted by `RegistryType.from_str`
                (e.g. "data")

        Example:
            data_registry = CardRegistry(RegistryType.DATA)
            data_registry.list_cards()

            or
            data_registry = CardRegistry("data")
            data_registry.list_cards()
        """

        # Strings are converted to the enum before the backend is selected.
        if isinstance(registry_type, RegistryType):
            resolved_type = registry_type
        else:
            resolved_type = RegistryType.from_str(registry_type)

        self._registry = _set_registry(resolved_type)
        self.table_name = self._registry.table_name

    @property
    def registry_type(self) -> RegistryType:
        "Registry type for card registry"
        return self._registry.registry_type

    def list_cards(
        self,
        uid: Optional[str] = None,
        name: Optional[str] = None,
        repository: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        info: Optional[CardInfo] = None,
        max_date: Optional[str] = None,
        limit: Optional[int] = None,
        ignore_release_candidates: bool = False,
    ) -> List[Dict[str, Any]]:
        """Retrieves records from registry

        Args:
            uid:
                Unique identifier for Card
            name:
                Card name (lower-cased before the query)
            repository:
                Repository associated with card (lower-cased before the query)
            version:
                Optional version number to search for
            tags:
                Dictionary of key, value tags to search for
            info:
                CardInfo object. Its name, repository, uid, version and tags
                are used only for arguments that were not passed explicitly
            max_date:
                Max date to search. (e.g. "2023-05-01" would search for cards up to and including "2023-05-01")
            limit:
                Places a limit on result list. Defaults to 25 when no name,
                repository, version, uid or tags are supplied
            ignore_release_candidates:
                If True, ignores release candidates

        Returns:
            List of dictionaries, one per matching record
        """

        if info is not None:
            # Explicit arguments win; `info` only fills in what is missing.
            name = name or info.name
            repository = repository or info.repository
            uid = uid or info.uid
            version = version or info.version
            tags = tags or info.tags

        if name is not None:
            name = name.lower()

        if repository is not None:
            repository = repository.lower()

        # An unfiltered listing is capped unless the caller chose a limit.
        if not any(bool(value) for value in (name, repository, version, uid, tags)):
            limit = limit or 25

        return self._registry.list_cards(
            uid=uid,
            name=name,
            repository=repository,
            version=version,
            max_date=max_date,
            limit=limit,
            tags=tags,
            ignore_release_candidates=ignore_release_candidates,
        )

    def load_card(
        self,
        name: Optional[str] = None,
        repository: Optional[str] = None,
        uid: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        version: Optional[str] = None,
        info: Optional[CardInfo] = None,
        ignore_release_candidates: bool = False,
        interface: Optional[Union[Type[ModelInterface], Type[DataInterface]]] = None,
    ) -> ArtifactCard:
        """Loads a specific card

        Args:
            name:
                Optional Card name
            repository:
                Optional repository associated with card
            uid:
                Optional unique identifier for card
            tags:
                Optional tags associated with card
            version:
                Optional version number of existing card
            info:
                Optional CardInfo object. Its name, repository, uid, version
                and tags are used only for arguments that were not passed
                explicitly
            ignore_release_candidates:
                If True, ignores release candidates
            interface:
                Optional interface to use for loading card. This is required for when using
                subclassed interfaces.

        Returns:
            ArtifactCard

        Raises:
            IndexError: If no record matches the search criteria
        """

        if info is not None:
            name = name or info.name
            # Repository is merged like the other fields so that load_card
            # and list_cards resolve the same CardInfo to the same card.
            repository = repository or info.repository
            uid = uid or info.uid
            version = version or info.version
            tags = tags or info.tags

        # NOTE(review): called even when name is None (e.g. uid-only lookup);
        # presumably clean_string tolerates None - confirm.
        name = clean_string(name)

        records = self.list_cards(
            uid=uid,
            name=name,
            repository=repository,
            version=version,
            tags=tags,
            ignore_release_candidates=ignore_release_candidates,
            limit=1,
        )

        if not records:
            # Same exception type as indexing an empty list, with context.
            raise IndexError(
                f"No card found for name={name!r}, repository={repository!r}, "
                f"uid={uid!r}, version={version!r}, tags={tags!r}"
            )

        return CardLoader(
            card_args=records[0],
            registry_type=self.registry_type,
        ).load_card(interface=interface)

    def register_card(
        self,
        card: ArtifactCard,
        version_type: Union[VersionType, str] = VersionType.MINOR,
        pre_tag: str = "rc",
        build_tag: str = "build",
    ) -> None:
        """
        Adds a new `Card` record to registry. Registration will be skipped if the card already exists.

        Args:
            card:
                card to register
            version_type:
                Version type for increment. Options are "major", "minor" and
                "patch". Defaults to "minor".
            pre_tag:
                pre-release tag to add to card version
            build_tag:
                build tag to add to card version
        """

        # Converted first, so an invalid string fails even for a skipped card.
        _version_type = version_type if isinstance(version_type, VersionType) else VersionType.from_str(version_type)

        if card.uid is not None and card.version != CommonKwargs.BASE_VERSION.value:
            logger.info(
                textwrap.dedent(
                    f"""
                    Card {card.uid} already exists. Skipping registration. If you'd like to register
                    a new card, please instantiate a new Card object. If you'd like to update the
                    existing card, please use the update_card method.
                    """
                )
            )

        else:
            self._registry.register_card(
                card=card,
                version_type=_version_type,
                pre_tag=pre_tag,
                build_tag=build_tag,
            )

    def update_card(self, card: ArtifactCard) -> None:
        """
        Update an artifact card based on current registry

        Args:
            card:
                Card to update
        """
        return self._registry.update_card(card=card)

    def query_value_from_card(self, uid: str, columns: List[str]) -> Dict[str, Any]:
        """
        Query column values from a specific Card

        Args:
            uid:
                Uid of Card
            columns:
                List of columns to query

        Returns:
            Dictionary of column, values pairs
        """
        # Only the first record returned for the uid is consulted.
        results = self._registry.list_cards(uid=uid)[0]
        return {col: results[col] for col in columns}

    def delete_card(self, card: ArtifactCard) -> None:
        """
        Delete a specific Card

        Args:
            card:
                Card to delete
        """
        return self._registry.delete_card(card)


class CardRegistries:
    def __init__(self) -> None:
        """Instantiates class that contains all registries"""

        self.data = CardRegistry(registry_type=RegistryType.DATA)
        self.model = CardRegistry(registry_type=RegistryType.MODEL)
        self.run = CardRegistry(registry_type=RegistryType.RUN)
        self.pipeline = CardRegistry(registry_type=RegistryType.PIPELINE)
        self.project = CardRegistry(registry_type=RegistryType.PROJECT)
        self.audit = CardRegistry(registry_type=RegistryType.AUDIT)
logger =
<builtins.Logger object>
class
CardRegistry:
class CardRegistry:
    def __init__(self, registry_type: Union[RegistryType, str]):
        """
        Interface for connecting to any of the ArtifactCard registries

        Args:
            registry_type:
                Registry to connect to, given either as a `RegistryType`
                member or as a string accepted by `RegistryType.from_str`
                (e.g. "data")

        Example:
            data_registry = CardRegistry(RegistryType.DATA)
            data_registry.list_cards()

            or
            data_registry = CardRegistry("data")
            data_registry.list_cards()
        """

        # Strings are converted to the enum before the backend is selected.
        if isinstance(registry_type, RegistryType):
            resolved_type = registry_type
        else:
            resolved_type = RegistryType.from_str(registry_type)

        self._registry = _set_registry(resolved_type)
        self.table_name = self._registry.table_name

    @property
    def registry_type(self) -> RegistryType:
        "Registry type for card registry"
        return self._registry.registry_type

    def list_cards(
        self,
        uid: Optional[str] = None,
        name: Optional[str] = None,
        repository: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        info: Optional[CardInfo] = None,
        max_date: Optional[str] = None,
        limit: Optional[int] = None,
        ignore_release_candidates: bool = False,
    ) -> List[Dict[str, Any]]:
        """Retrieves records from registry

        Args:
            uid:
                Unique identifier for Card
            name:
                Card name (lower-cased before the query)
            repository:
                Repository associated with card (lower-cased before the query)
            version:
                Optional version number to search for
            tags:
                Dictionary of key, value tags to search for
            info:
                CardInfo object. Its name, repository, uid, version and tags
                are used only for arguments that were not passed explicitly
            max_date:
                Max date to search. (e.g. "2023-05-01" would search for cards up to and including "2023-05-01")
            limit:
                Places a limit on result list. Defaults to 25 when no name,
                repository, version, uid or tags are supplied
            ignore_release_candidates:
                If True, ignores release candidates

        Returns:
            List of dictionaries, one per matching record
        """

        if info is not None:
            # Explicit arguments win; `info` only fills in what is missing.
            name = name or info.name
            repository = repository or info.repository
            uid = uid or info.uid
            version = version or info.version
            tags = tags or info.tags

        if name is not None:
            name = name.lower()

        if repository is not None:
            repository = repository.lower()

        # An unfiltered listing is capped unless the caller chose a limit.
        if not any(bool(value) for value in (name, repository, version, uid, tags)):
            limit = limit or 25

        return self._registry.list_cards(
            uid=uid,
            name=name,
            repository=repository,
            version=version,
            max_date=max_date,
            limit=limit,
            tags=tags,
            ignore_release_candidates=ignore_release_candidates,
        )

    def load_card(
        self,
        name: Optional[str] = None,
        repository: Optional[str] = None,
        uid: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        version: Optional[str] = None,
        info: Optional[CardInfo] = None,
        ignore_release_candidates: bool = False,
        interface: Optional[Union[Type[ModelInterface], Type[DataInterface]]] = None,
    ) -> ArtifactCard:
        """Loads a specific card

        Args:
            name:
                Optional Card name
            repository:
                Optional repository associated with card
            uid:
                Optional unique identifier for card
            tags:
                Optional tags associated with card
            version:
                Optional version number of existing card
            info:
                Optional CardInfo object. Its name, repository, uid, version
                and tags are used only for arguments that were not passed
                explicitly
            ignore_release_candidates:
                If True, ignores release candidates
            interface:
                Optional interface to use for loading card. This is required for when using
                subclassed interfaces.

        Returns:
            ArtifactCard

        Raises:
            IndexError: If no record matches the search criteria
        """

        if info is not None:
            name = name or info.name
            # Repository is merged like the other fields so that load_card
            # and list_cards resolve the same CardInfo to the same card.
            repository = repository or info.repository
            uid = uid or info.uid
            version = version or info.version
            tags = tags or info.tags

        # NOTE(review): called even when name is None (e.g. uid-only lookup);
        # presumably clean_string tolerates None - confirm.
        name = clean_string(name)

        records = self.list_cards(
            uid=uid,
            name=name,
            repository=repository,
            version=version,
            tags=tags,
            ignore_release_candidates=ignore_release_candidates,
            limit=1,
        )

        if not records:
            # Same exception type as indexing an empty list, with context.
            raise IndexError(
                f"No card found for name={name!r}, repository={repository!r}, "
                f"uid={uid!r}, version={version!r}, tags={tags!r}"
            )

        return CardLoader(
            card_args=records[0],
            registry_type=self.registry_type,
        ).load_card(interface=interface)

    def register_card(
        self,
        card: ArtifactCard,
        version_type: Union[VersionType, str] = VersionType.MINOR,
        pre_tag: str = "rc",
        build_tag: str = "build",
    ) -> None:
        """
        Adds a new `Card` record to registry. Registration will be skipped if the card already exists.

        Args:
            card:
                card to register
            version_type:
                Version type for increment. Options are "major", "minor" and
                "patch". Defaults to "minor".
            pre_tag:
                pre-release tag to add to card version
            build_tag:
                build tag to add to card version
        """

        # Converted first, so an invalid string fails even for a skipped card.
        _version_type = version_type if isinstance(version_type, VersionType) else VersionType.from_str(version_type)

        if card.uid is not None and card.version != CommonKwargs.BASE_VERSION.value:
            logger.info(
                textwrap.dedent(
                    f"""
                    Card {card.uid} already exists. Skipping registration. If you'd like to register
                    a new card, please instantiate a new Card object. If you'd like to update the
                    existing card, please use the update_card method.
                    """
                )
            )

        else:
            self._registry.register_card(
                card=card,
                version_type=_version_type,
                pre_tag=pre_tag,
                build_tag=build_tag,
            )

    def update_card(self, card: ArtifactCard) -> None:
        """
        Update an artifact card based on current registry

        Args:
            card:
                Card to update
        """
        return self._registry.update_card(card=card)

    def query_value_from_card(self, uid: str, columns: List[str]) -> Dict[str, Any]:
        """
        Query column values from a specific Card

        Args:
            uid:
                Uid of Card
            columns:
                List of columns to query

        Returns:
            Dictionary of column, values pairs
        """
        # Only the first record returned for the uid is consulted.
        results = self._registry.list_cards(uid=uid)[0]
        return {col: results[col] for col in columns}

    def delete_card(self, card: ArtifactCard) -> None:
        """
        Delete a specific Card

        Args:
            card:
                Card to delete
        """
        return self._registry.delete_card(card)
CardRegistry(registry_type: Union[opsml.types.card.RegistryType, str])
def __init__(self, registry_type: Union[RegistryType, str]):
    """
    Interface for connecting to any of the ArtifactCard registries

    Args:
        registry_type:
            Registry to connect to, given either as a `RegistryType`
            member or as a string accepted by `RegistryType.from_str`
            (e.g. "data")

    Example:
        data_registry = CardRegistry(RegistryType.DATA)
        data_registry.list_cards()

        or
        data_registry = CardRegistry("data")
        data_registry.list_cards()
    """

    # Strings are converted to the enum before the backend is selected.
    if isinstance(registry_type, RegistryType):
        resolved_type = registry_type
    else:
        resolved_type = RegistryType.from_str(registry_type)

    self._registry = _set_registry(resolved_type)
    self.table_name = self._registry.table_name
Interface for connecting to any of the ArtifactCard registries
Arguments:
- registry_type: Type of card registry to create
- settings: Storage settings (stale entry: the constructor signature shown above accepts only registry_type)
Returns:
Instantiated connection to specific Card registry
Example:
data_registry = CardRegistry(RegistryType.DATA) data_registry.list_cards()
or data_registry = CardRegistry("data") data_registry.list_cards()
registry_type: opsml.types.card.RegistryType
@property
def registry_type(self) -> RegistryType:
    """Registry type reported by the underlying registry backend"""
    backend = self._registry
    return backend.registry_type
Registry type for card registry
def
list_cards( self, uid: Optional[str] = None, name: Optional[str] = None, repository: Optional[str] = None, version: Optional[str] = None, tags: Optional[Dict[str, str]] = None, info: Optional[opsml.types.card.CardInfo] = None, max_date: Optional[str] = None, limit: Optional[int] = None, ignore_release_candidates: bool = False) -> List[Dict[str, Any]]:
def list_cards(
    self,
    uid: Optional[str] = None,
    name: Optional[str] = None,
    repository: Optional[str] = None,
    version: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
    info: Optional[CardInfo] = None,
    max_date: Optional[str] = None,
    limit: Optional[int] = None,
    ignore_release_candidates: bool = False,
) -> List[Dict[str, Any]]:
    """Retrieves records from registry

    Args:
        uid:
            Unique identifier for Card
        name:
            Card name (lower-cased before the query)
        repository:
            Repository associated with card (lower-cased before the query)
        version:
            Optional version number to search for
        tags:
            Dictionary of key, value tags to search for
        info:
            CardInfo object. Its name, repository, uid, version and tags
            are used only for arguments that were not passed explicitly
        max_date:
            Max date to search. (e.g. "2023-05-01" would search for cards up to and including "2023-05-01")
        limit:
            Places a limit on result list. Defaults to 25 when no name,
            repository, version, uid or tags are supplied
        ignore_release_candidates:
            If True, ignores release candidates

    Returns:
        List of dictionaries, one per matching record
    """

    if info is not None:
        # Explicit arguments win; `info` only fills in what is missing.
        name = name or info.name
        repository = repository or info.repository
        uid = uid or info.uid
        version = version or info.version
        tags = tags or info.tags

    if name is not None:
        name = name.lower()

    if repository is not None:
        repository = repository.lower()

    # An unfiltered listing is capped unless the caller chose a limit.
    has_filter = any(bool(value) for value in (name, repository, version, uid, tags))
    if not has_filter:
        limit = limit or 25

    return self._registry.list_cards(
        uid=uid,
        name=name,
        repository=repository,
        version=version,
        max_date=max_date,
        limit=limit,
        tags=tags,
        ignore_release_candidates=ignore_release_candidates,
    )
Retrieves records from registry
Arguments:
- name: Card name
- repository: Repository associated with card
- version: Optional version number of existing data. If not specified, the most recent version will be used
- tags: Dictionary of key, value tags to search for
- uid: Unique identifier for Card. If present, the uid takes precedence
- max_date: Max date to search. (e.g. "2023-05-01" would search for cards up to and including "2023-05-01")
- limit: Places a limit on result list. Results are sorted by SemVer
- info: CardInfo object. If present, its name, repository, uid, version and tags are used as fallbacks for arguments that were not passed explicitly
- ignore_release_candidates: If True, ignores release candidates
Returns:
List of dictionaries, one per matching record
def
load_card( self, name: Optional[str] = None, repository: Optional[str] = None, uid: Optional[str] = None, tags: Optional[Dict[str, str]] = None, version: Optional[str] = None, info: Optional[opsml.types.card.CardInfo] = None, ignore_release_candidates: bool = False, interface: Union[Type[opsml.data.interfaces._base.DataInterface], Type[opsml.model.interfaces.base.ModelInterface], NoneType] = None) -> opsml.cards.base.ArtifactCard:
def load_card(
    self,
    name: Optional[str] = None,
    repository: Optional[str] = None,
    uid: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
    version: Optional[str] = None,
    info: Optional[CardInfo] = None,
    ignore_release_candidates: bool = False,
    interface: Optional[Union[Type[ModelInterface], Type[DataInterface]]] = None,
) -> ArtifactCard:
    """Loads a specific card

    Args:
        name:
            Optional Card name
        repository:
            Optional repository associated with card
        uid:
            Optional unique identifier for card
        tags:
            Optional tags associated with card
        version:
            Optional version number of existing card
        info:
            Optional CardInfo object. Its name, repository, uid, version
            and tags are used only for arguments that were not passed
            explicitly
        ignore_release_candidates:
            If True, ignores release candidates
        interface:
            Optional interface to use for loading card. This is required for when using
            subclassed interfaces.

    Returns:
        ArtifactCard

    Raises:
        IndexError: If no record matches the search criteria
    """

    if info is not None:
        name = name or info.name
        # Repository is merged like the other fields so that load_card
        # and list_cards resolve the same CardInfo to the same card.
        repository = repository or info.repository
        uid = uid or info.uid
        version = version or info.version
        tags = tags or info.tags

    # NOTE(review): called even when name is None (e.g. uid-only lookup);
    # presumably clean_string tolerates None - confirm.
    name = clean_string(name)

    records = self.list_cards(
        uid=uid,
        name=name,
        repository=repository,
        version=version,
        tags=tags,
        ignore_release_candidates=ignore_release_candidates,
        limit=1,
    )

    if not records:
        # Same exception type as indexing an empty list, with context.
        raise IndexError(
            f"No card found for name={name!r}, repository={repository!r}, "
            f"uid={uid!r}, version={version!r}, tags={tags!r}"
        )

    return CardLoader(
        card_args=records[0],
        registry_type=self.registry_type,
    ).load_card(interface=interface)
Loads a specific card
Arguments:
- name: Optional Card name
- uid: Unique identifier for card. If present, the uid takes precedence.
- tags: Optional tags associated with model.
- repository: Optional repository associated with card
- version: Optional version number of existing data. If not specified, the most recent version will be used
- info: Optional CardInfo object. If present, its fields are used as fallbacks for arguments that were not passed explicitly
- ignore_release_candidates: If True, ignores release candidates
- interface: Optional interface to use for loading card. This is required for when using subclassed interfaces.
Returns ArtifactCard
def
register_card( self, card: opsml.cards.base.ArtifactCard, version_type: Union[opsml.registry.semver.VersionType, str] = <VersionType.MINOR: 'minor'>, pre_tag: str = 'rc', build_tag: str = 'build') -> None:
def register_card(
    self,
    card: ArtifactCard,
    version_type: Union[VersionType, str] = VersionType.MINOR,
    pre_tag: str = "rc",
    build_tag: str = "build",
) -> None:
    """
    Adds a new `Card` record to registry. Registration will be skipped if the card already exists.

    Args:
        card:
            card to register
        version_type:
            Version type for increment. Options are "major", "minor" and
            "patch". Defaults to "minor".
        pre_tag:
            pre-release tag to add to card version
        build_tag:
            build tag to add to card version
    """

    # Converted first, so an invalid string fails even for a skipped card.
    if isinstance(version_type, VersionType):
        increment = version_type
    else:
        increment = VersionType.from_str(version_type)

    # A card with a uid and a non-base version is treated as already registered.
    already_registered = card.uid is not None and card.version != CommonKwargs.BASE_VERSION.value

    if already_registered:
        logger.info(
            textwrap.dedent(
                f"""
                Card {card.uid} already exists. Skipping registration. If you'd like to register
                a new card, please instantiate a new Card object. If you'd like to update the
                existing card, please use the update_card method.
                """
            )
        )
        return

    self._registry.register_card(
        card=card,
        version_type=increment,
        pre_tag=pre_tag,
        build_tag=build_tag,
    )
Adds a new Card
record to registry. Registration will be skipped if the card already exists.
Arguments:
- card: card to register
- version_type: Version type for increment. Options are "major", "minor" and "patch". Defaults to "minor".
- pre_tag: pre-release tag to add to card version
- build_tag: build tag to add to card version
def update_card(self, card: ArtifactCard) -> None:
    """
    Update an artifact card based on current registry

    Args:
        card:
            Card to update
    """
    # The update is delegated to the backend registry.
    backend = self._registry
    return backend.update_card(card=card)
Update an artifact card based on current registry
Arguments:
- card: Card to update
def
query_value_from_card(self, uid: str, columns: List[str]) -> Dict[str, Any]:
def query_value_from_card(self, uid: str, columns: List[str]) -> Dict[str, Any]:
    """
    Query column values from a specific Card

    Args:
        uid:
            Uid of Card
        columns:
            List of columns to query

    Returns:
        Dictionary of column, values pairs
    """
    # Only the first record returned for the uid is consulted.
    record = self._registry.list_cards(uid=uid)[0]

    selected: Dict[str, Any] = {}
    for column in columns:
        selected[column] = record[column]
    return selected
Query column values from a specific Card
Arguments:
- uid: Uid of Card
- columns: List of columns to query
Returns:
Dictionary of column, values pairs
class
CardRegistries:
class CardRegistries:
    def __init__(self) -> None:
        """Creates one CardRegistry per registry type and exposes each as an attribute"""

        # Each attribute is named after the registry type it connects to.
        self.data = CardRegistry(RegistryType.DATA)
        self.model = CardRegistry(RegistryType.MODEL)
        self.run = CardRegistry(RegistryType.RUN)
        self.pipeline = CardRegistry(RegistryType.PIPELINE)
        self.project = CardRegistry(RegistryType.PROJECT)
        self.audit = CardRegistry(RegistryType.AUDIT)
CardRegistries()
def __init__(self) -> None:
    """Creates one CardRegistry per registry type and exposes each as an attribute"""

    # Each attribute is named after the registry type it connects to.
    self.data = CardRegistry(RegistryType.DATA)
    self.model = CardRegistry(RegistryType.MODEL)
    self.run = CardRegistry(RegistryType.RUN)
    self.pipeline = CardRegistry(RegistryType.PIPELINE)
    self.project = CardRegistry(RegistryType.PROJECT)
    self.audit = CardRegistry(RegistryType.AUDIT)
Instantiates class that contains all registries