diff --git a/.gitignore b/.gitignore index aeadf1ab..4e24a0a1 100644 --- a/.gitignore +++ b/.gitignore @@ -54,6 +54,8 @@ src/features/lexicons/certainty.txt examples/vector_data/* examples/output/* node_modules/ +*.csv +*.log # testing /output diff --git a/src/team_comm_tools/feature_builder.py b/src/team_comm_tools/feature_builder.py index fe433b08..bed0e8b9 100644 --- a/src/team_comm_tools/feature_builder.py +++ b/src/team_comm_tools/feature_builder.py @@ -1,12 +1,11 @@ -# feature_builder.py - # 3rd Party Imports import pandas as pd pd.options.mode.chained_assignment = None import re import numpy as np from pathlib import Path -import time +from datetime import datetime +from time import perf_counter import itertools import warnings @@ -47,9 +46,6 @@ class FeatureBuilder: be calculated. Defaults to an empty list (i.e., no additional features beyond the defaults will be computed). :type custom_features: list, optional - :param analyze_first_pct: Analyze the first X% of the data. This parameter is useful because the - earlier stages of the conversation may be more predictive than the later stages. Defaults to [1.0]. - :type analyze_first_pct: list(float), optional :param turns: If true, collapses multiple "chats"/messages by the same speaker in a row into a single "turn." Defaults to False. :type turns: bool, optional @@ -62,9 +58,9 @@ class FeatureBuilder: :param message_col: A string representing the column name that should be selected as the message. Defaults to "message". :type message_col: str, optional - :param timestamp_col: A string representing the column name that should be selected as the message. + :param timestamp_col: A timestamp column name, or a tuple of (start_timestamp_col, end_timestamp_col). Defaults to "timestamp". - :type timestamp_col: str, optional + :type timestamp_col: str | tuple[str, str], optional :param timestamp_unit: A string representing the unit of the timestamp (if the timestamp is numeric). Defaults to 'ms' (milliseconds). 
Other options (D, s, ms, us, ns) can be found on the Pandas reference: https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html @@ -75,7 +71,7 @@ class FeatureBuilder: :type grouping_keys: list, optional :param cumulative_grouping: If true, uses a cumulative way of grouping chats (looking not just within a single ID, but also at what happened before). NOTE: This parameter and the following one - (`within_grouping`) were created in the context of a multi-stage Empirica game (see: + (`within_task`) were created in the context of a multi-stage Empirica game (see: https://github.com/Watts-Lab/multi-task-empirica). Assumes exactly 3 nested columns at different levels: a High, Mid, and Low level; that are temporally nested. Defaults to False. :type cumulative_grouping: bool, optional @@ -87,7 +83,7 @@ class FeatureBuilder: :type ner_training_df: pd.DataFrame, optional :param ner_cutoff: The cutoff value for the confidence of prediction for each named entity. Defaults to 0.9. - :type ner_cutoff: int + :type ner_cutoff: float :param regenerate_vectors: If true, regenerates vector data even if it already exists. Defaults to False. :type regenerate_vectors: bool, optional :param compute_vectors_from_preprocessed: If true, computes vectors using preprocessed text (with @@ -113,6 +109,24 @@ class FeatureBuilder: :type user_columns: list, optional :param use_gpu: Specifies whether to use GPU for vert/bert model. Defaults to False. :type use_gpu: bool, optional + :param corr_thresh: Minimum absolute Spearman correlation used to treat two numeric + columns as redundant during summary reduction. Defaults to 0.95. + :type corr_thresh: float, optional + :param min_na_ratio: Threshold for dropping numeric columns with high missing-value + ratio during summary reduction. Defaults to 0.3. + :type min_na_ratio: float, optional + :param min_zero_ratio: Threshold for dropping numeric columns with high zero ratio + during summary reduction. Defaults to 0.9. 
+ :type min_zero_ratio: float, optional + :param min_group_size: Minimum connected-component size to treat a correlated set + of columns as a redundancy group. Defaults to 2. + :type min_group_size: int, optional + :param treat_zero_as_na: If true, zeros are treated as missing values when computing + redundancy metrics and selecting representative columns. Defaults to True. + :type treat_zero_as_na: bool, optional + :param drop_redundant_columns: If true, chat/user/conversation outputs are reduced to + representative numeric columns based on summary statistics. Defaults to False. + :type drop_redundant_columns: bool, optional :return: The FeatureBuilder writes the generated features to files in the specified paths. The progress will be printed in the terminal, indicating completion with "All Done!". :rtype: None @@ -126,7 +140,7 @@ def __init__( output_file_path_user_level: str = None, output_file_path_conv_level: str = None, custom_features: list = [], - analyze_first_pct: list = [1.0], + # analyze_first_pct: list = [1.0], turns: bool = False, conversation_id_col: str = "conversation_num", speaker_id_col: str = "speaker_nickname", @@ -137,7 +151,7 @@ def __init__( cumulative_grouping = False, within_task = False, ner_training_df: pd.DataFrame = None, - ner_cutoff: int = 0.9, + ner_cutoff: float = 0.9, regenerate_vectors: bool = False, compute_vectors_from_preprocessed: bool = False, custom_liwc_dictionary_path: str = '', @@ -147,19 +161,35 @@ def __init__( user_aggregation = True, user_methods: list = ['mean', 'max', 'min', 'stdev'], user_columns: list = None, - use_gpu: bool = False + use_gpu: bool = False, + corr_thresh: float = 0.9, + min_na_ratio: float = 0.3, + min_zero_ratio: float = 0.9, + min_group_size: int = 2, + treat_zero_as_na: bool = True, + drop_redundant_columns: bool = False ) -> None: - # Some error catching + ###### Initialization ###### + # Ensure output_file_base only contains alphanumeric characters and underscores. 
+ self.output_file_base = re.sub('[^A-Za-z0-9_]', '', output_file_base) + if self.output_file_base != output_file_base: + output_file_base = re.sub('[^A-Za-z0-9_]', '', output_file_base) + warnings.warn("WARNING: Special characters detected in output_file_base. These characters have been automatically removed.") + # Set up logging + self.logger = setup_logger(name="feature_builder_logger", log_file_path=f"./{self.output_file_base}/logs/feature_builder.log") + self.summ_logger = setup_logger(name="summary_details_logger", log_file_path=f"./{self.output_file_base}/logs/summary_details.log") + # Check that input is a dataframe if not isinstance(input_df, pd.DataFrame): + self.logger.error(f"Expected a Pandas DataFrame as input_df, but got {type(df).__name__}") raise TypeError(f"Expected a Pandas DataFrame as input_df, but got {type(df).__name__})") - - print("Initializing Featurization...") input_df = input_df.reset_index(drop=True) # reset index to avoid issues with indexing later on - ###### Set all parameters ###### + print("Initializing Featurization...") + self.logger.info("=== Start Initializing FeatureBuilder ===") - assert(all(0 <= x <= 1 for x in analyze_first_pct)) # first, type check that this is a list of numbers between 0 and 1 - self.first_pct = analyze_first_pct # Set first pct of conversation you want to analyze + ###### Set all parameters ###### + # assert(all(0 <= x <= 1 for x in analyze_first_pct)) # first, type check that this is a list of numbers between 0 and 1 + # self.first_pct = analyze_first_pct # Set first pct of conversation you want to analyze self.turns = turns self.conversation_id_col = conversation_id_col self.speaker_id_col = speaker_id_col @@ -184,6 +214,12 @@ def __init__( self.user_methods = user_methods self.user_columns = user_columns self.use_gpu = use_gpu + self.corr_thresh = corr_thresh + self.min_na_ratio = min_na_ratio + self.min_zero_ratio = min_zero_ratio + self.min_group_size = min_group_size + self.treat_zero_as_na = 
treat_zero_as_na + self.drop_redundant_columns = drop_redundant_columns # Defining input and output paths. self.chat_data = input_df.copy() self.orig_data = input_df.copy() @@ -236,7 +272,8 @@ def __init__( invalid_features.add(feat) if invalid_features: invalid_features_str = ', '.join(invalid_features) - warnings.warn(f"WARNING: Invalid custom features provided. Ignoring `{invalid_features_str}`.") + print(f"WARNING: Invalid custom features provided. Ignoring `{invalid_features_str}`.") + self.logger.warning(f"WARNING: Invalid custom features provided. Ignoring `{invalid_features_str}`.") # remove named entities if we didn't pass in the column if self.ner_training is None: self.feature_names.remove("Named Entity Recognition") @@ -320,29 +357,35 @@ def __init__( self.output_file_path_conv_level = output_file_path_conv_level self.output_file_path_user_level = output_file_path_user_level - # Ensure output_file_base is alphanumeric + hyphens - if(re.sub('[^A-Za-z0-9_]', '', output_file_base) != output_file_base): - print('here1') - output_file_base = re.sub('[^A-Za-z0-9_]', '', output_file_base) - warnings.warn("WARNING: Special characters detected in output_file_base. 
These characters have been automatically removed.") + if self.output_file_path_chat_level is None: - self.output_file_path_chat_level = "./" + output_file_base + "_chat_level.csv" + self.output_file_path_chat_level = "./" + self.output_file_base + "_chat_level.csv" if self.output_file_path_conv_level is None: - self.output_file_path_conv_level = "./" + output_file_base + "_conv_level.csv" + self.output_file_path_conv_level = "./" + self.output_file_base + "_conv_level.csv" if self.output_file_path_user_level is None: - self.output_file_path_user_level = "./" + output_file_base + "_user_level.csv" + self.output_file_path_user_level = "./" + self.output_file_base + "_user_level.csv" # Basic error detetection if not bool(self.output_file_path_conv_level) or not bool(re.sub('[^A-Za-z0-9_]', '', self.output_file_path_conv_level)): + self.logger.error("ERROR: Improper conversation-level output file name detected.") raise ValueError("ERROR: Improper conversation-level output file name detected.") if not bool(self.output_file_path_user_level) or not bool(re.sub('[^A-Za-z0-9_]', '', self.output_file_path_user_level)): + self.logger.error("ERROR: Improper user (speaker)-level output file name detected.") raise ValueError("ERROR: Improper user (speaker)-level output file name detected.") # We assume that the base file name is the last item in the output path; we will use this to name the stored vectors. if ('/' not in self.output_file_path_chat_level or '/' not in self.output_file_path_conv_level or '/' not in self.output_file_path_user_level): + self.logger.error( + "We expect you to pass a path in for your output files " + "(output_file_path_chat_level, output_file_path_user_level, and " + "output_file_path_conv_level). If you would like the output to be " + "the current directory, please append './' to the beginning of your " + "filename(s). Your filename should be in the format: " + "path/to/output_name.csv or ./output_name.csv for the current working directory." 
+ ) raise ValueError( "We expect you to pass a path in for your output files " "(output_file_path_chat_level, output_file_path_user_level, and " @@ -355,9 +398,11 @@ def __init__( try: base_file_name = self.output_file_path_chat_level.split("/")[-1] except: + self.logger.error("ERROR: Improper chat-level output file name detected.") raise ValueError("ERROR: Improper chat-level output file name detected.") if not bool(base_file_name) or not bool(re.sub('[^A-Za-z0-9_]', '', base_file_name)): # user didn't specify a file name, or specified one with only nonalphanumeric chars + self.logger.error("ERROR: Improper chat-level output file name detected.") raise ValueError("ERROR: Improper chat-level output file name detected.") try: @@ -396,7 +441,7 @@ def __init__( self.vect_path = vector_directory + "sentence/" + ("turns" if self.turns else "chats") + "/" + base_file_name self.bert_path = vector_directory + "sentiment/" + ("turns" if self.turns else "chats") + "/" + base_file_name - check_embeddings(self.chat_data, self.vect_path, self.bert_path, need_sentence, need_sentiment, self.regenerate_vectors, self.use_gpu, message_col = self.vector_colname) + check_embeddings(self.chat_data, self.vect_path, self.bert_path, need_sentence, need_sentiment, self.regenerate_vectors, self.use_gpu, self.vector_colname, self.logger) if(need_sentence): self.vect_data = pd.read_csv(self.vect_path, encoding='mac_roman') @@ -410,7 +455,9 @@ def __init__( # Deriving the base conversation level dataframe. 
self.conv_data = self.chat_data[[self.conversation_id_col]].drop_duplicates() - + print("Initialization Complete.") + self.logger.info("=== Initialization Complete ===") + self.logger.info("") def set_self_conv_data(self) -> None: @@ -473,9 +520,18 @@ def featurize(self) -> None: :return: None :rtype: None """ - + # Log start of run + start_time = perf_counter() + self.logger.info(f"=== Team Communication Toolkit FeatureBuilder Run initiated ===") + self.logger.info(f"Featurize started at {datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')}") + num_lines = self.chat_data.shape[0] + num_speakers = self.chat_data[self.speaker_id_col].nunique() + num_conversations = self.chat_data[self.conversation_id_col].nunique() + self.logger.info(f"Data file has {num_lines} lines (chats), {num_speakers} unique speakers, {num_conversations} unique conversations.") + # Step 1. Create chat level features. print("Chat Level Features ...") + self.logger.info("--- Chat Level Features ---") self.chat_level_features() # Things to store before we loop through truncations @@ -487,63 +543,76 @@ def featurize(self) -> None: # Step 2. # Run the chat-level features once, then produce different summaries based on # user specification. 
- for percentage in self.first_pct: + # for percentage in self.first_pct: # Reset chat, conv, and user objects - self.chat_data = self.chat_data_complete - self.user_data = self.chat_data[[self.conversation_id_col, self.speaker_id_col]].drop_duplicates() - self.set_self_conv_data() + self.chat_data = self.chat_data_complete + self.user_data = self.chat_data[[self.conversation_id_col, self.speaker_id_col]].drop_duplicates() + self.set_self_conv_data() - print("Generating features for the first " + str(percentage*100) + "% of messages...") - self.get_first_pct_of_chat(percentage) + # print("Generating features for the first " + str(percentage*100) + "% of messages...") + # self.logger.info("Generating features for the first " + str(percentage*100) + "% of messages...") + # self.get_first_pct_of_chat(percentage) # update output paths based on truncation percentage to save in a designated folder - if percentage != 1: # special folders for when the percentage is partial - self.output_file_path_user_level = re.sub('/output/', '/output/first_' + str(int(percentage*100)) + "/", self.output_file_path_user_level_original) - self.output_file_path_chat_level = re.sub('/output/', '/output/first_' + str(int(percentage*100)) + "/", self.output_file_path_chat_level_original) - self.output_file_path_conv_level = re.sub('/output/', '/output/first_' + str(int(percentage*100)) + "/", self.output_file_path_conv_level_original) - else: - self.output_file_path_user_level = self.output_file_path_user_level_original - self.output_file_path_chat_level = self.output_file_path_chat_level_original - self.output_file_path_conv_level = self.output_file_path_conv_level_original - - # Make it possible to create folders if they don't exist - Path(self.output_file_path_user_level).parent.mkdir(parents=True, exist_ok=True) - Path(self.output_file_path_chat_level).parent.mkdir(parents=True, exist_ok=True) - Path(self.output_file_path_conv_level).parent.mkdir(parents=True, exist_ok=True) + # if 
percentage != 1: # special folders for when the percentage is partial + # self.output_file_path_user_level = re.sub('/output/', '/output/first_' + str(int(percentage*100)) + "/", self.output_file_path_user_level_original) + # self.output_file_path_chat_level = re.sub('/output/', '/output/first_' + str(int(percentage*100)) + "/", self.output_file_path_chat_level_original) + # self.output_file_path_conv_level = re.sub('/output/', '/output/first_' + str(int(percentage*100)) + "/", self.output_file_path_conv_level_original) + # else: + self.output_file_path_user_level = self.output_file_path_user_level_original + self.output_file_path_chat_level = self.output_file_path_chat_level_original + self.output_file_path_conv_level = self.output_file_path_conv_level_original - # Store column names of what we generated, so that the user can easily access them - self.chat_features = list(itertools.chain(*[feature_dict[feature]["columns"] for feature in self.feature_names if feature_dict[feature]["level"] == "Chat"])) - if self.custom_liwc_dictionary: - self.chat_features += [lexicon_type + "_lexical_wordcount_custom" for lexicon_type in self.custom_liwc_dictionary.keys()] - self.conv_features_base = list(itertools.chain(*[feature_dict[feature]["columns"] for feature in self.feature_names if feature_dict[feature]["level"] == "Conversation"])) + # Make it possible to create folders if they don't exist + Path(self.output_file_path_user_level).parent.mkdir(parents=True, exist_ok=True) + Path(self.output_file_path_chat_level).parent.mkdir(parents=True, exist_ok=True) + Path(self.output_file_path_conv_level).parent.mkdir(parents=True, exist_ok=True) - # Step 3a. Create user level features. - print("Generating User Level Features ...") - self.user_level_features() - - # Step 3b. Create conversation level features. - print("Generating Conversation Level Features ...") - self.conv_level_features() - self.merge_conv_data_with_original() - - # Step 4. 
Write the features into the files defined in the output paths. - self.conv_features_all = [col for col in self.conv_data if col not in list(self.orig_data.columns) + ["conversation_num", self.message_col + "_original", "message_lower_with_punc"]] # save the column names that we generated! - print("All Done!") + # Store column names of what we generated, so that the user can easily access them + self.chat_features = list(itertools.chain(*[feature_dict[feature]["columns"] for feature in self.feature_names if feature_dict[feature]["level"] == "Chat"])) + if self.custom_liwc_dictionary: + self.chat_features += [lexicon_type + "_lexical_wordcount_custom" for lexicon_type in self.custom_liwc_dictionary.keys()] + self.conv_features_base = list(itertools.chain(*[feature_dict[feature]["columns"] for feature in self.feature_names if feature_dict[feature]["level"] == "Conversation"])) - self.save_features() + # Step 3a. Create user level features. + print("Generating User Level Features ...") + self.logger.info("--- User Level Features ---") + self.user_level_features() + + # Step 3b. Create conversation level features. + print("Generating Conversation Level Features ...") + self.logger.info("--- Conversation Level Features ---") + self.conv_level_features() + self.merge_conv_data_with_original() + + # Step 4. Write the features into the files defined in the output paths. + self.conv_features_all = [col for col in self.conv_data if col not in list(self.orig_data.columns) + ["conversation_num", self.message_col + "_original", "message_lower_with_punc"]] # save the column names that we generated! + end_time = perf_counter() + print("All Done!") + self.logger.info(f"=== Featurization Completed in {end_time - start_time:.2f} seconds! 
===") + self.logger.info("") + + self.logger.info("=== Feature Output Summary (Please see summary_details.log for all the details) ===") + self.logger.info("--- Chat Level ---") + chat_data_reduced = self.generate_summary_stats(self.chat_data) + if self.drop_redundant_columns: + self.chat_data = chat_data_reduced + self.logger.info("--- Conversation Level ---") + conv_data_reduced = self.generate_summary_stats(self.conv_data) + if self.drop_redundant_columns: + self.conv_data = conv_data_reduced + self.logger.info("--- User Level ---") + user_data_reduced = self.generate_summary_stats(self.user_data) + if self.drop_redundant_columns: + self.user_data = user_data_reduced + + self.save_features() def preprocess_chat_data(self) -> None: """ Call all preprocessing modules needed to clean the chat text. This function groups the chat data as specified, verifies column presence, creates original and lowercased columns, preprocesses text, and optionally processes chat turns. - - :param turns: Whether to preprocess naive turns, defaults to False - :type turns: bool, optional - :param col: Columns to preprocess, including conversation_id, speaker_id and message, defaults to None - :type cumulative_grouping: bool, optional - :param within_task: Whether to group within tasks, defaults to False - :type within_task: bool, optional :return: None :rtype: None @@ -626,31 +695,32 @@ def chat_level_features(self) -> None: message_col = self.message_col, timestamp_col = self.timestamp_col, timestamp_unit = self.timestamp_unit, - custom_liwc_dictionary = self.custom_liwc_dictionary + custom_liwc_dictionary = self.custom_liwc_dictionary, + logger = self.logger ) # Calling the driver inside this class to create the features. 
self.chat_data = chat_feature_builder.calculate_chat_level_features(self.feature_methods_chat) # Remove special characters in column names self.chat_data.columns = ["".join(c for c in col if c.isalnum() or c == '_') for col in self.chat_data.columns] - def get_first_pct_of_chat(self, percentage) -> None: - """ - Truncate each conversation to the first X% of rows. + # def get_first_pct_of_chat(self, percentage) -> None: + # """ + # Truncate each conversation to the first X% of rows. - This function groups the chat data by `conversation_num` and retains only - the first X% of rows for each conversation. + # This function groups the chat data by `conversation_num` and retains only + # the first X% of rows for each conversation. - :param percentage: Percentage of rows to retain in each conversation - :type percentage: float + # :param percentage: Percentage of rows to retain in each conversation + # :type percentage: float - :return: None - :rtype: None - """ - chat_grouped = self.chat_data.groupby(self.conversation_id_col) - num_rows_to_retain = pd.DataFrame(np.ceil(chat_grouped.size() * percentage)).reset_index() - chat_truncated = pd.DataFrame() - for conversation_num, num_rows in num_rows_to_retain.itertuples(index=False): - chat_truncated = pd.concat([chat_truncated,chat_grouped.get_group(conversation_num).head(int(num_rows))], ignore_index = True) + # :return: None + # :rtype: None + # """ + # chat_grouped = self.chat_data.groupby(self.conversation_id_col) + # num_rows_to_retain = pd.DataFrame(np.ceil(chat_grouped.size() * percentage)).reset_index() + # chat_truncated = pd.DataFrame() + # for conversation_num, num_rows in num_rows_to_retain.itertuples(index=False): + # chat_truncated = pd.concat([chat_truncated,chat_grouped.get_group(conversation_num).head(int(num_rows))], ignore_index = True) def user_level_features(self) -> None: """ @@ -672,7 +742,8 @@ def user_level_features(self) -> None: user_aggregation = self.user_aggregation, user_methods = 
self.user_methods, user_columns = self.user_columns, - chat_features = self.chat_features + chat_features = self.chat_features, + logger=self.logger ) self.user_data = user_feature_builder.calculate_user_level_features() # Remove special characters in column names @@ -705,6 +776,7 @@ def conv_level_features(self) -> None: user_methods = self.user_methods, user_columns = self.user_columns, chat_features = self.chat_features, + logger=self.logger ) # Calling the driver inside this class to create the features. self.conv_data = conv_feature_builder.calculate_conversation_level_features(self.feature_methods_conv) @@ -727,9 +799,10 @@ def load_custem_liwc_dict(self, custom_liwc_dictionary_path: str) -> dict: """ Load the custom LIWC dictionary from the provided path. - This function reads the custom LIWC dictionary from the provided path and returns the dictionary. + This function reads the custom LIWC dictionary from the provided path and returns + the parsed dictionary. If the path is empty/invalid, returns an empty dict. - :param custom_liwc_dictionary_path: Path to the custom LIWC dictionary file + :param custom_liwc_dictionary_path: Path to the custom LIWC dictionary file. :type custom_liwc_dictionary_path: str :return: Custom LIWC dictionary @@ -760,7 +833,7 @@ def verify_timestamp_format(self, timestamp_col) -> None: Verifies that a column in a DataFrame is composed of values that can be parsed either as datetime or as numeric values suitable for time difference calculations. - :param timestamp_col: The name of the column to verify + :param timestamp_col: The name of the column to verify. :type timestamp_col: str :return: None @@ -786,4 +859,169 @@ def verify_timestamp_format(self, timestamp_col) -> None: raise ValueError( f"Column '{timestamp_col}' contains values that are neither parseable as datetime " f"nor convertible to numeric format." 
- ) \ No newline at end of file + ) + + def log_column_groups(self, groups, max_groups, max_cols_per_group): + """ + Log correlated feature groups to standard and detailed loggers. + + :param groups: Correlated column groups. + :type groups: list[list[str]] + :param max_groups: Maximum number of groups to print to the standard logger. + :type max_groups: int + :param max_cols_per_group: Maximum number of columns shown per group in + the standard logger. + :type max_cols_per_group: int + + :return: None + :rtype: None + """ + total_groups = len(groups) + self.logger.info("Found %s correlated feature groups", total_groups) + for i, group in enumerate(groups[:max_groups], 1): + size = len(group) + if size > max_cols_per_group: + shown = ", ".join(group[:max_cols_per_group]) + self.logger.info( + "[Group %02d | size=%d] %s ... (+%d more)", + i, size, shown, size - max_cols_per_group + ) + else: + self.logger.info( + "[Group %02d | size=%d] %s", + i, size, ", ".join(group) + ) + if total_groups > max_groups: + self.logger.info( + "... (%d more groups not shown)", + total_groups - max_groups + ) + self.summ_logger.info("Full correlated feature groups output:") + for i, group in enumerate(groups, 1): + self.summ_logger.info( + "[Group %02d | size=%d] %s", + i, len(group), ", ".join(group) + ) + + def keep_one_column_per_group(self, df, groups): + """ + Select one representative column from each correlated group. + + Non-grouped columns are preserved, and grouped columns are reduced to the + best-scoring representative based on valid-count and variance. + + :param df: Original dataframe. + :type df: pd.DataFrame + :param groups: Groups of similar columns. + :type groups: list[list[str]] + + :return: Final list of columns to keep. 
+ :rtype: list[str] + """ + grouped_cols = set() + representative_map = {} + kept_group_cols = [] + + for group in groups: + grouped_cols.update(group) + + def score(col): + s = df[col] + if self.treat_zero_as_na: + valid_count = ((~s.isna()) & (s != 0)).sum() + else: + valid_count = s.notna().sum() + + variance = s.replace(0, pd.NA).dropna().var() if self.treat_zero_as_na else s.dropna().var() + variance = 0 if pd.isna(variance) else variance + + return (valid_count, variance) + + best_col = max(group, key=score) + kept_group_cols.append(best_col) + representative_map[best_col] = [c for c in group if c != best_col] + + ungrouped_cols = [c for c in df.columns if c not in grouped_cols] + + kept_columns = ungrouped_cols + kept_group_cols + return kept_columns + + def generate_summary_stats(self, df) -> pd.DataFrame: + """ + Log and optionally reduce redundant numeric feature columns. + + The method identifies numeric columns with high missingness and zero rates, + discovers highly correlated feature groups, and retains one representative + per group. Non-numeric columns are preserved and reattached before return. + + :param df: Input dataframe to summarize and optionally reduce. + :type df: pd.DataFrame + + :return: Dataframe with non-numeric columns plus filtered numeric columns. + :rtype: pd.DataFrame + """ + # drop non-numeric columns + df_reduced = df.select_dtypes(include=[np.number]) + df_other = df.select_dtypes(exclude=[np.number]) + + # 1. 
list columns with lots of NAs + na_ratio = df_reduced.isna().mean() + cols_with_many_nas = na_ratio[na_ratio > self.min_na_ratio].index.tolist() + drop_str = " were dropped" if self.drop_redundant_columns else "" + self.logger.info( + f"{len(cols_with_many_nas)} columns with more than {self.min_na_ratio * 100}% NA's{drop_str}" + ) + self.summ_logger.info( + f"Columns with more than {self.min_na_ratio * 100}% NA's{drop_str}:\n"\ + + "\n".join(" "*30 + f"- {str(col)}" for col in cols_with_many_nas)) + df_reduced = df_reduced.drop(columns=cols_with_many_nas) + + # 2. list columns with lots of zeros + zero_ratio = (df_reduced == 0).mean() + cols_with_many_zeros = zero_ratio[zero_ratio > self.min_zero_ratio].index.tolist() + self.logger.info( + f"{len(cols_with_many_zeros)} columns with more than {self.min_zero_ratio * 100}% zeros{drop_str}" + ) + self.summ_logger.info( + f"Columns with more than {self.min_zero_ratio * 100}% zeros{drop_str}:\n"\ + + "\n".join(" "*30 + f"- {str(col)}" for col in cols_with_many_zeros)) + df_reduced = df_reduced.drop(columns=cols_with_many_zeros) + + # 3. 
cluster similar columns + if self.treat_zero_as_na: + df_reduced = df_reduced.replace(0, np.nan) + corr = df_reduced.corr(method="spearman", min_periods=max(10, int(0.05 * len(df_reduced)))).abs() + cols = corr.columns.tolist() + graph = {col: set() for col in cols} + for i in range(len(cols)): + for j in range(i + 1, len(cols)): + r = corr.iloc[i, j] + if pd.notna(r) and r >= self.corr_thresh: + a, b = cols[i], cols[j] + graph[a].add(b) + graph[b].add(a) + visited = set() + groups = [] + for col in cols: + if col in visited: + continue + stack = [col] + group = [] + while stack: + node = stack.pop() + if node in visited: + continue + visited.add(node) + group.append(node) + stack.extend(graph[node] - visited) + if len(group) >= self.min_group_size: + groups.append(sorted(group)) + groups.sort(key=lambda g: (-len(g), g)) + self.log_column_groups(groups, max_groups=10, max_cols_per_group=8) + + kept_columns = self.keep_one_column_per_group(df_reduced, groups) + df_reduced = df_reduced[kept_columns] + if self.drop_redundant_columns: + self.logger.info("For each group of similar columns, one representative with the most valid data and highest variance was retained") + df_final = pd.concat([df_other, df_reduced], axis=1) + return df_final \ No newline at end of file diff --git a/src/team_comm_tools/utils/calculate_chat_level_features.py b/src/team_comm_tools/utils/calculate_chat_level_features.py index 051544b7..836875b4 100644 --- a/src/team_comm_tools/utils/calculate_chat_level_features.py +++ b/src/team_comm_tools/utils/calculate_chat_level_features.py @@ -19,6 +19,7 @@ # Importing utils from .preload_word_lists import * from .zscore_chats_and_conversation import get_zscore_across_all_chats, get_zscore_across_all_conversations +from time import perf_counter # Loading bar from tqdm import tqdm @@ -69,7 +70,8 @@ def __init__( message_col: str, timestamp_col: str | tuple[str, str], timestamp_unit: str, - custom_liwc_dictionary: dict + custom_liwc_dictionary: dict, + 
logger: logging.Logger ) -> None: self.chat_data = chat_data @@ -86,6 +88,7 @@ def __init__( self.function_words = get_function_words() # load function words exactly once self.question_words = get_question_words() # load question words exactly once self.first_person = get_first_person_words() # load first person words exactly once + self.logger = logger def calculate_chat_level_features(self, feature_methods: list) -> pd.DataFrame: """ @@ -99,7 +102,10 @@ def calculate_chat_level_features(self, feature_methods: list) -> pd.DataFrame: """ for method in tqdm(feature_methods): + start_time = perf_counter() method(self) + end_time = perf_counter() + self.logger.info(f" - {method.__name__}: {end_time - start_time:.2f} seconds.") # Return the input dataset with the chat level features appended (as columns) return self.chat_data diff --git a/src/team_comm_tools/utils/calculate_conversation_level_features.py b/src/team_comm_tools/utils/calculate_conversation_level_features.py index 4e959559..f6ab7e1d 100644 --- a/src/team_comm_tools/utils/calculate_conversation_level_features.py +++ b/src/team_comm_tools/utils/calculate_conversation_level_features.py @@ -8,6 +8,7 @@ from team_comm_tools.utils.gini_coefficient import * from team_comm_tools.utils.preprocess import * from fuzzywuzzy import process +from time import perf_counter class ConversationLevelFeaturesCalculator: """ @@ -57,6 +58,7 @@ def __init__(self, chat_data: pd.DataFrame, user_methods: list, user_columns: list, chat_features: list, + logger ) -> None: # Initializing variables @@ -75,6 +77,7 @@ def __init__(self, chat_data: pd.DataFrame, self.user_methods = user_methods self.user_columns = user_columns self.chat_features = chat_features + self.logger = logger def clean_up_aggregation_method_names(aggregation_method_names:list, method_param:str) -> list: """ @@ -234,7 +237,10 @@ def calculate_conversation_level_features(self, feature_methods: list) -> pd.Dat """ for method in feature_methods: + start_time = 
perf_counter() method(self) + end_time = perf_counter() + self.logger.info(f" - {method.__name__}: {end_time - start_time:.2f} seconds.") return self.conv_data diff --git a/src/team_comm_tools/utils/calculate_user_level_features.py b/src/team_comm_tools/utils/calculate_user_level_features.py index 0d9043c9..cf85b47f 100644 --- a/src/team_comm_tools/utils/calculate_user_level_features.py +++ b/src/team_comm_tools/utils/calculate_user_level_features.py @@ -3,6 +3,7 @@ from team_comm_tools.features.get_user_network import * from team_comm_tools.features.user_centroids import * from fuzzywuzzy import process +from time import perf_counter class UserLevelFeaturesCalculator: """ @@ -38,7 +39,8 @@ def __init__(self, chat_data: pd.DataFrame, user_aggregation: bool, user_methods: list, user_columns: list, - chat_features: list) -> None: + chat_features: list, + logger) -> None: # Initializing variables self.chat_data = chat_data @@ -49,6 +51,7 @@ def __init__(self, chat_data: pd.DataFrame, self.user_aggregation = user_aggregation self.user_methods = user_methods self.chat_features = chat_features + self.logger = logger def clean_up_aggregation_method_names(aggregation_method_names:list) -> list: """ @@ -152,16 +155,28 @@ def calculate_user_level_features(self) -> pd.DataFrame: """ # Get total counts for features that need to be summed, regardless of what the user specified + start_time = perf_counter() self.get_user_level_summed_features() - + end_time = perf_counter() + self.logger.info(f" - user_level_summed_features: {end_time - start_time:.2f} seconds.") + # Get user summary statistics for all features (e.g. 
mean, min, max, stdev) + start_time = perf_counter() self.get_user_level_summary_statistics_features() - + end_time = perf_counter() + self.logger.info(f" - user_level_summary_statistics_features: {end_time - start_time:.2f} seconds.") + # Get 4 discursive features (discursive diversity, variance in DD, incongruent modulation, within-person discursive range) + start_time = perf_counter() self.get_centroids() + end_time = perf_counter() + self.logger.info(f" - user_centroids: {end_time - start_time:.2f} seconds.") # Get list of other users in a given conversation + start_time = perf_counter() self.get_user_network() + end_time = perf_counter() + self.logger.info(f" - user_network: {end_time - start_time:.2f} seconds.") return self.user_data diff --git a/src/team_comm_tools/utils/check_embeddings.py b/src/team_comm_tools/utils/check_embeddings.py index e5c6eba7..06e9a1bd 100644 --- a/src/team_comm_tools/utils/check_embeddings.py +++ b/src/team_comm_tools/utils/check_embeddings.py @@ -6,14 +6,13 @@ import warnings from tqdm import tqdm from pathlib import Path +from time import perf_counter -import torch -from sentence_transformers import SentenceTransformer, util +from torch import cuda, no_grad +from sentence_transformers import SentenceTransformer -from transformers import AutoTokenizer -from transformers import AutoModelForSequenceClassification +from transformers import AutoTokenizer, AutoModelForSequenceClassification, logging from scipy.special import softmax -from transformers import logging logging.set_verbosity(40) # only log errors @@ -26,7 +25,7 @@ # Check if embeddings exist def check_embeddings(chat_data: pd.DataFrame, vect_path: str, bert_path: str, need_sentence: bool, - need_sentiment: bool, regenerate_vectors: bool, use_gpu: bool, message_col: str = "message"): + need_sentiment: bool, regenerate_vectors: bool, use_gpu: bool, message_col: str, logger): """ Check if embeddings and required lexicons exist, and generate them if they don't. 
@@ -49,43 +48,71 @@ def check_embeddings(chat_data: pd.DataFrame, vect_path: str, bert_path: str, ne :type use_gpu: bool :param message_col: A string representing the column name that should be selected as the message. Defaults to "message". :type message_col: str, optional + :param logger: Logger for logging messages + :type logger: logging.Logger :return: None :rtype: None """ device = "cpu" if use_gpu: - if torch.cuda.is_available(): + if cuda.is_available(): print("Using GPU for embeddings.") + logger.info("Using GPU for embeddings.") device = "cuda" else: print("GPU not available, using CPU for embeddings.") + logger.info("GPU not available, using CPU for embeddings.") if (regenerate_vectors or (not os.path.isfile(vect_path))) and need_sentence: + logger.info("Generating sentence vectors cache...") + start_time = perf_counter() generate_vect(chat_data, vect_path, message_col, device) + end_time = perf_counter() + logger.info(f"Sentence vectors generation completed in {end_time - start_time:.2f} seconds.") if (regenerate_vectors or (not os.path.isfile(bert_path))) and need_sentiment: + logger.info("Generating BERT vectors cache...") + start_time = perf_counter() generate_bert(chat_data, bert_path, message_col, device) + end_time = perf_counter() + logger.info(f"BERT vectors generation completed in {end_time - start_time:.2f} seconds.") try: vector_df = pd.read_csv(vect_path) # check whether the given vector and bert data matches length of chat data if len(vector_df) != len(chat_data): print("ERROR: The length of the vector data does not match the length of the chat data. Regenerating...") + logger.error("The length of the vector data does not match the length of the chat data. 
Regenerating...") + start_time = perf_counter() generate_vect(chat_data, vect_path, message_col, device) + end_time = perf_counter() + logger.info(f"Sentence vectors regeneration completed in {end_time - start_time:.2f} seconds.") except FileNotFoundError: # It's OK if we don't have the path, if the sentence vectors are not necessary if need_sentence: + logger.error("Vector embeddings file not found. Generating new vector embeddings.") + start_time = perf_counter() generate_vect(chat_data, vect_path, message_col, device) + end_time = perf_counter() + logger.info(f"Sentence vectors generation completed in {end_time - start_time:.2f} seconds.") try: bert_df = pd.read_csv(bert_path) if len(bert_df) != len(chat_data): print("ERROR: The length of the sentiment data does not match the length of the chat data. Regenerating...") + logger.error("The length of the sentiment data does not match the length of the chat data. Regenerating...") # delete the file + start_time = perf_counter() generate_bert(chat_data, bert_path, message_col, device) + end_time = perf_counter() + logger.info(f"BERT vectors regeneration completed in {end_time - start_time:.2f} seconds.") except FileNotFoundError: if need_sentiment: # It's OK if we don't have the path, if the sentiment features are not necessary + logger.error("BERT sentiment file not found. 
Generating new BERT sentiments.") + start_time = perf_counter() generate_bert(chat_data, bert_path, message_col, device) - + end_time = perf_counter() + logger.info(f"BERT vectors generation completed in {end_time - start_time:.2f} seconds.") + # Get the lexicon pickle(s) if they don't exist current_script_directory = Path(__file__).resolve().parent LEXICON_PATH_STATIC = current_script_directory.parent/"features/assets/lexicons_dict.pkl" @@ -448,7 +475,7 @@ def get_sentiment(texts, model_bert, device): encoded = tokenizer(non_null_non_empty_texts, padding=True, truncation=True, max_length=512, return_tensors='pt') encoded = {k: v.to(device) for k, v in encoded.items()} - with torch.no_grad(): + with no_grad(): output = model_bert(**encoded) scores = output[0].detach().cpu().numpy() diff --git a/src/team_comm_tools/utils/preprocess.py b/src/team_comm_tools/utils/preprocess.py index 0234eba8..0f04d2f6 100644 --- a/src/team_comm_tools/utils/preprocess.py +++ b/src/team_comm_tools/utils/preprocess.py @@ -1,6 +1,7 @@ import re +import logging import pandas as pd -# import warnings +import os EMOJIS = { "(:", "(;", "):", "/:", ":(", ":)", ":/", ";)", # 8 emojis from LIWC 2017 @@ -296,3 +297,34 @@ def create_cumulative_rows(input_df, conversation_id, timestamp_col, grouping_ke ) return result_df + +def setup_logger(name: str, log_file_path: str, level: int=logging.INFO): + """Set up a logger + + :param name: The name of the logger. + :type name: str + :param log_file_path: Path to the log file, such as './output/logs/feature_builder.log'. + :type log_file_path: str + :param level: Logging level, defaults to logging.INFO. All levels: 0: NOTSET, 10: DEBUG, 20: INFO, 30: WARNING, 40: ERROR, 50: CRITICAL. + :type level: int, optional + :return: Configured logger. 
+ :rtype: logging.Logger + """ + formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + log_dir = os.path.dirname(log_file_path) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + logger = logging.getLogger(name) + logger.setLevel(level) + # Prevent "double logging" via parent/root handlers + logger.propagate = False + abs_path = os.path.abspath(log_file_path) + # If a FileHandler for this same file already exists, don't add another + for h in logger.handlers: + if isinstance(h, logging.FileHandler) and os.path.abspath(getattr(h, "baseFilename", "")) == abs_path: + return logger + handler = logging.FileHandler(log_file_path) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger \ No newline at end of file