Spaces:
Runtime error
Runtime error
| import os | |
| import shutil | |
| import tempfile | |
| from typing import Any, Dict, List, Optional, Union | |
| from cassandra.auth import PlainTextAuthProvider | |
| from cassandra.cluster import Cluster, Session | |
| from cassio.config.bundle_download import download_astra_bundle_url | |
| from cassio.config.bundle_management import ( | |
| infer_keyspace_from_bundle, | |
| init_string_to_bundle_path_and_options, | |
| ) | |
| ASTRA_CLOUD_AUTH_USERNAME = "token" | |
| DOWNLOADED_BUNDLE_FILE_NAME = "secure-connect-bundle_devopsapi.zip" | |
| default_session: Optional[Session] = None | |
| default_keyspace: Optional[str] = None | |
| def init( | |
| auto: bool = False, | |
| session: Optional[Session] = None, | |
| secure_connect_bundle: Optional[str] = None, | |
| init_string: Optional[str] = None, | |
| token: Optional[str] = None, | |
| database_id: Optional[str] = None, | |
| keyspace: Optional[str] = None, | |
| contact_points: Optional[Union[str, List[str]]] = None, | |
| username: Optional[str] = None, | |
| password: Optional[str] = None, | |
| cluster_kwargs: Optional[Dict[str, Any]] = None, | |
| tempfile_basedir: Optional[str] = None, | |
| bundle_url_template: Optional[str] = None, | |
| cloud_kwargs: Optional[Dict[str, Any]] = None, | |
| ) -> None: | |
| """ | |
| Globally set the default Cassandra connection (/keyspace) for CassIO. | |
| This default will be used by all other db-requiring CassIO instantiations, | |
| unless passed to the respective classes' __init__. | |
| There are various ways to achieve this, depending on the passed parameters. | |
| Broadly speaking, this method allows to pass one's own ready Session, | |
| or to have it created in the method. For this second case, both Astra DB | |
| and a regular Cassandra cluster can be targeted. | |
| One can also simply call cassio.init(auto=True) and let it figure out | |
| what to do based on standard environment variables. | |
| CASSANDRA | |
| If one passes `contact_points`, it is assumed that this is Cassandra. | |
| In that case, only the following arguments will be used: | |
| `contact_points`, `keyspace`, `username`, `password`, `cluster_kwargs` | |
| Note that when passing a `session` all other parameters are ignored. | |
| ASTRA DB: | |
| If `contact_points` is not passed, one of several methods to connect to | |
| Astra should be supplied for the connection to Astra. These overlap: | |
| see below for their precedence. | |
| Note that when passing a `session` all other parameters are ignored. | |
| AUTO: | |
| If passing auto=True, no other parameter is allowed except for | |
| `tempfile_basedir`. The rest, including choosing Astra DB / Cassandra, | |
| is autofilled by inspecting the following environment variables: | |
| CASSANDRA_CONTACT_POINTS | |
| CASSANDRA_USERNAME | |
| CASSANDRA_PASSWORD | |
| CASSANDRA_KEYSPACE | |
| ASTRA_DB_APPLICATION_TOKEN | |
| ASTRA_DB_INIT_STRING | |
| ASTRA_DB_SECURE_BUNDLE_PATH | |
| ASTRA_DB_KEYSPACE | |
| ASTRA_DB_DATABASE_ID | |
| PARAMETERS: | |
| `auto`: (bool = False), whether to auto-guess all connection params. | |
| `session` (optional cassandra.cluster.Session), an established connection. | |
| `secure_connect_bundle` (optional str), full path to a Secure Bundle. | |
| `init_string` (optional str), a stand-alone "db init string" credential | |
| (which can optionally contain keyspace and/or token). | |
| `token` (optional str), the Astra DB "AstraCS:..." token. | |
| `database_id` (optional str), the Astra DB ID. Used only for Astra | |
| connections with no `secure_connect_bundle` parameter passed. | |
| `keyspace` (optional str), the keyspace to work. | |
| `contact_points` (optional List[str]), for Cassandra connection | |
| `username` (optional str), username for Cassandra connection | |
| `password` (optional str), password for Cassandra connection | |
| `cluster_kwargs` (optional dict), additional arguments to `Cluster(...)`. | |
| `tempfile_basedir` (optional str), where to create temporary work directories. | |
| `bundle_url_template` (optional str), url template for getting the database | |
| secure bundle. The "databaseId" variable is resolved with the actual value. | |
| Default (for Astra DB): | |
| "https://api.astra.datastax.com/v2/databases/{database_id}/secureBundleURL" | |
| `cloud_kwargs` (optional dict), additional arguments to `Cluster(cloud={...})` | |
| (i.e. additional key-value pairs within the passed `cloud` dict). | |
| ASTRA DB: | |
| The Astra-related parameters are arranged in a chain of fallbacks. | |
| In case redundant information is supplied, these are the precedences: | |
| session > secure_connect_bundle > init_string | |
| token > (from init_string if any) | |
| keyspace > (from init_string if any) > (from bundle if any) | |
| If a secure-connect-bundle is needed and not passed, it will be downloaded: | |
| this requires `database_id` to be passed, suitable token permissions. | |
| Constraints and caveats: | |
| `secure_connect_bundle` requires `token`. | |
| `session` does not make use of `cluster_kwargs` and will ignore it. | |
| The Session is created at `init` time and kept around, available to any | |
| subsequent table creation. If calling `init` a second time, a new Session | |
| will be made available replacing the previous one. | |
| """ | |
| global default_session | |
| global default_keyspace | |
| temp_dir_created: bool = False | |
| temp_dir: Optional[str] = None | |
| direct_session: Optional[Session] = None | |
| bundle_from_is: Optional[str] = None | |
| bundle_from_arg: Optional[str] = None | |
| bundle_from_download: Optional[str] = None | |
| keyspace_from_is: Optional[str] = None | |
| keyspace_from_bundle: Optional[str] = None | |
| keyspace_from_arg: Optional[str] = None | |
| token_from_is: Optional[str] = None | |
| token_from_arg: Optional[str] = None | |
| # | |
| if auto: | |
| if any( | |
| v is not None | |
| for v in ( | |
| session, | |
| secure_connect_bundle, | |
| init_string, | |
| token, | |
| database_id, | |
| keyspace, | |
| contact_points, | |
| username, | |
| password, | |
| # cluster_kwargs is allowed | |
| # tempfile_basedir is allowed | |
| bundle_url_template, | |
| cloud_kwargs, | |
| ) | |
| ): | |
| raise ValueError( | |
| "When auto=True, no arguments can be passed other than " | |
| "tempfile_basedir and cluster_kwargs." | |
| ) | |
| # setting some arguments from environment variables | |
| if "CASSANDRA_CONTACT_POINTS" in os.environ: | |
| contact_points = os.environ["CASSANDRA_CONTACT_POINTS"] | |
| username = os.environ.get("CASSANDRA_USERNAME") | |
| password = os.environ.get("CASSANDRA_PASSWORD") | |
| keyspace = os.environ.get("CASSANDRA_KEYSPACE") | |
| elif any( | |
| avar in os.environ | |
| for avar in [ | |
| "ASTRA_DB_APPLICATION_TOKEN", | |
| "ASTRA_DB_INIT_STRING", | |
| ] | |
| ): | |
| token = os.environ.get("ASTRA_DB_APPLICATION_TOKEN") | |
| init_string = os.environ.get("ASTRA_DB_INIT_STRING") | |
| secure_connect_bundle = os.environ.get("ASTRA_DB_SECURE_BUNDLE_PATH") | |
| keyspace = os.environ.get("ASTRA_DB_KEYSPACE") | |
| database_id = os.environ.get("ASTRA_DB_DATABASE_ID") | |
| # | |
| try: | |
| # process init_string | |
| base_dir = tempfile_basedir if tempfile_basedir else tempfile.gettempdir() | |
| if init_string: | |
| temp_dir = tempfile.mkdtemp(dir=base_dir) | |
| temp_dir_created = True | |
| bundle_from_is, options_from_is = init_string_to_bundle_path_and_options( | |
| init_string, | |
| target_dir=temp_dir, | |
| ) | |
| keyspace_from_is = options_from_is.get("keyspace") | |
| token_from_is = options_from_is.get("token") | |
| # for the session | |
| if session: | |
| direct_session = session | |
| if secure_connect_bundle: | |
| if not token: | |
| raise ValueError( | |
| "`token` is required if `secure_connect_bundle` is passed" | |
| ) | |
| # params from arguments: | |
| bundle_from_arg = secure_connect_bundle | |
| token_from_arg = token | |
| keyspace_from_arg = keyspace | |
| can_be_astra = any( | |
| [ | |
| secure_connect_bundle is not None, | |
| init_string is not None, | |
| token is not None, | |
| ] | |
| ) | |
| # resolution of priority among args | |
| if direct_session: | |
| default_session = direct_session | |
| else: | |
| # first determine if Cassandra or Astra | |
| is_cassandra = all( | |
| [ | |
| secure_connect_bundle is None, | |
| init_string is None, | |
| token is None, | |
| contact_points is not None, | |
| ] | |
| ) | |
| if is_cassandra: | |
| is_astra_db = False | |
| else: | |
| # determine if Astra DB | |
| is_astra_db = can_be_astra | |
| # | |
| if is_cassandra: | |
| # need to take care of: | |
| # contact_points, username, password, cluster_kwargs | |
| chosen_contact_points: Union[List[str], None] | |
| if contact_points: | |
| if isinstance(contact_points, str): | |
| chosen_contact_points = [ | |
| cp.strip() for cp in contact_points.split(",") if cp.strip() | |
| ] | |
| else: | |
| # assume it's a list already | |
| chosen_contact_points = contact_points | |
| else: | |
| # normalize "" to None for later `Cluster(...)` call | |
| chosen_contact_points = None | |
| # | |
| if username is not None and password is not None: | |
| chosen_auth_provider = PlainTextAuthProvider( | |
| username, | |
| password, | |
| ) | |
| else: | |
| if username is not None or password is not None: | |
| raise ValueError( | |
| "Please provide both usename/password or none." | |
| ) | |
| else: | |
| chosen_auth_provider = None | |
| # | |
| if chosen_contact_points is None: | |
| cluster = Cluster( | |
| auth_provider=chosen_auth_provider, | |
| **(cluster_kwargs if cluster_kwargs is not None else {}), | |
| ) | |
| else: | |
| cluster = Cluster( | |
| contact_points=chosen_contact_points, | |
| auth_provider=chosen_auth_provider, | |
| **(cluster_kwargs if cluster_kwargs is not None else {}), | |
| ) | |
| default_session = cluster.connect() | |
| elif is_astra_db: | |
| # Astra DB | |
| chosen_token = _first_valid(token_from_arg, token_from_is) | |
| if chosen_token is None: | |
| raise ValueError( | |
| "A token must be supplied if connection is to be established." | |
| ) | |
| chosen_bundle_pre_token = _first_valid(bundle_from_arg, bundle_from_is) | |
| # Try to get the bundle from the token if not supplied otherwise | |
| if chosen_bundle_pre_token is None: | |
| if database_id is None: | |
| raise ValueError( | |
| "A database_id must be supplied if no " | |
| "secure_connect_bundle is provided." | |
| ) | |
| if not temp_dir_created: | |
| temp_dir = tempfile.mkdtemp(dir=base_dir) | |
| temp_dir_created = True | |
| bundle_from_download = os.path.join( | |
| temp_dir or "", DOWNLOADED_BUNDLE_FILE_NAME | |
| ) | |
| download_astra_bundle_url( | |
| database_id, | |
| chosen_token, | |
| bundle_from_download, | |
| bundle_url_template, | |
| ) | |
| # After the devops-api part, re-evaluate chosen_bundle: | |
| chosen_bundle = _first_valid( | |
| bundle_from_download, chosen_bundle_pre_token | |
| ) | |
| # | |
| if chosen_bundle: | |
| keyspace_from_bundle = infer_keyspace_from_bundle(chosen_bundle) | |
| cluster = Cluster( | |
| cloud={ | |
| "secure_connect_bundle": chosen_bundle, | |
| **(cloud_kwargs if cloud_kwargs is not None else {}), | |
| }, | |
| auth_provider=PlainTextAuthProvider( | |
| ASTRA_CLOUD_AUTH_USERNAME, | |
| chosen_token, | |
| ), | |
| **(cluster_kwargs if cluster_kwargs is not None else {}), | |
| ) | |
| default_session = cluster.connect() | |
| else: | |
| raise ValueError("No secure-connect-bundle was available.") | |
| # keyspace to be resolved in any case | |
| chosen_keyspace = _first_valid( | |
| keyspace_from_arg, keyspace_from_is, keyspace_from_bundle | |
| ) | |
| default_keyspace = chosen_keyspace | |
| finally: | |
| if temp_dir_created and temp_dir is not None: | |
| shutil.rmtree(temp_dir) | |
| def resolve_session(arg_session: Optional[Session] = None) -> Optional[Session]: | |
| """Utility to fall back to the global defaults when null args are passed.""" | |
| if arg_session is not None: | |
| return arg_session | |
| else: | |
| return default_session | |
| def check_resolve_session(arg_session: Optional[Session] = None) -> Session: | |
| s = resolve_session(arg_session) | |
| if s is None: | |
| raise ValueError("DB session not set.") | |
| else: | |
| return s | |
| def resolve_keyspace(arg_keyspace: Optional[str] = None) -> Optional[str]: | |
| """Utility to fall back to the global defaults when null args are passed.""" | |
| if arg_keyspace is not None: | |
| return arg_keyspace | |
| else: | |
| return default_keyspace | |
| def check_resolve_keyspace(arg_keyspace: Optional[str] = None) -> str: | |
| s = resolve_keyspace(arg_keyspace) | |
| if s is None: | |
| raise ValueError("DB keyspace not set.") | |
| else: | |
| return s | |
| def _first_valid(*pargs: Any) -> Union[Any, None]: | |
| for entry in pargs: | |
| if entry is not None: | |
| return entry | |
| return None | |