Spaces:
Build error
Build error
| from typing import Dict, List, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import streamlit as st | |
| from pandas.core.frame import DataFrame | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.preprocessing import LabelEncoder | |
| from sklearn.utils import resample | |
| from .configs import InputTransformConfigs, ModelConfigs | |
def input_transform(
    text: pd.Series, labels: pd.Series, configs=InputTransformConfigs
) -> Dict[str, np.ndarray]:
    """
    Encode raw text and labels into numeric objects amenable to model training.

    Documents are vectorized with TF-IDF (word-level n-grams; casing and
    accents are left untouched) and the labels are integer-encoded.

    Args:
        text: Series of raw documents, one per row.
        labels: Series of class labels aligned with ``text``.
        configs: Enum-like configuration holder providing NGRAM_RANGE, MIN_DF,
            MAX_DF, and SUBLINEAR members with ``.value`` attributes.

    Returns:
        Dict with keys:
            "X": TF-IDF document-term matrix,
            "y": integer-encoded labels,
            "X_names": array of feature (term) names, column-aligned with "X",
            "y_names": original label names, index-aligned with the codes in "y".
    """
    vectorizer = TfidfVectorizer(
        input="content",        # documents are already in memory
        encoding="utf-8",
        decode_error="strict",
        strip_accents=None,     # keep accents as-is
        lowercase=False,        # keep casing as-is
        preprocessor=None,
        tokenizer=None,
        stop_words=None,        # keep every token
        analyzer="word",
        ngram_range=configs.NGRAM_RANGE.value,
        min_df=configs.MIN_DF.value,
        max_df=configs.MAX_DF.value,
        sublinear_tf=configs.SUBLINEAR.value,
    )
    encoder = LabelEncoder()

    features = vectorizer.fit_transform(text.values)
    targets = encoder.fit_transform(labels.values)

    return {
        "X": features,
        "y": targets,
        "X_names": np.array(vectorizer.get_feature_names_out()),
        "y_names": encoder.classes_,
    }
def wordifier(
    X: np.ndarray,
    y: np.ndarray,
    X_names: List[str],
    y_names: List[str],
    configs=ModelConfigs,
) -> Tuple[List[Tuple[str, float, str]], List[Tuple[str, float, str]]]:
    """
    Find terms that are positively / negatively indicative of each class.

    Repeatedly fits an L1-penalized logistic regression on stratified bootstrap
    resamples of (X, y) and counts, per class and feature, how often the fitted
    coefficient is positive or negative. Features whose selection frequency
    reaches SELECTION_THRESHOLD are returned.

    Args:
        X: document-term matrix of shape (n_instances, n_features).
        y: integer-encoded labels aligned with the rows of X.
        X_names: feature (term) names, index-aligned with the columns of X.
        y_names: class names, index-aligned with the label codes in y.
        configs: configuration holder providing MAX_SELECTION, MIN_SELECTION,
            NUM_ITERS, PENALTIES, and SELECTION_THRESHOLD members with
            ``.value`` attributes.

    Returns:
        Tuple ``(pos, neg)``; each element is a list of
        (term, selection_frequency, class_name) triples for terms positively
        (resp. negatively) correlated with that class.
    """
    n_instances, n_features = X.shape
    n_classes = len(y_names)

    # NOTE: the * 10 / 10 trick is to have "nice" round-ups
    sample_fraction = np.ceil((n_features / n_instances) * 10) / 10
    sample_size = min(
        # this is the maximum supported
        configs.MAX_SELECTION.value,
        # at minimum you want MIN_SELECTION but in general you want
        # n_instances * sample_fraction
        max(configs.MIN_SELECTION.value, int(n_instances * sample_fraction)),
        # however if the previous one is bigger than the available instances,
        # take the number of available instances
        n_instances,
    )

    # TODO: might want to try out something to subsample features at each iteration

    # initialize per-class selection counters (how often a coefficient was +/-)
    pos_scores = np.zeros((n_classes, n_features), dtype=int)
    neg_scores = np.zeros((n_classes, n_features), dtype=int)

    pbar = st.progress(0)
    n_iters = configs.NUM_ITERS.value
    for i in range(n_iters):
        # run randomized regression: random L1 penalty strength per iteration
        clf = LogisticRegression(
            penalty="l1",
            C=configs.PENALTIES.value[np.random.randint(len(configs.PENALTIES.value))],
            solver="liblinear",
            multi_class="auto",
            max_iter=500,
            class_weight="balanced",
            random_state=42,
        )

        # sample row indices (stratified bootstrap) to subsample the matrix
        selection = resample(
            np.arange(n_instances), replace=True, stratify=y, n_samples=sample_size
        )

        # fit; a resample can be degenerate (e.g. a class missing) — skip it
        try:
            clf.fit(X[selection], y[selection])
        except ValueError:
            continue

        # record coefficients; in the binary case sklearn exposes a single
        # coefficient row (for the positive class), so mirror it for class 0
        if n_classes == 2:
            pos_scores[1] = pos_scores[1] + (clf.coef_ > 0.0)
            neg_scores[1] = neg_scores[1] + (clf.coef_ < 0.0)
            pos_scores[0] = pos_scores[0] + (clf.coef_ < 0.0)
            neg_scores[0] = neg_scores[0] + (clf.coef_ > 0.0)
        else:
            pos_scores += clf.coef_ > 0
            neg_scores += clf.coef_ < 0

        # BUGFIX: report *completed* iterations ((i + 1) / n) so the bar can
        # actually reach 1.0; the original i / n topped out one step short
        pbar.progress(round((i + 1) / n_iters, 1))

    # normalize counts into selection frequencies in [0, 1]
    pos_scores = pos_scores / n_iters
    neg_scores = neg_scores / n_iters

    # keep only features whose frequency clears the selection threshold
    pos_positions = np.where(
        pos_scores >= configs.SELECTION_THRESHOLD.value, pos_scores, 0
    )
    neg_positions = np.where(
        neg_scores >= configs.SELECTION_THRESHOLD.value, neg_scores, 0
    )

    # prepare (term, frequency, class) triples
    pos = [
        (X_names[i], pos_scores[c, i], y_names[c])
        for c, i in zip(*pos_positions.nonzero())
    ]
    neg = [
        (X_names[i], neg_scores[c, i], y_names[c])
        for c, i in zip(*neg_positions.nonzero())
    ]

    return pos, neg
def output_transform(
    pos: List[Tuple[str, float, str]], neg: List[Tuple[str, float, str]]
) -> DataFrame:
    """
    Combine positive and negative (word, score, label) triples into one table.

    Args:
        pos: triples for positively correlated terms.
        neg: triples for negatively correlated terms.

    Returns:
        DataFrame with title-cased columns Word, Score, Label, Correlation.
        Correlation is "positive" or "negative"; within each correlation group
        rows are sorted by (label, score) descending, positives first.
    """
    # the two inputs get identical treatment, so build them via one helper
    output = pd.concat(
        [_scores_to_frame(pos, "positive"), _scores_to_frame(neg, "negative")],
        ignore_index=False,
        axis=0,
    )
    output.columns = output.columns.str.title()
    return output


def _scores_to_frame(
    records: List[Tuple[str, float, str]], correlation: str
) -> DataFrame:
    """Build a (word, score, label) frame sorted by label then score (both
    descending) and tag every row with the given correlation direction."""
    df = pd.DataFrame(records, columns=["word", "score", "label"]).sort_values(
        ["label", "score"], ascending=False
    )
    df["correlation"] = correlation
    return df