from pif_compiler.services.echa_find import search_dossier from bs4 import BeautifulSoup from markdownify import MarkdownConverter import pandas as pd import requests import os import re import markdown_to_json import json import copy import unicodedata from datetime import datetime import logging import duckdb # Settings per il logging logging.basicConfig( format="{asctime} - {levelname} - {message}", style="{", datefmt="%Y-%m-%d %H:%M", filename="echa.log", encoding="utf-8", filemode="a", level=logging.INFO, ) try: # Carico il full scraping in memoria se esiste con = duckdb.connect() os.chdir(".") # directory che legge python res = con.sql(""" CREATE TABLE echa_full_scraping AS SELECT * FROM read_csv_auto('src\data\echa_full_scraping.csv'); """) # leggi il file csv come db in memory logging.info( f"echa.echaProcess().main: Loaded echa scraped data into duckdb memory. First CAS in the df is: {con.sql('select CAS from echa_full_scraping limit 1').fetchone()[0]}" ) local_echa = True except: logging.error(f"echa.echaProcess().main: No local echa scraped data found") # Metodo per trovare le informazioni relative sul sito echa # Funziona sia con il nome della sostanza che con il CUS def openEchaPage(link, local=False): try: if local: page = open(link, encoding="utf8") soup = BeautifulSoup(page, "html.parser") else: page = requests.get(link) page.encoding = "utf-8" soup = BeautifulSoup(page.text, "html.parser") except: logging.error( f"echa.echaProcess.openEchaPage() error. could not open: '{link}'", exc_info=True, ) return soup # Metodo per trasformare la pagina dell'echa in un Markdown def echaPage_to_md(sezione, scrapingType=None, local=False, substance=None): # sezione : il soup della pagina estratta attraverso search_dossier # scrapingType : 'RepeatedDose' o 'AcuteToxicity' # local : se vuoi salvare il contenuto del markdown in locale. Utile per debuggare # substance : il nome della sostanza. Per salvarla nel path corretto # Create shorthand method for conversion def md(soup, **options): return MarkdownConverter(**options).convert_soup(soup) output = md(sezione) # Trasformo la section html in un markdown, che però va corretto. # Cambia un po' il modo in cui modifico il .md in base al tipo di pagina da scrappare # aggiungo eccezioni man mano che testo nuove sostanze if scrapingType == "RepeatedDose": output = output.replace("### Oral route", "#### oral") output = output.replace("### Dermal", "#### dermal") output = output.replace("### Inhalation", "#### inhalation") # Devo rimpiazzare >< con delle stringhe perchè sennò il jsonifier interpreta quei due simboli come dei codici e inserisce il testo nelle [] output = re.sub(r">\s+", "greater than ", output) # Replace '<' followed by whitespace with 'less than ' output = re.sub(r"<\s+", "less than ", output) output = re.sub(r">=\s*\n", "greater or equal than ", output) output = re.sub(r"<=\s*\n", "less or equal than ", output) elif scrapingType == "AcuteToxicity": # Devo rimpiazzare >< con delle stringhe perchè sennò il jsonifier interpreta quei due simboli come dei codici e inserisce il testo nelle [] output = re.sub(r">\s+", "greater than ", output) # Replace '<' followed by whitespace with 'less than ' output = re.sub(r"<\s+", "less than ", output) output = re.sub(r">=\s*\n", "greater or equal than", output) output = re.sub(r"<=\s*\n", "less or equal than ", output) output = output.replace("–", "-") output = re.sub(r"\s+mg", " mg", output) # sta parte serve per fixare le unità di misura che vanno a capo e sono separate dal valore if local and substance: path = f"{scrapingType}/mds/{substance}.md" os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as text_file: text_file.write(output) return output # Questo è la parte 2 del processing del sito ECHA. Va trasformato il markdown in un JSON def markdown_to_json_raw(output, scrapingType=None, local=False, substance=None): # Output: Il markdown # scrapingType : 'RepeatedDose' o 'AcuteToxicity' # substance : il nome della sostanza. Per salvarla nel path corretto jsonified = markdown_to_json.jsonify(output) dictified = json.loads(jsonified) # Salvo il json iniziale così come esce da jsonify if local and scrapingType and substance: path = f"{scrapingType}/jsons/raws/{substance}_raw0.json" os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as text_file: text_file.write(jsonified) # Ora splitto i contenuti dei dizionari innestati. for key, value in dictified.items(): if type(value) == dict: for key2, value2 in value.items(): parts = value2.split("\n\n") dictified[key][key2] = { parts[i]: parts[i + 1] for i in range(0, len(parts) - 1, 2) if parts[i + 1] != "[Empty]" } else: parts = value.split("\n\n") dictified[key] = { parts[i]: parts[i + 1] for i in range(0, len(parts) - 1, 2) if parts[i + 1] != "[Empty]" } jsonified = json.dumps(dictified) if local and scrapingType and substance: path = f"{scrapingType}/jsons/raws/{substance}_raw1.json" os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as text_file: text_file.write(jsonified) dictified = json.loads(jsonified) return jsonified # Metodo creato da claude per risolvere i problemi di unicode characters def normalize_unicode_characters(text): """ Normalize Unicode characters, with special handling for superscript """ if not isinstance(text, str): return text # Specific replacements for common Unicode encoding issues # e per altre eccezioni particolari replacements = { "\u00c2\u00b2": "²", # ² -> ² "\u00c2\u00b3": "³", # ³ -> ³ "\u00b2": "²", # Bare superscript 2 "\u00b3": "³", # Bare superscript 3 "\n": "", # ogni tanto ci sono degli \n brutti da togliere "greater than": ">", "less than": "<", "greater or equal than": ">=", "less or equal than": "<", # Ste due entry le ho messe io. >< creano problemi quindi le rinonimo temporaneamente } # Apply specific replacements first for old, new in replacements.items(): text = text.replace(old, new) # Normalize Unicode characters text = unicodedata.normalize("NFKD", text) return text # Un'altro metodo creato da Claude. # Pare che il mio cervello sia troppo piccolo per riuscire a ciclare ricursivamente # un dizionario innestato. Se magari avessimo fatto algoritmi e strutture dati... def clean_json(data): """ Recursively clean JSON by removing empty/uninformative entries and normalizing Unicode characters """ def is_uninformative(value, context=None): """ Check if a dictionary entry is considered uninformative Args: value: The value to check context: Additional context about where the value is located """ # Specific exceptions if context and context == "Key value for chemical safety assessment": # Always keep all entries in this specific section return False uninformative_values = ["hours/week", "", None] return value in uninformative_values or ( isinstance(value, str) and ( value.strip() in uninformative_values or value.lower() == "no information available" ) ) def clean_recursive(obj, context=None): # If it's a dictionary, process its contents if isinstance(obj, dict): # Create a copy to modify cleaned = {} for key, value in obj.items(): # Normalize key normalized_key = normalize_unicode_characters(key) # Set context for nested dictionaries new_context = context or normalized_key # Recursively clean nested structures cleaned_value = clean_recursive(value, new_context) # Conditions for keeping the entry keep_entry = ( cleaned_value not in [None, {}, ""] and not ( isinstance(cleaned_value, dict) and len(cleaned_value) == 0 ) and not is_uninformative(cleaned_value, new_context) ) # Add to cleaned dict if conditions are met if keep_entry: cleaned[normalized_key] = cleaned_value return cleaned if cleaned else None # If it's a list, clean each item elif isinstance(obj, list): cleaned_list = [clean_recursive(item, context) for item in obj] cleaned_list = [item for item in cleaned_list if item not in [None, {}, ""]] return cleaned_list if cleaned_list else None # For strings, normalize Unicode elif isinstance(obj, str): return normalize_unicode_characters(obj) # Return as-is for other types return obj # Create a deep copy to avoid modifying original data cleaned_data = clean_recursive(copy.deepcopy(data)) # Sì figa questa è la parte che mi ha fatto sclerare # Ciclare in dizionari innestati senza poter modificare la struttura return cleaned_data def json_to_dataframe(cleaned_json, scrapingType): rows = [] schema = { "RepeatedDose": [ "Substance", "CAS", "Toxicity Type", "Route", "Dose descriptor", "Effect level", "Species", "Extraction_Timestamp", "Endpoint conclusion", ], "AcuteToxicity": [ "Substance", "CAS", "Route", "Endpoint conclusion", "Dose descriptor", "Effect level", "Extraction_Timestamp", ], } if scrapingType == "RepeatedDose": # Iterate through top-level sections (excluding 'Key value for chemical safety assessment') for toxicity_type, routes in cleaned_json.items(): if toxicity_type == "Key value for chemical safety assessment": continue # Iterate through routes within each toxicity type for route, details in routes.items(): row = {"Toxicity Type": toxicity_type, "Route": route} # Add details to the row, excluding 'Link to relevant study record(s)' row.update( { k: v for k, v in details.items() if k != "Link to relevant study record(s)" } ) rows.append(row) elif scrapingType == "AcuteToxicity": for toxicity_type, routes in cleaned_json.items(): if ( toxicity_type == "Key value for chemical safety assessment" or not routes ): continue row = { "Route": toxicity_type.replace("Acute toxicity: via", "") .replace("route", "") .strip() } # Add details directly from the routes dictionary row.update( { k: v for k, v in routes.items() if k != "Link to relevant study record(s)" } ) rows.append(row) # Create DataFrame df = pd.DataFrame(rows) # Last moment fixes. Per forzare uno schema fair_columns = list(set(schema["RepeatedDose"] + schema["AcuteToxicity"])) df = df = df.loc[:, df.columns.intersection(fair_columns)] return df def save_dataframe(df, file_path, scrapingType, schema): """ Save DataFrame with strict column requirements. Args: df (pd.DataFrame): DataFrame to potentially append file_path (str): Path of CSV file """ # Mandatory columns for saved DataFrame saved_columns = schema[scrapingType] # Check if input DataFrame has at least Dose Descriptor and Effect Level if not all(col in df.columns for col in ["Effect level"]): return # If file exists, read it to get saved columns if os.path.exists(file_path): existing_df = pd.read_csv(file_path) # Reindex to match saved columns, filling missing with NaN df = df.reindex(columns=saved_columns) else: # If file doesn't exist, create DataFrame with saved columns df = df.reindex(columns=saved_columns) df = df[df["Effect level"].isna() == False] # Ignoro le righe che non hanno valori per Effect Level # Append or save the DataFrame df.to_csv( file_path, mode="a" if os.path.exists(file_path) else "w", header=not os.path.exists(file_path), index=False, ) def echaExtract( substance: str, scrapingType: str, outputType="df", key_infos=False, local_search=False, local_only = False ): """ Funzione principale per scrapare dal sito ECHA. Mette insieme tante funzioni diverse di ricerca, estrazione e pulizia. Registra il logging delle operazioni. Args: substance (str): CAS o nome della sostanza. Vanno bene entrambi ma il CAS funziona meglio. scrapingType (str): 'AcuteToxicity' (LD50) o 'RepeatedDose' (NOAEL) outputType (str): 'pd.DataFrame' o 'json' (sconsigliato) key_infos (bool): Di base True. Specifica se cercare la sezione "Description of Key Information" nei dossiers. Certe sostanze hanno i dati inseriti a cazzo e mettono le informazioni lì in forma discorsiva al posto che altrove. Output: un dataframe o un json, f"Non esistono lead dossiers attivi o inattivi per {substance}" """ # se local_search = True tento una ricerca in locale. Altrimenti la provo online. if local_search and local_echa: result = echaExtract_local(substance, scrapingType, key_infos) if not result.empty: logging.info( f"echa.echaProcess.echaExtract(): Found local data for {scrapingType}, {substance}. Returning it." ) return result elif result.empty: logging.info( f"echa.echaProcess.echaExtract(): Have not found local data for {scrapingType}, {substance}. Continuining." ) if local_only: logging.info(f'echa.echaProcess.echaExtract(): No data found in local-only search for {substance}, {scrapingType}') return f'No data found in local-only search for {substance}, {scrapingType}' try: # con search_dossier trovo le informazioni relative al dossiers cercando sul sito echa la sostanza fornita. links = search_dossier(substance) if not links: logging.info( f'echaProcess.echaExtract(). no active or unactive lead dossiers for: "{substance}". Ending extraction.' ) return f"Non esistono lead dossiers attivi o inattivi per {substance}" # Se non esistono LEAD dossiers (quelli con i riassunti tossicologici) attivi o inattivi # LEAD dossiers: riassumono le informazioni di un po' di tutti gli altri dossier, sono quelli completi dove c'erano le info necessarie # Se esistono, apro la pagina che mi interessa ('Acute Toxicity' o 'Repeated Dose') if not scrapingType in list(links.keys()): logging.info( f'echaProcess.echaExtract(). No page for "{scrapingType}", "{substance}"' ) return f'No data in "{scrapingType}", "{substance}". Page does not exist.' soup = openEchaPage(link=links[scrapingType]) logging.info( f"echaProcess.echaExtract(). soupped '{scrapingType}' echa page for '{substance}'" ) # Piglio la sezione che mi serve try: sezione = soup.find( "section", class_="KeyValueForChemicalSafetyAssessment", attrs={"data-cy": "das-block"}, ) except: logging.error( f'echaProcess.echaExtract(). could not extract the "section" for "{scrapingType}" for "{substance}"', exc_info=True, ) # Per ottenere il timestamp attuale now = datetime.now() # UPDATE. Cerco le key infos: recupera quel testo di summary generale key_infos_faund = False if key_infos: try: key_infos = soup.find( "section", class_="KeyInformation", attrs={"data-cy": "das-block"}, ) if key_infos: key_infos = key_infos.find( "div", class_="das-field_value das-field_value_html", ) key_infos = key_infos.text key_infos = key_infos if key_infos.strip() != "[Empty]" else None if key_infos: key_infos_faund = True logging.info( f"echaProcess.echaExtract(). Extracted key_infos from '{scrapingType}' echa page for '{substance}': {key_infos}" ) key_infos_df = pd.DataFrame(index=[0]) key_infos_df["key_information"] = key_infos key_infos_df = df_wrapper( df=key_infos_df, rmlName=links["rmlName"], rmlCas=links["rmlCas"], timestamp=now.strftime("%Y-%m-%d"), dossierType=links["dossierType"], # attivo o inattivo?? da verificare page=scrapingType, # repeated dose o acute toxicity linkPage=links[scrapingType], # i link al dossier di repeated dose o acute toxicity key_infos=True, ) else: logging.error( f'echaProcess.echaExtract() > echaProcess.echaPage_to_md() ERROR. could not extract key_infos for "{scrapingType}", "{substance}"' ) else: logging.error( f'echaProcess.echaExtract() > echaProcess.echaPage_to_md() ERROR. could not extract key_infos for "{scrapingType}", "{substance}"' ) except: logging.error( f'echaProcess.echaExtract() > echaProcess.echaPage_to_md() ERROR. could not extract key_infos for "{scrapingType}", "{substance}"', exc_info=True, ) try: if not sezione: # la sezione principale che viene scrapata logging.error( f'echaProcess.echaExtract() > echaProcess.echaPage_to_md() Empty section for the html > markdown conversion. No data for "{scrapingType}", "{substance}"' ) if not key_infos_faund: # Se non ci sono dati ma ci sono le key informations ritorno quelle return f'No data in "{scrapingType}", "{substance}"' else: return key_infos_df # Trasformo la sezione html in markdown output = echaPage_to_md( sezione, scrapingType=scrapingType, substance=substance ) logging.info( f'echaProcess.echaExtract() > echaProcess.echaPage_to_md() OK. created MD for "{scrapingType}", "{substance}"' ) # Ci sono rari casi in cui proprio non esistono pagine per l'acute toxicity o la repeated dose. In quel caso output sarà vuoto e darà errore # logging.info(output) except: logging.error( f'echaProcess.echaExtract() > echaProcess.echaPage_to_md() ERROR. could not MD for "{scrapingType}", "{substance}"', exc_info=True, ) try: # Trasformo il markdown nel primo json raw jsonified = markdown_to_json_raw( output, scrapingType=scrapingType, substance=substance ) logging.info( f'echaProcess.echaExtract() > echaProcess.markdown_to_json_raw() OK. created initial json for "{scrapingType}", "{substance}"' ) except: logging.error( f'echaProcess.echaExtract() > echaProcess.markdown_to_json_raw() ERROR. could not create initial json for "{scrapingType}", "{substance}"', exc_info=True, ) json_data = json.loads(jsonified) try: # Secondo step per il processing del json: pulisco i dizionari piu' innestati cleaned_data = clean_json(json_data) logging.info( f'echaProcess.echaExtract() > echaProcess.clean_json() OK. cleaned the json for "{scrapingType}", "{substance}"' ) # Se cleaned_data è vuoto vuol dire che non ci sono dati if not cleaned_data: logging.error( f'echaProcess.echaExtract() > echaProcess.clean_json() Empty cleaned_json. No data for "{scrapingType}", "{substance}"' ) if not key_infos_faund: # Se non ci sono dati ma ci sono le key informations ritorno quelle return f'No data in "{scrapingType}", "{substance}"' else: return key_infos_df except: logging.error( f'echaProcess.echaExtract() > echaProcess.clean_json() ERROR. cleaning the json for "{scrapingType}", "{substance}"' ) # Se si vuole come output il dataframe creo un dataframe e ci aggiungo un timestamp try: df = json_to_dataframe(cleaned_data, scrapingType) df = df_wrapper( df=df, rmlName=links["rmlName"], rmlCas=links["rmlCas"], timestamp=now.strftime("%Y-%m-%d"), dossierType=links["dossierType"], page=scrapingType, linkPage=links[scrapingType], ) if outputType == "df": logging.info( f'echaProcess.echaExtract(). succesfully extracted "{scrapingType}", "{substance}". Returning df' ) # Se l'utente vuole le key infos e le key_infos sono state trovate unisco i due df return df if not key_infos_faund else pd.concat([key_infos_df, df]) elif outputType == "json": if key_infos_faund: df = pd.concat([key_infos_df, df]) jayson = df.to_json(orient="records", force_ascii=False) logging.info( f'echaProcess.echaExtract(). succesfully extracted "{scrapingType}", "{substance}". Returning json' ) return jayson except KeyError: # Per gestire le pagine di merda che hanno solo "no information available" if key_infos_faund: return key_infos_df json_output = list(cleaned_data[list(cleaned_data.keys())[0]].values()) if json_output == ["no information available" for elem in json_output]: logging.info( f"echaProcess.echaExtract(). No data found for {scrapingType} for {substance}" ) return f'No data in "{scrapingType}", "{substance}"' else: logging.error( f"echaProcess.json_to_dataframe(). Could not create dataframe" ) cleaned_data["error"] = ( "Non sono riuscito a creare il dataframe, probabilmente non ci sono abbastanza informazioni. Ritorno il JSON" ) return cleaned_data except Exception: logging.error( f"echaProcess.echaExtract() ERROR. Something went wrong, not quite sure what.", exc_info=True, ) def df_wrapper( df, rmlName, rmlCas, timestamp, dossierType, page, linkPage, key_infos=False ): # Un semplice metodo per aggiungere tutta la roba che ci serve al dataframe. # Per non intasare echaExtract che già di suo è un figa di bordello df.insert(0, "Substance", rmlName) df.insert(1, "CAS", rmlCas) df["Extraction_Timestamp"] = timestamp df = df.replace("\n", "", regex=True) if not key_infos: df = df[df["Effect level"].isnull() == False] # Aggiungo il link del dossier e lo status df["dossierType"] = dossierType df["page"] = page df["linkPage"] = linkPage return df def echaExtract_specific( CAS: str, scrapingType="RepeatedDose", doseDescriptor="NOAEL", route="inhalation", local_search=False, local_only=False ): """ Dato un CAS cerca di trovare il dose descriptor (di base NOAEL) per la route specificata (di base 'inhalation'). Args: CAS (str): il cas o in alternativa la sostanza route (str): 'inhalation', 'oral', 'dermal'. Di base 'inhalation' scrapingType (str): la pagina su cui cercarlo doseDescriptor (str): il tipo di valore da ricercare (NOAEL, DNEL, LD50, LC50) """ # Tento di estrarre result = echaExtract( substance=CAS, scrapingType=scrapingType, outputType="df", local_search=local_search, local_only=local_only ) # Il risultato è un dataframe? if type(result) == pd.DataFrame: # Se sì, lo filtro per ciò che mi interessa filtered_df = result[ (result["Route"] == route) & (result["Dose descriptor"] == doseDescriptor) ] # Se non è vuoto lo ritorno if not filtered_df.empty: return filtered_df else: return f'Non ho trovato {doseDescriptor} in {scrapingType} con route "{route}" per {CAS}' elif type(result) == dict and result["error"]: # Questo significa che gli è arrivato qualche json con un errore return f'Non ho trovato {doseDescriptor} in {scrapingType} con route "{route}" per {CAS}' # Questo significa che ha ricevuto un "Non esistono" come risultato. Non esistono lead dossiers attivi o inattivi per la sostanza ricercata elif result.startswith("Non esistono"): return result def echa_noael_ld50(CAS: str, route="inhalation", outputType="df", local_search=False, local_only=False): """ Dato un CAS cerca di trovare il NOAEL per la route specificata (di base 'inhalation'). Se non esiste la pagina RepeatedDose con il NOAEL fa ritornare l'LD50 per quella route. Args: CAS (str): il cas o in alternativa la sostanza route (str): 'inhalation', 'oral', 'dermal'. Di base 'inhalation' outputType (str) = 'df', 'json'. Il tipo di output """ if route not in ["inhalation", "oral", "dermal"] and outputType not in [ "df", "json", ]: return "invalid input" # Di base cerco di scrapare la pagina "Repeated Dose" first_attempt = echaExtract_specific( CAS=CAS, scrapingType="RepeatedDose", doseDescriptor="NOAEL", route=route, local_search=local_search, local_only=local_only ) if isinstance(first_attempt, pd.DataFrame): return first_attempt elif isinstance(first_attempt, str) and first_attempt.startswith("Non ho trovato"): second_attempt = echaExtract_specific( CAS=CAS, scrapingType="AcuteToxicity", doseDescriptor="LD50", route=route, local_search=True, local_only=local_only ) if isinstance(second_attempt, pd.DataFrame): return second_attempt elif isinstance(second_attempt, str) and second_attempt.startswith( "Non ho trovato" ): return second_attempt.replace("LD50", "NOAEL ed LD50") elif first_attempt.startswith("Non esistono"): return first_attempt def echa_noael_ld50_multi( casList: list, route="inhalation", messages=False, local_search=False, local_only=False ): """ Metodo abbastanza semplice. Data una lista di cas esegue echa_noael_ld50. Quindi cerca i NOAEL per la route desiderata o gli LD50 se non trova i NOAEL. L'output è un df per le sostanze che trova e una lista di messaggi per quelle che non trova. Args: casList (list): la lista di CAS route (str): 'inhalation', 'oral', 'dermal'. Di base 'inhalation' messages (boolean) = True o False. Con True fa ritornare una lista. Il primo elemento sarà il dataframe, il secondo la lista di messaggi per le sostanze non trovate. Di base è False e fa ritornare solo il dataframe. """ messages_list = [] df = pd.DataFrame() for CAS in casList: output = echa_noael_ld50( CAS=CAS, route=route, outputType="df", local_search=local_search, local_only=local_only ) if isinstance(output, str): messages_list.append(output) elif isinstance(output, pd.DataFrame): df = pd.concat([df, output], ignore_index=True) df.dropna(axis=1, how="all", inplace=True) if messages and df.empty: messages_list.append( f'Non sono riuscito a trovare nessun NOAEL o LD50 per i cas per la route "{route}"' ) return [None, messages_list] elif messages and not df.empty: return [df, messages_list] elif not df.empty and not messages: return df elif df.empty and not messages: return f'Non sono riuscito a trovare nessun NOAEL o LD50 per i cas per la route "{route}"' def echaExtract_multi( casList: list, scrapingType="all", local=False, local_path=None, log_path=None, debug_print=False, error=False, error_path=None, key_infos=False, local_search=False, local_only=False, filter = None ): """ Data una lista di CAS cerca di estrarre tutte le pagine con le Repeated Dose, tutte le pagine con l'AcuteToxicity, o entrambe Args: casList (list): la lista di CAS scrapingType (str): 'RepeatedDose', 'AcuteToxicity', 'all' local (boolean): Se impostato su True questo parametro salva sul disco in maniera progressiva, appendendo ogni result man mano che li trova. è necessario per lo scraping su larga scala log_path (str): il path per il log da fillare durante lo scraping di massa debug_print (bool): per avere il printing durante lo scraping. per verificare l'avanzamento error (bool): Per far ritornare la lista degli errori una volta scrapato Output: pd.Dataframe """ cas_len = len(casList) i = 0 df = pd.DataFrame() if scrapingType == "all": scrapingTypeList = ["RepeatedDose", "AcuteToxicity"] else: scrapingTypeList = [scrapingType] logging.info( f"echa.echaExtract_multi(). Commencing mass extraction of {scrapingTypeList} for {casList}" ) errors = [] for cas in casList: for scrapingType in scrapingTypeList: extraction = echaExtract( substance=cas, scrapingType=scrapingType, outputType="df", key_infos=key_infos, local_search=local_search, local_only=local_only ) if isinstance(extraction, pd.DataFrame) and not extraction.empty: status = "successful_scrape" logging.info( f"echa.echaExtract_multi(). Succesfully scraped {scrapingType} for {cas}" ) df = pd.concat([df, extraction], ignore_index=True) if local and local_path: df.to_csv(local_path, index=False) elif ( (isinstance(extraction, pd.DataFrame) and extraction.empty) or (extraction is None) or (isinstance(extraction, str) and extraction.startswith("No data")) ): status = "no_data_found" logging.info( f"echa.echaExtract_multi(). Found no data for {scrapingType} for {cas}" ) elif isinstance(extraction, dict): if extraction["error"]: status = "df_creation_error" errors.append(extraction) logging.info( f"echa.echaExtract_multi(). Df creation error for {scrapingType} for {cas}" ) elif isinstance(extraction, str) and extraction.startswith("Non esistono"): status = "no_lead_dossiers" logging.info( f"echa.echaExtract_multi(). Found no lead dossiers for {cas}" ) else: status = "unknown_error" logging.error( f"echa.echaExtract_multi(). Unknown error for {scrapingType} for {cas}" ) if log_path: fill_log(cas, status, log_path, scrapingType) if debug_print: print(f"{i}: {cas}, {scrapingType}") i += 1 if error and errors and error_path: with open(error_path, "w") as json_file: json.dump(errors, json_file, indent=4) # Questa è la mossa che mi permette di eliminare 4 metodi if filter: df = filter_dataframe_by_dict(df, filter) return df def fill_log(cas: str, status: str, log_path: str, scrapingType: str): """ Funzione usata durante lo scraping di massa per fillare un log mentre estraggo le sostanze """ df = pd.read_csv(log_path) df.loc[df["casNo"] == cas, f"scraping_{scrapingType}"] = status df.loc[df["casNo"] == cas, "timestamp"] = datetime.now().strftime("%Y-%m-%d") df.to_csv(log_path, index=False) def echaExtract_local(substance:str, scrapingType:str, key_infos=False): if not key_infos: query = f""" SELECT * FROM echa_full_scraping WHERE CAS = '{substance}' AND page = '{scrapingType}' AND key_information IS NULL; """ elif key_infos: query = f""" SELECT * FROM echa_full_scraping WHERE CAS = '{substance}' AND page = '{scrapingType}'; """ result = con.sql(query).df() return result def filter_dataframe_by_dict(df, filter_dict): """ Filters a Pandas DataFrame based on a dictionary. Args: df (pd.DataFrame): The input DataFrame. filter_dict (dict): A dictionary where keys are column names and values are lists of allowed values for that column. Returns: pd.DataFrame: A new DataFrame containing only the rows that match the filter criteria. """ filter_condition = pd.Series(True, index=df.index) # Initialize with all True to start filtering for column_name, allowed_values in filter_dict.items(): if column_name in df.columns: # Check if the column exists in the DataFrame column_filter = df[column_name].isin(allowed_values) # Create a boolean Series for the current column filter_condition = filter_condition & column_filter # Combine with existing condition using 'and' else: print(f"Warning: Column '{column_name}' not found in the DataFrame. Filter for this column will be ignored.") filtered_df = df[filter_condition] # Apply the combined filter condition return filtered_df