Compare commits

..

No commits in common. "2da0f315c62eac47424651c90899fe57f08b8966" and "398e2d2d0067c50c1eaaa002cfc324c9ca5854cd" have entirely different histories.

14 changed files with 956 additions and 13613 deletions

119
app.py
View file

@ -1,10 +1,8 @@
import re import re
import extra_streamlit_components as stx
import streamlit as st import streamlit as st
from functions import check_auth, do_logout from functions_ui import search_cas_inci
from functions_ui import search_cas_inci, search_cir, show_login_page
# Configure page # Configure page
st.set_page_config( st.set_page_config(
@ -13,45 +11,69 @@ st.set_page_config(
layout="wide" layout="wide"
) )
st.session_state["_cm"] = stx.CookieManager(key="pif_cookies") # una sola volta per rerun # Password protection
def check_password():
"""Returns `True` if the user had the correct password."""
if not check_auth(): def password_entered():
show_login_page() """Checks whether a password entered by the user is correct."""
if st.session_state["password"] == st.secrets["passwords"]["app_password"]:
st.session_state["password_correct"] = True
del st.session_state["password"] # Don't store password
else:
st.session_state["password_correct"] = False
# First run, show input for password
if "password_correct" not in st.session_state:
st.text_input(
"Password", type="password", on_change=password_entered, key="password"
)
return False
# Password not correct, show input + error
elif not st.session_state["password_correct"]:
st.text_input(
"Password", type="password", on_change=password_entered, key="password"
)
st.error("😕 Password incorrect")
return False
# Password correct
else:
return True
if not check_password():
st.stop() st.stop()
# Define home page function # Define home page function
def home(): def home():
name = st.session_state.get("user_name") or st.session_state.get("user_email") or ""
greeting = f"Benvenuto, {name}" if name else "Benvenuto"
st.title("LMB App: PIF & Database Tossicologico") st.title("LMB App: PIF & Database Tossicologico")
st.caption(greeting)
if "selected_cas" not in st.session_state: # Inizializza session_state per il CAS number se non esiste
if 'selected_cas' not in st.session_state:
st.session_state.selected_cas = None st.session_state.selected_cas = None
if "selected_inci" not in st.session_state:
st.session_state.selected_inci = None
with st.container(border=True): with st.container(border=True):
col_left, col_right = st.columns([2, 1]) col_left, col_right = st.columns([2, 1])
with col_left: with col_left:
search_type = st.radio("Cerca per:", ("CAS", "INCI"), index=0, key="search_mode", horizontal=True) type = st.radio("Cerca per:", ("CAS", "INCI"), index=0, key="search_mode", horizontal=True)
search_input = st.text_input("Inserisci:", "") input = st.text_input("Inserisci:", "")
with col_right: with col_right:
real_time = st.checkbox("Ricerca online", value=False, key="real_time_search") real_time = st.checkbox("Ricerca online", value=False, key="real_time_search")
force_refresh = st.checkbox("Aggiorna Ingrediente", value=False) force_refresh = st.checkbox("Aggiorna Ingrediente", value=False)
search_cir_enabled = st.checkbox("Cerca in CIR", value=False, key="search_cir")
if search_input: if input:
st.caption(f"Ricerca per {search_input}: trovati i seguenti ingredienti.") st.caption(f"Ricerca per {input}: trovati i seguenti ingredienti.")
results = search_cas_inci(search_input, type="cas" if search_type == "CAS" else "inci") if type == "CAS":
results = search_cas_inci(input, type='cas')
else:
results = search_cas_inci(input, type='inci')
if results: if results:
display_options = [f"{cas} - {inci}" for cas, inci in results] display_options = [f"{cas} - {inci}" for cas, inci in results]
selected_display = st.selectbox("Risultati", options=[""] + display_options, key="cas_selectbox") selected_display = st.selectbox("Risultati", options=[""] + display_options, key="cas_selectbox")
if selected_display: if selected_display and selected_display != "":
found_cas = re.findall(r"\b\d{2,7}-\d{2}-\d\b", selected_display) cas_pattern = r'\b\d{2,7}-\d{2}-\d\b'
found_cas = re.findall(cas_pattern, selected_display)
unique_cas = list(dict.fromkeys(found_cas)) unique_cas = list(dict.fromkeys(found_cas))
if not unique_cas: if not unique_cas:
@ -59,22 +81,20 @@ def home():
selected_cas = None selected_cas = None
elif len(unique_cas) == 1: elif len(unique_cas) == 1:
selected_cas = unique_cas[0] selected_cas = unique_cas[0]
#st.info(f"CAS rilevato: {selected_cas}")
else: else:
selected_cas = st.selectbox( selected_cas = st.selectbox(
label="Sono stati rilevati più CAS. Selezionane uno:", label="Sono stati rilevati più CAS. Selezionane uno:",
options=unique_cas options=unique_cas
) )
if selected_cas: if selected_cas:
st.session_state.selected_cas = selected_cas st.session_state.selected_cas = selected_cas
match = next((inci for cas, inci in results if cas == selected_cas), None) #st.success(f"CAS selezionato: {selected_cas}")
st.session_state.selected_inci = match
else: else:
st.warning("Nessun risultato trovato nel database.") st.warning("Nessun risultato trovato nel database.")
if st.button("Usa questo CAS") and search_type == "CAS": if st.button("Usa questo CAS") and type == "CAS":
st.session_state.selected_cas = search_input.strip() st.session_state.selected_cas = input.strip()
st.session_state.selected_inci = None st.success(f"CAS salvato: {input}")
st.success(f"CAS salvato: {search_input}")
else: else:
st.info("INCI non trovato, cerca per CAS o modifica l'input.") st.info("INCI non trovato, cerca per CAS o modifica l'input.")
@ -88,17 +108,7 @@ def home():
st.session_state.force_refresh = force_refresh st.session_state.force_refresh = force_refresh
# CIR search results
if search_cir_enabled and search_input:
st.markdown("---")
st.markdown("**Rapporti CIR:**")
cir_results = search_cir(search_input)
if cir_results:
for name, inci, url in cir_results:
label = name if name == inci else f"{name} ({inci})"
st.markdown(f"- [{label}]({url})")
else:
st.caption("Nessun rapporto CIR trovato.")
# Guide section # Guide section
st.divider() st.divider()
@ -158,24 +168,8 @@ def home():
# Changelog section # Changelog section
with st.expander("📝 Registro degli aggiornamenti"): with st.expander("📝 Registro degli aggiornamenti"):
# Placeholder for future versions
st.markdown(""" st.markdown("""
### v1.0
- Aggiunta autenticazione con mail e password
- Modificato il backend per maggiore sicurezza
- Ottimizzazioni generali e refactoring del codice
- Aggiunto form per segnalare errori
- Migliorato l'algoritmo per scegliere l'indicatore tossicologico migliore:
* Ora si preferiscono indicatori con route più rilevante per il MoS (es. NOAEL dermal > NOAEL orale)
* In caso di più indicatori con stesso endpoint, si sceglie quello con la dose più bassa (solo per NOAEL/LOAEL)
### v0.9 - 2026-03-01
- Ottimizzazione della pagina 'Ingrediente', risolto un bug che impediva di caricare i dati CosIng
- Modificato la validazione dei dati in esposizione
- Creata una pagina di impostazione per:
* Creazione di parametri custom di tossicologia per un ingrediente
* Cancellazione di clienti (e relativi dati)
* Visualizzare gli ingredienti salvati nel database
### v0.8 ### v0.8
*v0.8.0 - 2026-02-22 *v0.8.0 - 2026-02-22
- Versione iniziale (v0.8.0) rilasciata per test interno e feedback - Versione iniziale (v0.8.0) rilasciata per test interno e feedback
@ -229,23 +223,18 @@ def home():
# Navigation # Navigation
home_page = st.Page(home, title="Cerca", icon="🏠", default=True) home_page = st.Page(home, title="Cerca", icon="🏠", default=True)
echa_page = st.Page("pages/echa.py", title="ECHA (legacy)", icon="🧪") echa_page = st.Page("pages/echa.py", title="ECHA Database", icon="🧪")
#cosing_page = st.Page("pages/cosing.py", title="CosIng", icon="💄")
#dap_page = st.Page("pages/dap.py", title="DAP", icon="🧬")
exposition_page = st.Page("pages/exposition_page.py", title="Esposizione", icon="☀️") exposition_page = st.Page("pages/exposition_page.py", title="Esposizione", icon="☀️")
ingredients_page = st.Page("pages/ingredients_page.py", title="Ingrediente", icon="📋") ingredients_page = st.Page("pages/ingredients_page.py", title="Ingrediente", icon="📋")
order_page = st.Page("pages/order_page.py", title="Nuovo PIF", icon="🛒") order_page = st.Page("pages/order_page.py", title="Nuovo PIF", icon="🛒")
list_orders = st.Page("pages/list_orders.py", title="Ordini PIF", icon="📦") list_orders = st.Page("pages/list_orders.py", title="Ordini PIF", icon="📦")
settings_page = st.Page("pages/settings_page.py", title="Impostazioni", icon="⚙️")
ticket_page = st.Page("pages/ticket.py", title="Segnala problema", icon="🚩")
pg = st.navigation({ pg = st.navigation({
"Ricerca": [home_page, ingredients_page], "Ricerca": [home_page, ingredients_page],
"PIF": [list_orders, order_page, exposition_page], "PIF": [list_orders, order_page, exposition_page],
"Utilità": [echa_page, settings_page, ticket_page], "Online": [echa_page],
}) })
with st.sidebar:
if st.button("Esci", use_container_width=True):
do_logout()
st.rerun()
pg.run() pg.run()

File diff suppressed because it is too large Load diff

View file

@ -1,632 +1,71 @@
import re import json
import time from typing import Any, Dict, List, Union
from typing import Any, Dict, Optional
import extra_streamlit_components as stx
import pandas as pd
import requests import requests
import pandas as pd
import streamlit as st import streamlit as st
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
API_BASE = "http://localhost:8000/api/v1"
AUTH_BASE = "http://localhost:8000/api/v1"
CAS_PATTERN = re.compile(r"^\d{2,7}-\d{2}-\d$")
PERCENTAGE_TARGET = 100.0
PERCENTAGE_TOLERANCE = 0.01
WATER_INCI = {"aqua", "water", "eau", "aqua/water", "water/aqua", "aqua/eau", "acqua"}
STATUS_MAP = {
1: ("Ricevuto", "🔵"),
2: ("Validato", "🟡"),
3: ("Compilazione", "🟠"),
5: ("Arricchito", "🟢"),
6: ("Calcolo", "🔵"),
8: ("Completato", ""),
9: ("Errore", "🔴"),
}
# ---------------------------------------------------------------------------
# General utilities
# ---------------------------------------------------------------------------
def is_water_inci(inci_value: str) -> bool:
"""Controlla se l'INCI corrisponde ad acqua o varianti."""
if not inci_value or not isinstance(inci_value, str):
return False
return inci_value.strip().lower() in WATER_INCI
def status_label(stato_ordine: int) -> str:
"""Ritorna label con emoji per lo stato."""
name, emoji = STATUS_MAP.get(stato_ordine, ("Sconosciuto", ""))
return f"{emoji} {name}"
def make_empty_ingredient_df(n: int = 5) -> pd.DataFrame:
"""Crea un DataFrame vuoto per la tabella ingredienti."""
return pd.DataFrame({
"inci": [""] * n,
"cas": [""] * n,
"percentage": [0.0] * n,
"is_colorante": [False] * n,
"skip_tox": [False] * n,
})
# ---------------------------------------------------------------------------
# Cookie
# ---------------------------------------------------------------------------
_COOKIE_RT = "pif_rt"
_COOKIE_MAX_AGE = 7 * 24 * 3600 # 7 giorni in secondi
def get_cookie_manager() -> stx.CookieManager:
"""Ritorna il CookieManager della sessione corrente (creato una volta sola per rerun in app.py)."""
return st.session_state.get("_cm")
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
def do_login(email: str, password: str) -> bool:
"""Chiama l'endpoint di login e salva i token in session_state. Ritorna True se ok."""
try:
resp = requests.post(
f"{AUTH_BASE}/auth/login",
json={"email": email, "password": password},
timeout=10,
)
except requests.RequestException as e:
st.error(f"Errore di rete: {e}")
return False
if resp.status_code == 401:
st.error("Credenziali non valide.")
return False
if resp.status_code != 200:
st.error(f"Errore del server ({resp.status_code}). Riprova più tardi.")
return False
data = resp.json()
st.session_state["access_token"] = data["access_token"]
st.session_state["refresh_token"] = data["refresh_token"]
st.session_state["expires_at"] = time.time() + data["expires_in"] - 30
get_cookie_manager().set(_COOKIE_RT, data["refresh_token"], max_age=_COOKIE_MAX_AGE, key="set_rt_login")
_fetch_user_info()
return True
def do_refresh() -> bool:
"""Rinnova l'access_token. Ritorna False se il refresh fallisce (forza re-login)."""
try:
resp = requests.post(
f"{AUTH_BASE}/auth/refresh",
json={"refresh_token": st.session_state.get("refresh_token", "")},
timeout=10,
)
except requests.RequestException:
return False
if resp.status_code != 200:
return False
data = resp.json()
st.session_state["access_token"] = data["access_token"]
st.session_state["refresh_token"] = data["refresh_token"]
st.session_state["expires_at"] = time.time() + data["expires_in"] - 30
get_cookie_manager().set(_COOKIE_RT, data["refresh_token"], max_age=_COOKIE_MAX_AGE, key="set_rt_refresh")
return True
def do_logout():
"""Chiama il logout sull'API e pulisce la session_state."""
refresh_token = st.session_state.get("refresh_token", "")
if refresh_token:
try:
requests.post(
f"{AUTH_BASE}/auth/logout",
json={"refresh_token": refresh_token},
timeout=5,
)
except requests.RequestException:
pass
for key in ["access_token", "refresh_token", "expires_at", "user_name", "user_email", "user_id"]:
st.session_state.pop(key, None)
get_cookie_manager().delete(_COOKIE_RT, key="del_rt_logout")
def check_auth() -> bool:
"""Ritorna True se l'utente è autenticato e il token è valido.
Se il token è assente dalla session_state tenta il restore dal cookie.
Se il token è scaduto tenta il refresh."""
if "access_token" in st.session_state:
if time.time() < st.session_state.get("expires_at", 0):
return True
if do_refresh():
return True
do_logout()
return False
# Nessun token in memoria: prova il restore dal cookie
cm = get_cookie_manager()
rt = cm.get(_COOKIE_RT)
if rt:
st.session_state["refresh_token"] = rt
if do_refresh():
_fetch_user_info()
return True
# Cookie scaduto o invalidato: rimuovilo
cm.delete(_COOKIE_RT, key="del_rt_expired")
return False
def _auth_headers() -> dict:
"""Ritorna gli headers Authorization con il Bearer token corrente."""
token = st.session_state.get("access_token", "")
return {"Authorization": f"Bearer {token}"}
def _fetch_user_info() -> None:
"""Chiama /auth/me e salva email e nome in session_state."""
try:
resp = requests.get(f"{AUTH_BASE}/auth/me", headers=_auth_headers(), timeout=5)
if resp.status_code == 200:
data = resp.json()
st.session_state["user_email"] = data.get("email")
st.session_state["user_name"] = data.get("name")
st.session_state["user_id"] = data.get("id")
except requests.RequestException:
pass
# ---------------------------------------------------------------------------
# Order validation and building
# ---------------------------------------------------------------------------
def validate_order(
client_name: str,
product_name: str,
selected_preset: str | None,
df: pd.DataFrame,
) -> list[str]:
"""Valida tutti i campi dell'ordine. Ritorna lista di errori (vuota = valido)."""
errors = []
if not client_name or not client_name.strip():
errors.append("Il campo 'Nome del cliente' e obbligatorio.")
if not product_name or not product_name.strip():
errors.append("Il campo 'Nome del prodotto' e obbligatorio.")
if not selected_preset:
errors.append("Selezionare un preset di esposizione.")
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
if active.empty:
errors.append("Inserire almeno un ingrediente.")
return errors
for idx, row in active.iterrows():
cas_val = row["cas"].strip()
inci_val = row["inci"].strip()
is_col = bool(row["is_colorante"])
if is_water_inci(inci_val) and cas_val == "":
continue
if is_col:
continue
if cas_val == "":
errors.append(f"Riga {idx + 1}: inserire un CAS number oppure selezionare 'Colorante'.")
continue
if not CAS_PATTERN.match(cas_val):
hint = f" ({inci_val})" if inci_val else ""
errors.append(
f"Formato CAS non valido alla riga {idx + 1}{hint}: '{cas_val}'. "
"Formato atteso: XX-XX-X (es. 56-81-5)."
)
total_pct = active["percentage"].sum()
if abs(total_pct - PERCENTAGE_TARGET) > PERCENTAGE_TOLERANCE:
errors.append(
f"La somma delle percentuali e {total_pct:.6f}%, "
f"ma deve essere 100% (tolleranza +/- {PERCENTAGE_TOLERANCE}%)."
)
return errors
def build_order_payload(
client_name: str,
product_name: str,
preset_name: str,
df: pd.DataFrame,
) -> dict:
"""Costruisce il JSON dell'ordine a partire dai dati del form."""
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
ingredients = []
for _, row in active.iterrows():
inci_val = row["inci"].strip()
skip = bool(row["skip_tox"]) or is_water_inci(inci_val)
ingredients.append({
"inci": inci_val if inci_val else None,
"cas": row["cas"].strip(),
"percentage": round(float(row["percentage"]), 6),
"is_colorante": bool(row["is_colorante"]),
"skip_tox": skip,
})
return {
"client_name": client_name.strip(),
"product_name": product_name.strip(),
"preset_esposizione": preset_name,
"ingredients": ingredients,
}
# ---------------------------------------------------------------------------
# Ingredients API
# ---------------------------------------------------------------------------
def fetch_ingredient(cas: str, force: bool = False) -> tuple[bool, Any]:
"""Cerca un ingrediente per CAS. Ritorna (success, data) oppure (False, error_message)."""
try:
resp = requests.post(
f"{API_BASE}/ingredients/search",
json={"cas": cas, "force": force},
headers=_auth_headers(),
timeout=120,
)
result = resp.json()
if result.get("success") and result.get("data"):
return True, result["data"]
return False, result.get("error", f"Nessun dato per CAS {cas}")
except requests.ConnectionError:
return False, "Impossibile connettersi all'API. Verifica che il server sia attivo."
except Exception as e:
return False, str(e)
def add_tox_indicator(payload: dict) -> requests.Response:
"""Aggiunge un indicatore tossicologico custom."""
return requests.post(f"{API_BASE}/ingredients/add-tox-indicator", json=payload, headers=_auth_headers(), timeout=30)
def fetch_all_ingredients() -> dict:
"""Recupera tutti gli ingredienti dal database. Ritorna il JSON grezzo."""
resp = requests.get(f"{API_BASE}/ingredients/list", headers=_auth_headers(), timeout=10)
return resp.json()
def fetch_clients() -> list[dict]:
"""Recupera la lista clienti. Ritorna lista di {id_cliente, nome_cliente}."""
try:
resp = requests.get(f"{API_BASE}/ingredients/clients", headers=_auth_headers(), timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return data["data"]
return []
except requests.ConnectionError:
return []
except Exception:
return []
def create_client(nome_cliente: str) -> bool:
"""Crea un nuovo cliente via API. Ritorna True se riuscito."""
try:
resp = requests.post(
f"{API_BASE}/ingredients/clients",
json={"nome_cliente": nome_cliente},
headers=_auth_headers(),
timeout=10,
)
return resp.json().get("success", False)
except Exception:
return False
def delete_client(nome: str) -> requests.Response:
"""Elimina un cliente per nome. Ritorna la Response grezza."""
return requests.delete(f"{API_BASE}/ingredients/clients/{nome}", headers=_auth_headers(), timeout=10)
# ---------------------------------------------------------------------------
# Esposizione / Presets API
# ---------------------------------------------------------------------------
def fetch_presets() -> list[str]:
"""Recupera i nomi dei preset di esposizione dall'API."""
try:
resp = requests.get(f"{API_BASE}/esposition/presets", headers=_auth_headers(), timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return [p["preset_name"] for p in data["data"]]
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API per caricare i preset. Verifica che il server sia attivo.")
return []
except Exception as e:
st.error(f"Errore nel caricamento dei preset: {e}")
return []
def fetch_all_presets() -> dict:
"""Recupera tutti i preset con dati completi. Ritorna il JSON grezzo."""
resp = requests.get(f"{API_BASE}/esposition/presets", headers=_auth_headers(), timeout=10)
return resp.json()
def create_exposition_preset(payload: dict) -> dict:
"""Crea un nuovo preset di esposizione. Ritorna il JSON della risposta."""
resp = requests.post(f"{API_BASE}/esposition/create", json=payload, headers=_auth_headers(), timeout=10)
return resp.json()
def delete_exposition_preset(name: str) -> dict:
"""Elimina un preset di esposizione per nome. Ritorna il JSON della risposta."""
resp = requests.delete(f"{API_BASE}/esposition/delete/{name}", headers=_auth_headers(), timeout=10)
return resp.json()
# ---------------------------------------------------------------------------
# Orders API
# ---------------------------------------------------------------------------
def fetch_orders() -> list:
"""Recupera la lista ordini dall'API."""
try:
resp = requests.get(f"{API_BASE}/orders/list", headers=_auth_headers(), timeout=15)
data = resp.json()
if data.get("success"):
return data.get("data", [])
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
return []
except Exception as e:
st.error(f"Errore nel caricamento degli ordini: {e}")
return []
def fetch_order_detail(id_ordine: int) -> Optional[dict]:
"""Recupera il dettaglio completo di un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/detail/{id_ordine}", headers=_auth_headers(), timeout=15)
data = resp.json()
if data.get("success"):
return data.get("order")
st.error(data.get("detail", "Errore nel recupero dettaglio ordine"))
return None
except requests.ConnectionError:
st.error("Impossibile connettersi all'API.")
return None
except Exception as e:
st.error(f"Errore: {e}")
return None
def api_retry_order(id_ordine: int) -> dict:
"""Chiama POST /orders/retry/{id_ordine}."""
try:
resp = requests.post(f"{API_BASE}/orders/retry/{id_ordine}", headers=_auth_headers(), timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def api_delete_order(id_ordine: int) -> dict:
"""Chiama DELETE /orders/{id_ordine}."""
try:
resp = requests.delete(f"{API_BASE}/orders/{id_ordine}", headers=_auth_headers(), timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def download_excel(id_ordine: int) -> Optional[bytes]:
"""Scarica il file Excel per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export/{id_ordine}", headers=_auth_headers(), timeout=120)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download Excel: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
def download_sources(id_ordine: int) -> Optional[bytes]:
"""Scarica lo ZIP delle fonti PDF per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export-sources/{id_ordine}", headers=_auth_headers(), timeout=300)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download fonti: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
# ---------------------------------------------------------------------------
# Segnalazioni (ticket)
# ---------------------------------------------------------------------------
def send_segnalazione(
page: str,
description: str,
priority: str,
cas: Optional[str] = None,
error: Optional[str] = None,
) -> bool:
"""Invia una segnalazione all'API. Ritorna True se salvata con successo."""
payload = {
"page": page,
"description": description,
"priority": priority,
}
if cas:
payload["cas"] = cas
if error:
payload["error"] = error
try:
resp = requests.post(
f"{API_BASE}/common/segnalazione",
json=payload,
headers=_auth_headers(),
timeout=10,
)
if resp.status_code == 200 and resp.json().get("success"):
return True
st.error(resp.json().get("detail", "Errore nell'invio della segnalazione"))
return False
except Exception as e:
st.error(f"Errore di connessione: {e}")
return False
# ---------------------------------------------------------------------------
# ECHA data extraction (for echa.py page)
# ---------------------------------------------------------------------------
def extract_tox_info_values(data: dict) -> list:
"""Extract DNEL values from toxicological information."""
rows = []
sections = data.get("toxicological_information", {}).get("sections", [])
for section in sections:
label = section.get("label", "")
if "subsections" in section:
for subsec in section["subsections"]:
effect_type = subsec.get("label", "")
if "subsections" in subsec:
for sub2 in subsec["subsections"]:
dose = sub2.get("StDose", {})
if isinstance(dose, dict) and dose.get("value"):
rows.append({
"Population/Route": label,
"Effect Type": effect_type,
"Exposure": sub2.get("label", ""),
"Assessment": sub2.get("HazardAssessment", ""),
"Value numerical": dose.get("value", ""),
"Unit": dose.get("unit", ""),
"Endpoint": sub2.get("MostSensitiveEndpoint", "")
})
return rows
def extract_acute_values(data: dict) -> list:
"""Extract acute toxicity values."""
rows = []
sections = data.get("acute_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
if subsec.get("EffectLevelValue"):
rows.append({
"Route": subsec.get("label", "").replace("Acute toxicity: ", ""),
"Endpoint": subsec.get("EffectLevelUnit", ""),
"Value": subsec.get("EffectLevelValue", ""),
"Conclusion": subsec.get("EndpointConclusion", "")
})
return rows
def extract_repeated_values(data: dict) -> list:
"""Extract repeated dose toxicity values."""
rows = []
sections = data.get("repeated_dose_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
study_type = subsec.get("label", "")
for sub2 in subsec.get("subsections", []):
if sub2.get("EffectLevelValue"):
rows.append({
"Study Type": study_type,
"Route": sub2.get("label", ""),
"Endpoint": sub2.get("EffectLevelUnit", ""),
"Value": sub2.get("EffectLevelValue", ""),
"Species": sub2.get("Species", "-")
})
return rows
# ---------------------------------------------------------------------------
# Legacy / original API functions
# ---------------------------------------------------------------------------
def echa_request(cas_num: str) -> Dict[str, Any]: def echa_request(cas_num: str) -> Dict[str, Any]:
"""Recupera i dati ECHA per un numero CAS.""" url = 'https://api.cosmoguard.it/api/v1/echa/search'
response = requests.post(f"{API_BASE}/echa/search", json={"cas": cas_num}, headers=_auth_headers()) response = requests.post(url, json={'cas': cas_num})
data = response.json() data = response.json()
return data["data"] if data["success"] else data["error"] if data['success'] == True:
return data['data']
else:
return data['error']
def cosing_request(cas_num: str) -> Dict[str, Any]: def cosing_request(cas_num: str) -> Dict[str, Any]:
"""Recupera i dati COSING per un numero CAS.""" url = 'https://api.cosmoguard.it/api/v1/cosing/search'
response = requests.post( response = requests.post(url, json={
f"{API_BASE}/cosing/search", "full": True,
json={"full": True, "mode": "cas", "text": cas_num}, "mode": "cas",
headers=_auth_headers(), "text": cas_num
) })
data = response.json() data = response.json()
return data["data"] if data["success"] else data["error"] if data['success'] == True:
return data['data']
else:
return data['error']
def generate_pdf_download(cas, origin, link): def generate_pdf_download(cas, origin, link):
"""Genera e scarica un PDF tramite l'API.""" url = 'https://api.cosmoguard.it/api/v1/common/generate-pdf'
name = f"{cas}_{origin}" name = f'{cas}_{origin}'
if link is not None: if link is not None:
response = requests.post( response = requests.post(
f"{API_BASE}/common/generate-pdf", url,
json={"link": link, "name": name}, json = {
headers=_auth_headers(), 'link': link,
'name': name
}
) )
data = response.json() data = response.json()
else: else:
data = {"success": False, "error": "No dossier exists for this origin."} data = {
'success': False,
if data["success"]: 'error': 'No dossier exists for this origin.'
response = requests.get(f"{API_BASE}/common/download-pdf/{name}", headers=_auth_headers()) }
if data['success'] == True:
url = f'https://api.cosmoguard.it/api/v1/common/download-pdf/{name}'
response = requests.get(url)
response.raise_for_status() response.raise_for_status()
return response.content return response.content
else: else:
return data["error"] return data['error']
def cosing_download(ref_no: str): def cosing_download(ref_no: str):
"""Scarica il PDF ufficiale COSING per un numero di riferimento.""" url = f'https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf'
url = f"https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf"
headers = { headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0", 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0',
"Accept": "application/json, text/plain, */*", 'Accept': 'application/json, text/plain, */*',
"Accept-Language": "it-IT,it;q=0.9", 'Accept-Language': 'it-IT,it;q=0.9',
"Cache-Control": "No-Cache", 'Cache-Control': 'No-Cache',
"Origin": "https://ec.europa.eu", 'Origin': 'https://ec.europa.eu',
"Referer": "https://ec.europa.eu/", 'Referer': 'https://ec.europa.eu/',
"Sec-Fetch-Dest": "empty", 'Sec-Fetch-Dest': 'empty',
"Sec-Fetch-Mode": "cors", 'Sec-Fetch-Mode': 'cors',
"Sec-Fetch-Site": "same-site", 'Sec-Fetch-Site': 'same-site',
} }
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
if response.status_code == 200: if response.status_code == 200:
return response.content return response.content
return f"Error: {response.status_code} - {response.text}" else:
return f"Error: {response.status_code} - {response.text}"

View file

@ -1,134 +1,51 @@
import pandas as pd
import streamlit as st import streamlit as st
import json
import uuid
from functions import do_login, generate_pdf_download from functions import generate_pdf_download
def open_csv_file(file_path): def open_csv_file(file_path):
"""Apre un file CSV e restituisce una connessione DuckDB in-memory.""" """Apre un file CSV e restituisce i dati come dataframe."""
import duckdb import duckdb
con = duckdb.connect(database=":memory:")
con.execute(f"CREATE TABLE index AS SELECT * FROM read_csv_auto('{file_path}')") # Usa DuckDB per leggere il file CSV
con = duckdb.connect(database=':memory:')
query = f"CREATE TABLE index AS SELECT * FROM read_csv_auto('{file_path}')"
con.execute(query)
return con return con
def search_cas_inci(input, type = 'cas'):
def search_cas_inci(input, type="cas"): """Cerca un numero CAS nei dati forniti e restituisce CAS e INCI."""
"""Cerca un numero CAS o INCI nel file CSV. Ritorna lista di tuple (casNo, inciName).""" con = open_csv_file('data.csv')
con = open_csv_file("streamlit\data.csv") if type == 'cas':
if type == "cas":
query = f"SELECT * FROM index WHERE casNo LIKE '%{input}%'" query = f"SELECT * FROM index WHERE casNo LIKE '%{input}%'"
else: else:
query = f"SELECT * FROM index WHERE inciName ILIKE '%{input}%'" query = f"SELECT * FROM index WHERE inciName ILIKE '%{input}%'"
results = con.execute(query).fetchdf() results = con.execute(query).fetchdf()
# Restituisce una lista di tuple (casNo, inciName)
if not results.empty: if not results.empty:
return list(zip(results["casNo"].tolist(), results["inciName"].tolist())) return list(zip(results['casNo'].tolist(), results['inciName'].tolist()))
return [] return []
def search_cir(input_text: str) -> list[tuple]:
"""Cerca nel CIR database per nome ingrediente o INCI. Ritorna lista di (name, inci, url)."""
con = open_csv_file("streamlit\cir-reports.csv")
query = f"""
SELECT
"tablescraper-selected-row" AS ingredient_name,
"tablescraper-selected-row 2" AS inci_name,
"tablescraper-selected-row href" AS url
FROM index
WHERE (
"tablescraper-selected-row" ILIKE '%{input_text}%'
OR "tablescraper-selected-row 2" ILIKE '%{input_text}%'
)
AND "tablescraper-selected-row" != 'Ingredient Name as Used:'
AND "tablescraper-selected-row href" != ''
LIMIT 20
"""
results = con.execute(query).fetchdf()
if not results.empty:
return list(zip(
results["ingredient_name"].tolist(),
results["inci_name"].tolist(),
results["url"].tolist(),
))
return []
def download_pdf(casNo, origin, link): def download_pdf(casNo, origin, link):
"""Mostra i pulsanti per generare e scaricare un PDF dall'API.""" """Scarica un PDF generato dall'API."""
if st.button("Generate PDF", key=f"gen_{casNo}_{origin}"): if st.button("Generate PDF", key= f"gen_{casNo}_{origin}"):
with st.spinner("Fetching PDF..."): with st.spinner("Fetching PDF..."):
pdf_data = generate_pdf_download(cas=casNo, origin=origin, link=link) pdf_data = generate_pdf_download(
st.download_button( cas=casNo,
label="Download PDF", origin=origin,
data=pdf_data, link=link
file_name=f"{casNo}_{origin}.pdf", )
mime="application/pdf",
key=f"dl_{casNo}_{origin}",
)
def show_login_page():
"""Mostra il form di login."""
_, col, _ = st.columns([1, 1, 1])
with col:
st.title("LMB App")
st.caption("Accedi per continuare")
with st.form("login_form"):
email = st.text_input("Email", placeholder="utente@esempio.it")
password = st.text_input("Password", type="password")
submitted = st.form_submit_button("Accedi", use_container_width=True, type="primary")
if submitted:
if not email or not password:
st.warning("Inserisci email e password.")
else:
with st.spinner("Autenticazione in corso..."):
if do_login(email, password):
st.rerun()
def display_orderData(order_data: dict):
"""Mostra un riepilogo leggibile dell'ordine (non JSON)."""
st.markdown("### Riepilogo Ordine")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Cliente", order_data["client_name"])
with col2:
st.metric("Prodotto", order_data["product_name"])
with col3:
st.metric("Preset Esposizione", order_data["preset_esposizione"])
ingredients = order_data.get("ingredients", [])
total_pct = sum(i["percentage"] for i in ingredients)
col4, col5, col6 = st.columns(3)
with col4:
st.metric("Numero ingredienti", len(ingredients))
with col5:
st.metric("Percentuale totale", f"{total_pct:.6f}%")
with col6:
n_col = sum(1 for i in ingredients if i["is_colorante"])
n_skip = sum(1 for i in ingredients if i["skip_tox"])
parts = []
if n_col > 0:
parts.append(f"{n_col} colorante/i")
if n_skip > 0:
parts.append(f"{n_skip} senza tox")
st.metric("Flag attivi", ", ".join(parts) if parts else "Nessuno")
st.markdown("**Ingredienti:**")
display_rows = [
{
"INCI": i["inci"] if i["inci"] else "-",
"CAS": i["cas"] if i["cas"] else "-",
"Percentuale (%)": f"{i['percentage']:.6f}",
"Colorante": "Si" if i["is_colorante"] else "-",
"Salta Tox": "Si" if i["skip_tox"] else "-",
}
for i in ingredients
]
st.dataframe(pd.DataFrame(display_rows), width="stretch", hide_index=True)
st.download_button(
label="Download PDF",
data=pdf_data,
file_name=f"{casNo}_{origin}.pdf",
mime="application/pdf",
key=f"dl_{casNo}_{origin}"
)
if __name__ == "__main__": if __name__ == "__main__":
data = search_cas_inci("102242-62-6") data = search_cas_inci('102242-62-6')
print(data) print(data)

138
old/cosing.py Normal file
View file

@ -0,0 +1,138 @@
import streamlit as st
import json
from functions import cosing_request
from functions_ui import download_pdf
st.title("CosIng Database Viewer")
st.set_page_config(
page_title="CosIng",
page_icon="🧪",
layout="wide"
)
def display_ingredient(data: dict, level: int = 0):
"""Display ingredient information using containers and metrics."""
# Get names
name = data.get("commonName") or data.get("inciName") or "Unknown"
item_type = data.get("itemType", "ingredient")
# Header
if level == 0:
st.title(f"🧪 {name}")
else:
st.markdown(f"### {name}")
# Type badge and chemical name
col_header1, col_header2 = st.columns([1, 3])
with col_header1:
if item_type == "substance":
st.caption("🔬 Substance")
else:
st.caption("🧴 Ingredient")
# Identifiers container
cas_numbers = data.get("casNo", [])
ec_numbers = data.get("ecNo", [])
ref_no = data.get("refNo", "")
if cas_numbers or ec_numbers or ref_no:
with st.container(border=True):
cols = st.columns(3)
with cols[0]:
if cas_numbers:
st.metric("CAS", ", ".join(cas_numbers))
else:
st.metric("CAS", "")
with cols[1]:
if ec_numbers:
st.metric("EC", ", ".join(ec_numbers))
else:
st.metric("EC", "")
# Functions
functions = data.get("functionName", [])
if functions:
with st.container(border=True):
st.markdown("**Functions**")
func_cols = st.columns(len(functions))
for i, func in enumerate(functions):
with func_cols[i]:
st.success(func.title())
# Regulatory info
restrictions = data.get("otherRestrictions", [])
annex = data.get("annexNo", [])
regulations = data.get("otherRegulations", [])
opinions = data.get("sccsOpinion", [])
opinion_urls = data.get("sccsOpinionUrls", [])
if restrictions or annex or regulations or opinions or opinion_urls:
with st.container(border=True):
st.markdown("**Regulatory Information**")
if annex:
st.info(f"📋 **Annex {', '.join(annex)}**")
if restrictions:
for r in restrictions:
st.warning(f"⚠️ {r}")
if regulations:
st.write("Other regulations: " + "; ".join(regulations))
if opinions:
for opinion in opinions:
st.write(f"📄 {opinion}")
if opinion_urls:
for url in opinion_urls:
st.link_button("View SCCS Opinion", url)
# Source link
cosing_url = data.get("cosingUrl", "")
if cosing_url:
st.link_button("🔗 View on CosIng", cosing_url)
# Identified Ingredients (recursive)
identified = data.get("identifiedIngredient", [])
if identified:
st.divider()
# Check if it's a list of IDs or full objects
if identified and isinstance(identified[0], dict):
st.subheader(f"🔬 Related Substances ({len(identified)})")
for idx, ing in enumerate(identified):
ing_name = ing.get("commonName") or ing.get("inciName") or f"Substance {idx + 1}"
with st.expander(ing_name):
display_ingredient(ing, level=level + 1)
else:
# List of IDs only
st.subheader(f"🔬 Related Substances ({len(identified)} IDs)")
with st.expander("Show substance IDs"):
# Display in a grid
id_text = ", ".join(str(i) for i in identified[:20])
if len(identified) > 20:
id_text += f"... and {len(identified) - 20} more"
st.code(id_text)
def main():
if st.session_state.get('selected_cas', None) is None:
st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.")
st.stop()
else:
cas_number = st.session_state.selected_cas
DATA = cosing_request(cas_number)
display_ingredient(DATA)
if __name__ == "__main__":
main()

121
old/dap.py Normal file
View file

@ -0,0 +1,121 @@
import streamlit as st
import requests
import json
st.title("PubChem Data Viewer")
if st.session_state.get('selected_cas', None) is None:
st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.")
st.stop()
else:
cas_number = st.session_state.selected_cas
# Make API request
with st.spinner("Fetching data from PubChem..."):
try:
response = requests.post(
"https://api.cosmoguard.it/api/v1/common/pubchem",
json={"cas": cas_number}
)
response.raise_for_status()
result = response.json()
except requests.exceptions.RequestException as e:
st.error(f"Error fetching data: {e}")
st.stop()
# Check if request was successful
if not result.get("success", False):
st.error(f"API Error: {result.get('error', 'Unknown error')}")
st.stop()
data = result.get("data", {})
# Display substance header
st.subheader(f"{data.get('first_pubchem_name', 'Unknown').title()}")
# Basic info container
with st.container(border=True):
col1, col2, col3 = st.columns(3)
with col1:
st.caption("CAS Number")
st.write(data.get("CAS", ""))
with col2:
st.caption("PubChem CID")
st.write(data.get("CID", ""))
with col3:
if data.get("pubchem_link"):
st.link_button("View on PubChem", data.get("pubchem_link"))
st.divider()
# Physical/Chemical Properties
st.subheader("Physical & Chemical Properties")
with st.container(border=True):
prop_col1, prop_col2, prop_col3, prop_col4 = st.columns(4)
with prop_col1:
st.metric("Molecular Weight", f"{data.get('MolecularWeight', '')} g/mol" if data.get('MolecularWeight') else "")
with prop_col2:
st.metric("XLogP", data.get('XLogP', ''))
with prop_col3:
st.metric("Exact Mass", data.get('ExactMass', ''))
with prop_col4:
st.metric("TPSA", f"{data.get('TPSA', '')} Ų" if data.get('TPSA') else "")
st.divider()
# Melting Point
melting_points = data.get("Melting Point", [])
if melting_points:
st.subheader("Melting Point")
with st.expander(f"View {len(melting_points)} reference(s)", expanded=True):
for idx, mp in enumerate(melting_points):
with st.container(border=True):
st.markdown(f"**Reference {idx + 1}**")
if mp.get("Value"):
st.info(mp.get("Value"))
if mp.get("Reference"):
st.caption(f"📚 {mp.get('Reference')}")
if mp.get("Description"):
st.caption(f" {mp.get('Description')}")
if mp.get("ReferenceNumber"):
st.caption(f"Ref #: {mp.get('ReferenceNumber')}")
# Dissociation Constants
dissociation_constants = data.get("Dissociation Constants", [])
if dissociation_constants:
st.divider()
st.subheader("Dissociation Constants (pKa)")
with st.expander(f"View {len(dissociation_constants)} reference(s)", expanded=True):
for idx, dc in enumerate(dissociation_constants):
with st.container(border=True):
st.markdown(f"**Reference {idx + 1}**")
if dc.get("Value"):
# Check if it's a dictionary or string
value = dc.get("Value")
if isinstance(value, dict):
st.code(json.dumps(value, indent=2))
else:
st.info(value)
if dc.get("Reference"):
st.caption(f"📚 {dc.get('Reference')}")
if dc.get("ReferenceNumber"):
st.caption(f"Ref #: {dc.get('ReferenceNumber')}")
# Raw JSON viewer
st.divider()
with st.expander("View Raw JSON Response"):
st.json(result)

View file

@ -1,36 +1,99 @@
import streamlit as st import streamlit as st
import json
from functions import ( from functions import echa_request
echa_request,
extract_acute_values,
extract_repeated_values,
extract_tox_info_values,
)
from functions_ui import download_pdf from functions_ui import download_pdf
st.set_page_config( st.set_page_config(
page_title="ECHA Toxicological Data Viewer", page_title="ECHA Toxicological Data Viewer",
page_icon="🧊", page_icon="🧊",
layout="wide", layout="wide",
initial_sidebar_state="expanded", initial_sidebar_state="expanded"
) )
st.info("Questa pagina mostra i dati ECHA in modo più dettagliato. È consigliato però utilizzare la pagina **Ingrediente** per avere informazioni più complete sull'ingrediente.") st.info("Questa pagina mostra i dati ECHA in modo più dettagliato. È consigliato però utilizzare la pagina **Ingrediente** per avere informazioni più complete sull'ingrediente.")
if st.session_state.get("selected_cas") is None: if st.session_state.get('selected_cas', None) is None:
st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.") st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.")
st.stop() st.stop()
else:
cas_number = st.session_state.selected_cas
cas_number = st.session_state.selected_cas
DATA = echa_request(cas_number) DATA = echa_request(cas_number)
if "substance" not in DATA: if 'substance' not in DATA:
st.error(DATA) st.error(DATA)
st.stop() st.stop()
def extract_tox_info_values(data):
"""Extract DNEL values from toxicological information."""
rows = []
sections = data.get("toxicological_information", {}).get("sections", [])
for section in sections:
label = section.get("label", "")
if "subsections" in section:
for subsec in section["subsections"]:
effect_type = subsec.get("label", "")
if "subsections" in subsec:
for sub2 in subsec["subsections"]:
dose = sub2.get("StDose", {})
if isinstance(dose, dict) and dose.get("value"):
rows.append({
"Population/Route": label,
"Effect Type": effect_type,
"Exposure": sub2.get("label", ""),
"Assessment": sub2.get("HazardAssessment", ""),
"Value numerical": dose.get("value", ""),
"Unit": dose.get("unit", ""),
"Endpoint": sub2.get("MostSensitiveEndpoint", "")
})
return rows
def extract_acute_values(data):
"""Extract acute toxicity values."""
rows = []
sections = data.get("acute_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
if subsec.get("EffectLevelValue"):
rows.append({
"Route": subsec.get("label", "").replace("Acute toxicity: ", ""),
"Endpoint": subsec.get("EffectLevelUnit", ""),
"Value": subsec.get("EffectLevelValue", ""),
"Conclusion": subsec.get("EndpointConclusion", "")
})
return rows
def extract_repeated_values(data):
"""Extract repeated dose toxicity values."""
rows = []
sections = data.get("repeated_dose_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
study_type = subsec.get("label", "")
for sub2 in subsec.get("subsections", []):
if sub2.get("EffectLevelValue"):
rows.append({
"Study Type": study_type,
"Route": sub2.get("label", ""),
"Endpoint": sub2.get("EffectLevelUnit", ""),
"Value": sub2.get("EffectLevelValue", ""),
"Species": sub2.get("Species", "-")
})
return rows
# App # App
st.title("Toxicological Data Viewer") st.title("Toxicological Data Viewer")
# Substance info # Substance info
substance = DATA["substance"] substance = DATA["substance"]
dossier = DATA["dossier_info"] dossier = DATA["dossier_info"]
@ -41,16 +104,16 @@ with st.container(border=True):
col1, col2, col3, col4 = st.columns(4) col1, col2, col3, col4 = st.columns(4)
with col1: with col1:
st.caption("CAS Number") st.caption("CAS Number")
st.write(substance["rmlCas"]) st.write(substance['rmlCas'])
with col2: with col2:
st.caption("EC Number") st.caption("EC Number")
st.write(substance["rmlEc"]) st.write(substance['rmlEc'])
with col3: with col3:
st.caption("Status") st.caption("Status")
st.write(dossier["registrationStatus"]) st.write(dossier['registrationStatus'])
with col4: with col4:
st.caption("Last Updated") st.caption("Last Updated")
st.write(dossier["lastUpdatedDate"]) st.write(dossier['lastUpdatedDate'])
# Tabs # Tabs
tab1, tab2, tab3 = st.tabs(["Toxicological Information", "Acute Toxicity", "Repeated Dose Toxicity"]) tab1, tab2, tab3 = st.tabs(["Toxicological Information", "Acute Toxicity", "Repeated Dose Toxicity"])
@ -64,9 +127,9 @@ with tab1:
st.info("No DNEL values found.") st.info("No DNEL values found.")
download_pdf( download_pdf(
casNo=substance["rmlCas"], casNo=substance['rmlCas'],
origin="echa_tox_info", origin='echa_tox_info',
link=DATA["index"]["toxicological_information_link"], link=DATA['index']['toxicological_information_link']
) )
with tab2: with tab2:
@ -78,9 +141,9 @@ with tab2:
st.info("No acute toxicity values found.") st.info("No acute toxicity values found.")
download_pdf( download_pdf(
casNo=substance["rmlCas"], casNo=substance['rmlCas'],
origin="echa_acute_tox", origin='echa_acute_tox',
link=DATA["index"]["acute_toxicity_link"], link=DATA['index']['acute_toxicity_link']
) )
with tab3: with tab3:
@ -92,12 +155,13 @@ with tab3:
st.info("No repeated dose toxicity values found.") st.info("No repeated dose toxicity values found.")
download_pdf( download_pdf(
casNo=substance["rmlCas"], casNo=substance['rmlCas'],
origin="echa_repeated_tox", origin='echa_repeated_tox',
link=DATA["index"]["repeated_dose_toxicity_link"], link=DATA['index']['repeated_dose_toxicity_link']
) )
# Key Information sections # Key Information sections
st.divider() st.divider()
st.subheader("Key Information") st.subheader("Key Information")

View file

@ -1,14 +1,19 @@
import pandas as pd
import streamlit as st import streamlit as st
import requests
import pandas as pd
from functions import ( API_BASE = "http://localhost:8000/api/v1"
create_exposition_preset,
delete_exposition_preset,
fetch_all_presets,
)
EXPOSURE_ROUTES = ["Dermal", "Oral", "Inhalation", "Ocular"] EXPOSURE_ROUTES = ["Dermal", "Oral", "Inhalation", "Ocular"]
RITENZIONE_PRESETS = {
"Leave-on (1.0)": 1.0,
"Rinse-off (0.01)": 0.01,
"Dentifricio (0.05)": 0.05,
"Collutorio (0.10)": 0.10,
"Tintura (0.10)": 0.10,
}
st.set_page_config(page_title="Gestione Esposizione", layout="wide") st.set_page_config(page_title="Gestione Esposizione", layout="wide")
st.title("Gestione Preset Esposizione") st.title("Gestione Preset Esposizione")
@ -46,8 +51,10 @@ with tab_crea:
esp_nano = st.multiselect("Nano", EXPOSURE_ROUTES, default=["Dermal"], key="esp_nano") esp_nano = st.multiselect("Nano", EXPOSURE_ROUTES, default=["Dermal"], key="esp_nano")
st.markdown("---") st.markdown("---")
ritenzione = st.number_input("Fattore di ritenzione", min_value=0.01, max_value=1.00, value=1.0, step=0.01, format="%.2f") ritenzione_label = st.selectbox("Fattore di ritenzione", list(RITENZIONE_PRESETS.keys()))
ritenzione = RITENZIONE_PRESETS[ritenzione_label]
# Preview payload
payload = { payload = {
"preset_name": preset_name, "preset_name": preset_name,
"tipo_prodotto": tipo_prodotto, "tipo_prodotto": tipo_prodotto,
@ -66,11 +73,14 @@ with tab_crea:
if st.button("Crea Preset", type="primary", disabled=not preset_name): if st.button("Crea Preset", type="primary", disabled=not preset_name):
try: try:
data = create_exposition_preset(payload) resp = requests.post(f"{API_BASE}/esposition/create", json=payload, timeout=10)
data = resp.json()
if data.get("success"): if data.get("success"):
st.success(f"Preset '{preset_name}' creato con successo (id: {data['data']['id_preset']})") st.success(f"Preset '{preset_name}' creato con successo (id: {data['data']['id_preset']})")
else: else:
st.error(f"Errore: {data.get('error', 'Sconosciuto')}") st.error(f"Errore: {data.get('error', 'Sconosciuto')}")
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e: except Exception as e:
st.error(f"Errore: {e}") st.error(f"Errore: {e}")
@ -87,7 +97,8 @@ with tab_lista:
st.rerun() st.rerun()
try: try:
data = fetch_all_presets() resp = requests.get(f"{API_BASE}/esposition/presets", timeout=10)
data = resp.json()
if data.get("success") and data.get("data"): if data.get("success") and data.get("data"):
st.info(f"Trovati {data['total']} preset") st.info(f"Trovati {data['total']} preset")
@ -99,7 +110,7 @@ with tab_lista:
"ritenzione", "esposizione_calcolata", "esposizione_relativa", "ritenzione", "esposizione_calcolata", "esposizione_relativa",
] ]
existing = [c for c in display_cols if c in df.columns] existing = [c for c in display_cols if c in df.columns]
st.dataframe(df[existing], width="stretch", hide_index=True) st.dataframe(df[existing], use_container_width=True, hide_index=True)
st.divider() st.divider()
st.subheader("Elimina Preset") st.subheader("Elimina Preset")
@ -113,9 +124,10 @@ with tab_lista:
with col_btn: with col_btn:
sub1, sub2 = st.columns(2) sub1, sub2 = st.columns(2)
with sub1: with sub1:
if st.button("Si", key=f"confirm_{name}", type="primary", width="stretch"): if st.button("Si", key=f"confirm_{name}", type="primary", use_container_width=True):
try: try:
del_data = delete_exposition_preset(name) del_resp = requests.delete(f"{API_BASE}/esposition/delete/{name}", timeout=10)
del_data = del_resp.json()
if del_data.get("success"): if del_data.get("success"):
st.success(f"Preset '{name}' eliminato") st.success(f"Preset '{name}' eliminato")
st.session_state.confirm_delete_preset = None st.session_state.confirm_delete_preset = None
@ -125,16 +137,18 @@ with tab_lista:
except Exception as e: except Exception as e:
st.error(f"Errore: {e}") st.error(f"Errore: {e}")
with sub2: with sub2:
if st.button("No", key=f"cancel_{name}", width="stretch"): if st.button("No", key=f"cancel_{name}", use_container_width=True):
st.session_state.confirm_delete_preset = None st.session_state.confirm_delete_preset = None
st.rerun() st.rerun()
else: else:
with col_btn: with col_btn:
if st.button("Elimina", key=f"del_{name}", width="stretch"): if st.button("Elimina", key=f"del_{name}", use_container_width=True):
st.session_state.confirm_delete_preset = name st.session_state.confirm_delete_preset = name
st.rerun() st.rerun()
else: else:
st.warning("Nessun preset trovato.") st.warning("Nessun preset trovato.")
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e: except Exception as e:
st.error(f"Errore nel caricamento: {e}") st.error(f"Errore nel caricamento: {e}")

View file

@ -1,9 +1,13 @@
import pandas as pd
import streamlit as st import streamlit as st
import requests
import pandas as pd
from functions import cosing_download, fetch_ingredient from functions import cosing_download
from functions_ui import download_pdf from functions_ui import download_pdf
#API_BASE = "https://api.cosmoguard.it/api/v1"
API_BASE = "http://localhost:8000/api/v1"
st.set_page_config(page_title="Ricerca Ingredienti", layout="wide") st.set_page_config(page_title="Ricerca Ingredienti", layout="wide")
st.title("Ricerca Ingredienti per CAS") st.title("Ricerca Ingredienti per CAS")
@ -13,18 +17,30 @@ if "ingredient_cas" not in st.session_state:
st.session_state.ingredient_cas = "" st.session_state.ingredient_cas = ""
cas_input = st.session_state.selected_cas cas_input = st.session_state.selected_cas
force_refresh = st.session_state.get("force_refresh", False) force_refresh = st.session_state.force_refresh if "force_refresh" in st.session_state else False
if cas_input: if cas_input:
with st.spinner(f"Ricerca in corso per {cas_input}..."): with st.spinner(f"Ricerca in corso per {cas_input}..."):
success, result = fetch_ingredient(cas_input, force=force_refresh) try:
if success: resp = requests.post(
st.session_state.ingredient_data = result f"{API_BASE}/ingredients/search",
st.session_state.ingredient_cas = cas_input json={"cas": cas_input, "force": force_refresh},
st.success(f"Ingrediente {cas_input} trovato") timeout=120,
else: )
st.session_state.ingredient_data = None result = resp.json()
st.error(result)
if result.get("success") and result.get("data"):
st.session_state.ingredient_data = result["data"]
st.session_state.ingredient_cas = cas_input
st.success(f"Ingrediente {cas_input} trovato")
else:
st.session_state.ingredient_data = None
st.error(result.get("error", f"Nessun dato per CAS {cas_input}"))
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e:
st.error(f"Errore: {e}")
data = st.session_state.ingredient_data data = st.session_state.ingredient_data
@ -38,8 +54,8 @@ else:
# --- Header con INCI e data --- # --- Header con INCI e data ---
col_h1, col_h2 = st.columns(2) col_h1, col_h2 = st.columns(2)
with col_h1: with col_h1:
inci = st.session_state.selected_inci or "Non disponibile" inci = data.get("inci") or []
st.markdown(f"**INCI:** {inci if inci else 'N/A'}") st.markdown(f"**INCI:** {', '.join(inci) if inci else 'N/A'}")
with col_h2: with col_h2:
st.markdown(f"**Data creazione:** {data.get('creation_date', 'N/A')}") st.markdown(f"**Data creazione:** {data.get('creation_date', 'N/A')}")
@ -89,17 +105,18 @@ else:
ref_no = cosing.get("reference", "") ref_no = cosing.get("reference", "")
if ref_no: if ref_no:
pdf_bytes = cosing_download(ref_no) pdf_bytes = cosing_download(ref_no)
if isinstance(pdf_bytes, bytes): if isinstance(pdf_bytes, bytes):
st.download_button( st.download_button(
label="Download CosIng PDF", label="Download CosIng PDF",
data=pdf_bytes, data=pdf_bytes,
file_name=f"{cas_input}_cosing.pdf", file_name=f"{cas_input}_cosing.pdf",
mime="application/pdf", mime="application/pdf"
key=f"download_cosing_{ref_no}",
) )
else: else:
st.error(pdf_bytes) st.error(pdf_bytes)
with col_c2: with col_c2:
annex = cosing.get("annex", []) annex = cosing.get("annex", [])
if annex: if annex:
@ -123,7 +140,7 @@ else:
if link_opinions: if link_opinions:
st.markdown("**SCCS Opinions:**") st.markdown("**SCCS Opinions:**")
for url in link_opinions: for url in link_opinions:
st.markdown(f"[View SCCS Opinion]({url})") st.link_button("View SCCS Opinion", url)
if i < len(cosing_list) - 1: if i < len(cosing_list) - 1:
st.divider() st.divider()
@ -136,18 +153,21 @@ else:
st.subheader("Tossicologia") st.subheader("Tossicologia")
tox = data.get("toxicity") tox = data.get("toxicity")
if tox: if tox:
# Best case highlight
best = tox.get("best_case") best = tox.get("best_case")
if best: if best:
st.success( st.success(
f"Miglior indicatore: **{best['indicator']}** = {best['value']} {best['unit']} " f"Miglior indicatore: **{best['indicator']}** = {best['value']} {best['unit']} "
f"({best['route']}) — Fattore: {tox.get('factor', 'N/A')}" f"({best['route']}) — Fattore: {tox.get('factor', 'N/A')}"
) )
download_pdf( download_pdf(
casNo=cas_input, casNo=cas_input,
origin=best.get("source"), origin=best.get("source"),
link=best.get("ref"), link=best.get("ref")
) )
# Indicators table
indicators = tox.get("indicators", []) indicators = tox.get("indicators", [])
if indicators: if indicators:
df = pd.DataFrame(indicators) df = pd.DataFrame(indicators)

View file

@ -1,17 +1,107 @@
import pandas as pd
import streamlit as st import streamlit as st
import requests
import pandas as pd
from datetime import datetime, date from datetime import datetime, date
from functions import ( API_BASE = "http://localhost:8000/api/v1"
STATUS_MAP,
api_delete_order, STATUS_MAP = {
api_retry_order, 1: ("Ricevuto", "🔵"),
download_excel, 2: ("Validato", "🟡"),
download_sources, 3: ("Compilazione", "🟠"),
fetch_order_detail, 5: ("Arricchito", "🟢"),
fetch_orders, 6: ("Calcolo", "🔵"),
status_label, 8: ("Completato", ""),
) 9: ("Errore", "🔴"),
}
def status_label(stato_ordine):
"""Ritorna label con emoji per lo stato."""
name, emoji = STATUS_MAP.get(stato_ordine, ("Sconosciuto", ""))
return f"{emoji} {name}"
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def fetch_orders():
"""Recupera la lista ordini dall'API."""
try:
resp = requests.get(f"{API_BASE}/orders/list", timeout=15)
data = resp.json()
if data.get("success"):
return data.get("data", [])
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo su localhost:8000.")
return []
except Exception as e:
st.error(f"Errore nel caricamento degli ordini: {e}")
return []
def fetch_order_detail(id_ordine):
"""Recupera il dettaglio completo di un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/detail/{id_ordine}", timeout=15)
data = resp.json()
if data.get("success"):
return data.get("order")
st.error(data.get("detail", "Errore nel recupero dettaglio ordine"))
return None
except requests.ConnectionError:
st.error("Impossibile connettersi all'API.")
return None
except Exception as e:
st.error(f"Errore: {e}")
return None
def api_retry_order(id_ordine):
"""Chiama POST /orders/retry/{id_ordine}."""
try:
resp = requests.post(f"{API_BASE}/orders/retry/{id_ordine}", timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def api_delete_order(id_ordine):
"""Chiama DELETE /orders/{id_ordine}."""
try:
resp = requests.delete(f"{API_BASE}/orders/{id_ordine}", timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def download_excel(id_ordine):
"""Scarica il file Excel per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export/{id_ordine}", timeout=120)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download Excel: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
def download_sources(id_ordine):
"""Scarica lo ZIP delle fonti PDF per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export-sources/{id_ordine}", timeout=300)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download fonti: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Session state init # Session state init
@ -24,6 +114,7 @@ if "confirm_delete" not in st.session_state:
if "orders_cache" not in st.session_state: if "orders_cache" not in st.session_state:
st.session_state.orders_cache = None st.session_state.orders_cache = None
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Page config # Page config
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -58,6 +149,7 @@ with st.expander(" Come usare questa pagina"):
""") """)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Detail View # Detail View
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -65,6 +157,7 @@ with st.expander(" Come usare questa pagina"):
def show_order_detail(id_ordine): def show_order_detail(id_ordine):
"""Mostra il dettaglio di un ordine selezionato.""" """Mostra il dettaglio di un ordine selezionato."""
# Bottone per tornare alla lista
if st.button("← Torna alla lista", key="back_btn"): if st.button("← Torna alla lista", key="back_btn"):
st.session_state.selected_order_id = None st.session_state.selected_order_id = None
st.session_state.confirm_delete = False st.session_state.confirm_delete = False
@ -124,11 +217,12 @@ def show_order_detail(id_ordine):
df_display = df[display_cols].rename(columns=col_rename) df_display = df[display_cols].rename(columns=col_rename)
# Mappa booleani a Si/No
for col_name in ["Colorante", "Skip Tox"]: for col_name in ["Colorante", "Skip Tox"]:
if col_name in df_display.columns: if col_name in df_display.columns:
df_display[col_name] = df_display[col_name].map({True: "", False: "-"}) df_display[col_name] = df_display[col_name].map({True: "", False: "-"})
st.dataframe(df_display, width="stretch", hide_index=True) st.dataframe(df_display, use_container_width=True, hide_index=True)
else: else:
st.info("Nessun ingrediente disponibile") st.info("Nessun ingrediente disponibile")
@ -138,13 +232,15 @@ def show_order_detail(id_ordine):
st.subheader("Azioni") st.subheader("Azioni")
btn_cols = st.columns(5) btn_cols = st.columns(5)
# 1. Aggiorna stato
with btn_cols[0]: with btn_cols[0]:
if st.button("🔄 Aggiorna", key="refresh_btn", width="stretch"): if st.button("🔄 Aggiorna", key="refresh_btn", use_container_width=True):
st.rerun() st.rerun()
# 2. Retry (solo se ERRORE)
with btn_cols[1]: with btn_cols[1]:
if stato == 9: if stato == 9:
if st.button("🔁 Retry", key="retry_btn", type="primary", width="stretch"): if st.button("🔁 Retry", key="retry_btn", type="primary", use_container_width=True):
with st.spinner("Retry in corso..."): with st.spinner("Retry in corso..."):
result = api_retry_order(id_ordine) result = api_retry_order(id_ordine)
if result.get("success"): if result.get("success"):
@ -153,12 +249,13 @@ def show_order_detail(id_ordine):
else: else:
st.error(result.get("error") or result.get("detail", "Errore retry")) st.error(result.get("error") or result.get("detail", "Errore retry"))
else: else:
st.button("🔁 Retry", key="retry_btn_disabled", disabled=True, width="stretch") st.button("🔁 Retry", key="retry_btn_disabled", disabled=True, use_container_width=True)
# 3. Scarica Excel
with btn_cols[2]: with btn_cols[2]:
if has_project: if has_project:
excel_data = None excel_data = None
if st.button("📊 Scarica Excel", key="excel_prep_btn", width="stretch"): if st.button("📊 Scarica Excel", key="excel_prep_btn", use_container_width=True):
with st.spinner("Generazione Excel..."): with st.spinner("Generazione Excel..."):
excel_data = download_excel(id_ordine) excel_data = download_excel(id_ordine)
if excel_data: if excel_data:
@ -167,15 +264,16 @@ def show_order_detail(id_ordine):
data=excel_data, data=excel_data,
file_name=f"progetto_{id_ordine}.xlsx", file_name=f"progetto_{id_ordine}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
key="excel_download", key="excel_download"
) )
else: else:
st.button("📊 Scarica Excel", key="excel_disabled", disabled=True, st.button("📊 Scarica Excel", key="excel_disabled", disabled=True,
width="stretch", help="Progetto non ancora disponibile") use_container_width=True, help="Progetto non ancora disponibile")
# 4. Scarica Fonti PDF
with btn_cols[3]: with btn_cols[3]:
if has_project: if has_project:
if st.button("📄 Scarica Fonti PDF", key="pdf_prep_btn", width="stretch"): if st.button("📄 Scarica Fonti PDF", key="pdf_prep_btn", use_container_width=True):
with st.spinner("Generazione PDF fonti (può richiedere tempo)..."): with st.spinner("Generazione PDF fonti (può richiedere tempo)..."):
zip_data = download_sources(id_ordine) zip_data = download_sources(id_ordine)
if zip_data: if zip_data:
@ -184,22 +282,23 @@ def show_order_detail(id_ordine):
data=zip_data, data=zip_data,
file_name=f"fonti_ordine_{id_ordine}.zip", file_name=f"fonti_ordine_{id_ordine}.zip",
mime="application/zip", mime="application/zip",
key="pdf_download", key="pdf_download"
) )
else: else:
st.button("📄 Scarica Fonti PDF", key="pdf_disabled", disabled=True, st.button("📄 Scarica Fonti PDF", key="pdf_disabled", disabled=True,
width="stretch", help="Progetto non ancora disponibile") use_container_width=True, help="Progetto non ancora disponibile")
# 5. Elimina ordine
with btn_cols[4]: with btn_cols[4]:
if not st.session_state.confirm_delete: if not st.session_state.confirm_delete:
if st.button("🗑️ Elimina", key="delete_btn", width="stretch"): if st.button("🗑️ Elimina", key="delete_btn", use_container_width=True):
st.session_state.confirm_delete = True st.session_state.confirm_delete = True
st.rerun() st.rerun()
else: else:
st.warning("Confermi l'eliminazione?") st.warning("Confermi l'eliminazione?")
sub_cols = st.columns(2) sub_cols = st.columns(2)
with sub_cols[0]: with sub_cols[0]:
if st.button("✅ Conferma", key="confirm_yes", type="primary", width="stretch"): if st.button("✅ Conferma", key="confirm_yes", type="primary", use_container_width=True):
with st.spinner("Eliminazione in corso..."): with st.spinner("Eliminazione in corso..."):
result = api_delete_order(id_ordine) result = api_delete_order(id_ordine)
if result.get("success"): if result.get("success"):
@ -211,7 +310,7 @@ def show_order_detail(id_ordine):
else: else:
st.error(result.get("detail") or result.get("error", "Errore eliminazione")) st.error(result.get("detail") or result.get("error", "Errore eliminazione"))
with sub_cols[1]: with sub_cols[1]:
if st.button("❌ Annulla", key="confirm_no", width="stretch"): if st.button("❌ Annulla", key="confirm_no", use_container_width=True):
st.session_state.confirm_delete = False st.session_state.confirm_delete = False
st.rerun() st.rerun()
@ -223,6 +322,7 @@ def show_order_detail(id_ordine):
def show_orders_list(): def show_orders_list():
"""Mostra la lista degli ordini con filtri.""" """Mostra la lista degli ordini con filtri."""
# Filtri
st.subheader("Filtri") st.subheader("Filtri")
filter_cols = st.columns([2, 2, 2, 1]) filter_cols = st.columns([2, 2, 2, 1])
@ -231,7 +331,7 @@ def show_orders_list():
"Intervallo date", "Intervallo date",
value=[], value=[],
key="date_filter", key="date_filter",
help="Filtra per data ordine", help="Filtra per data ordine"
) )
with filter_cols[1]: with filter_cols[1]:
@ -240,17 +340,19 @@ def show_orders_list():
with filter_cols[2]: with filter_cols[2]:
status_options = [f"{v[1]} {v[0]}" for v in STATUS_MAP.values()] status_options = [f"{v[1]} {v[0]}" for v in STATUS_MAP.values()]
status_keys = list(STATUS_MAP.keys())
selected_statuses = st.multiselect("Stato", status_options, key="status_filter") selected_statuses = st.multiselect("Stato", status_options, key="status_filter")
with filter_cols[3]: with filter_cols[3]:
st.write("") # spacer
st.write("") st.write("")
st.write("") if st.button("🔄 Aggiorna", key="refresh_list", use_container_width=True):
if st.button("🔄 Aggiorna", key="refresh_list", width="stretch"):
st.session_state.orders_cache = None st.session_state.orders_cache = None
st.rerun() st.rerun()
st.divider() st.divider()
# Carica ordini
if st.session_state.orders_cache is None: if st.session_state.orders_cache is None:
with st.spinner("Caricamento ordini..."): with st.spinner("Caricamento ordini..."):
st.session_state.orders_cache = fetch_orders() st.session_state.orders_cache = fetch_orders()
@ -264,6 +366,7 @@ def show_orders_list():
# Applica filtri # Applica filtri
filtered = orders filtered = orders
# Filtro data
if date_range and len(date_range) == 2: if date_range and len(date_range) == 2:
start_date, end_date = date_range start_date, end_date = date_range
filtered = [ filtered = [
@ -272,6 +375,7 @@ def show_orders_list():
start_date <= datetime.fromisoformat(o["data_ordine"]).date() <= end_date start_date <= datetime.fromisoformat(o["data_ordine"]).date() <= end_date
] ]
# Filtro cliente
if client_search: if client_search:
search_lower = client_search.lower() search_lower = client_search.lower()
filtered = [ filtered = [
@ -279,11 +383,13 @@ def show_orders_list():
if o.get("nome_cliente") and search_lower in o["nome_cliente"].lower() if o.get("nome_cliente") and search_lower in o["nome_cliente"].lower()
] ]
# Filtro stato
if selected_statuses: if selected_statuses:
selected_ids = [ selected_ids = []
k for k, v in STATUS_MAP.items() for s in selected_statuses:
if f"{v[1]} {v[0]}" in selected_statuses for k, v in STATUS_MAP.items():
] if f"{v[1]} {v[0]}" == s:
selected_ids.append(k)
filtered = [o for o in filtered if o.get("stato_ordine") in selected_ids] filtered = [o for o in filtered if o.get("stato_ordine") in selected_ids]
if not filtered: if not filtered:
@ -292,6 +398,7 @@ def show_orders_list():
st.caption(f"{len(filtered)} ordini trovati") st.caption(f"{len(filtered)} ordini trovati")
# Mostra lista
for order in filtered: for order in filtered:
id_ord = order.get("id_ordine") id_ord = order.get("id_ordine")
product = order.get("product_name", "N/D") or "N/D" product = order.get("product_name", "N/D") or "N/D"
@ -307,7 +414,7 @@ def show_orders_list():
col_btn, col_info = st.columns([1, 5]) col_btn, col_info = st.columns([1, 5])
with col_btn: with col_btn:
if st.button(f"#{id_ord}", key=f"order_{id_ord}", width="stretch"): if st.button(f"#{id_ord}", key=f"order_{id_ord}", use_container_width=True):
st.session_state.selected_order_id = id_ord st.session_state.selected_order_id = id_ord
st.session_state.confirm_delete = False st.session_state.confirm_delete = False
st.rerun() st.rerun()

View file

@ -1,18 +1,234 @@
import requests
import streamlit as st import streamlit as st
import requests
import pandas as pd
import re
from functions import ( API_BASE = "http://localhost:8000/api/v1"
API_BASE,
_auth_headers, # CAS validation: 2-7 digits, dash, 2 digits, dash, 1 check digit
build_order_payload, CAS_PATTERN = re.compile(r"^\d{2,7}-\d{2}-\d$")
create_client,
fetch_clients, # Percentuale target e tolleranza
fetch_presets, PERCENTAGE_TARGET = 100.0
is_water_inci, PERCENTAGE_TOLERANCE = 0.01
make_empty_ingredient_df,
validate_order, # INCI che indicano acqua (case-insensitive)
) WATER_INCI = {"aqua", "water", "eau", "aqua/water", "water/aqua", "aqua/eau"}
from functions_ui import display_orderData
def is_water_inci(inci_value: str) -> bool:
"""Controlla se l'INCI corrisponde ad acqua o varianti."""
if not inci_value or not isinstance(inci_value, str):
return False
return inci_value.strip().lower() in WATER_INCI
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def fetch_presets() -> list[str]:
"""Recupera i nomi dei preset di esposizione dall'API."""
try:
resp = requests.get(f"{API_BASE}/esposition/presets", timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return [p["preset_name"] for p in data["data"]]
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API per caricare i preset. Verifica che il server sia attivo.")
return []
except Exception as e:
st.error(f"Errore nel caricamento dei preset: {e}")
return []
def fetch_clients() -> list[dict]:
"""Recupera la lista clienti dall'API. Ritorna lista di {id_cliente, nome_cliente}."""
try:
resp = requests.get(f"{API_BASE}/ingredients/clients", timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return data["data"]
return []
except requests.ConnectionError:
return []
except Exception:
return []
def create_client(nome_cliente: str) -> bool:
"""Crea un nuovo cliente via API. Ritorna True se riuscito."""
try:
resp = requests.post(
f"{API_BASE}/ingredients/clients",
json={"nome_cliente": nome_cliente},
timeout=10,
)
data = resp.json()
return data.get("success", False)
except Exception:
return False
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
def validate_order(
client_name: str,
product_name: str,
selected_preset: str | None,
df: pd.DataFrame,
) -> list[str]:
"""Valida tutti i campi dell'ordine. Ritorna lista di errori (vuota = valido)."""
errors = []
if not client_name or not client_name.strip():
errors.append("Il campo 'Nome del cliente' e obbligatorio.")
if not product_name or not product_name.strip():
errors.append("Il campo 'Nome del prodotto' e obbligatorio.")
if not selected_preset:
errors.append("Selezionare un preset di esposizione.")
# Normalizza colonne stringa (data_editor puo generare NaN)
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
# Righe attive: CAS compilato oppure percentuale > 0
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
if active.empty:
errors.append("Inserire almeno un ingrediente.")
return errors
# CAS format (solo righe non-colorante e non-acqua con CAS compilato)
for idx, row in active.iterrows():
cas_val = row["cas"].strip()
inci_val = row["inci"].strip()
is_col = bool(row["is_colorante"])
is_aqua = is_water_inci(inci_val)
# Se acqua, CAS puo essere vuoto
if is_aqua and cas_val == "":
continue
# Se colorante, skip validazione formato
if is_col:
continue
# CAS vuoto su riga non-acqua e non-colorante
if cas_val == "":
errors.append(f"Riga {idx + 1}: inserire un CAS number oppure selezionare 'Colorante'.")
continue
# Formato CAS
if not CAS_PATTERN.match(cas_val):
hint = f" ({inci_val})" if inci_val else ""
errors.append(
f"Formato CAS non valido alla riga {idx + 1}{hint}: '{cas_val}'. "
"Formato atteso: XX-XX-X (es. 56-81-5)."
)
# Somma percentuali
total_pct = active["percentage"].sum()
if abs(total_pct - PERCENTAGE_TARGET) > PERCENTAGE_TOLERANCE:
errors.append(
f"La somma delle percentuali e {total_pct:.6f}%, "
f"ma deve essere 100% (tolleranza +/- {PERCENTAGE_TOLERANCE}%)."
)
return errors
# ---------------------------------------------------------------------------
# Build payload
# ---------------------------------------------------------------------------
def build_order_payload(
client_name: str,
product_name: str,
preset_name: str,
df: pd.DataFrame,
) -> dict:
"""Costruisce il JSON dell'ordine a partire dai dati del form."""
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
ingredients = []
for _, row in active.iterrows():
inci_val = row["inci"].strip()
skip = bool(row["skip_tox"]) or is_water_inci(inci_val)
ingredients.append({
"inci": inci_val if inci_val else None,
"cas": row["cas"].strip(),
"percentage": round(float(row["percentage"]), 6),
"is_colorante": bool(row["is_colorante"]),
"skip_tox": skip,
})
return {
"client_name": client_name.strip(),
"product_name": product_name.strip(),
"preset_esposizione": preset_name,
"ingredients": ingredients,
}
# ---------------------------------------------------------------------------
# Display summary (reusable)
# ---------------------------------------------------------------------------
def display_orderData(order_data: dict):
"""Mostra un riepilogo leggibile dell'ordine (non JSON)."""
st.markdown("### Riepilogo Ordine")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Cliente", order_data["client_name"])
with col2:
st.metric("Prodotto", order_data["product_name"])
with col3:
st.metric("Preset Esposizione", order_data["preset_esposizione"])
ingredients = order_data.get("ingredients", [])
total_pct = sum(i["percentage"] for i in ingredients)
col4, col5, col6 = st.columns(3)
with col4:
st.metric("Numero ingredienti", len(ingredients))
with col5:
st.metric("Percentuale totale", f"{total_pct:.6f}%")
with col6:
n_col = sum(1 for i in ingredients if i["is_colorante"])
n_skip = sum(1 for i in ingredients if i["skip_tox"])
parts = []
if n_col > 0:
parts.append(f"{n_col} colorante/i")
if n_skip > 0:
parts.append(f"{n_skip} senza tox")
st.metric("Flag attivi", ", ".join(parts) if parts else "Nessuno")
st.markdown("**Ingredienti:**")
display_rows = []
for i in ingredients:
display_rows.append({
"INCI": i["inci"] if i["inci"] else "-",
"CAS": i["cas"] if i["cas"] else "-",
"Percentuale (%)": f"{i['percentage']:.6f}",
"Colorante": "Si" if i["is_colorante"] else "-",
"Salta Tox": "Si" if i["skip_tox"] else "-",
})
st.dataframe(pd.DataFrame(display_rows), use_container_width=True, hide_index=True)
# ===========================================================================
# PAGE
# ===========================================================================
st.set_page_config(page_title="Creazione Ordine", layout="wide") st.set_page_config(page_title="Creazione Ordine", layout="wide")
st.title("Creazione Ordine") st.title("Creazione Ordine")
@ -27,7 +243,13 @@ if "order_submitted" not in st.session_state:
if "order_result" not in st.session_state: if "order_result" not in st.session_state:
st.session_state.order_result = None st.session_state.order_result = None
if "ingredient_df" not in st.session_state: if "ingredient_df" not in st.session_state:
st.session_state.ingredient_df = make_empty_ingredient_df() st.session_state.ingredient_df = pd.DataFrame({
"inci": [""] * 5,
"cas": [""] * 5,
"percentage": [0.0] * 5,
"is_colorante": [False] * 5,
"skip_tox": [False] * 5,
})
if "new_client_mode" not in st.session_state: if "new_client_mode" not in st.session_state:
st.session_state.new_client_mode = False st.session_state.new_client_mode = False
@ -113,6 +335,7 @@ st.subheader("Dati ordine")
col_left, col_right = st.columns(2) col_left, col_right = st.columns(2)
with col_left: with col_left:
# Client dropdown con opzione "Nuovo cliente"
dropdown_options = client_names + ["+ Nuovo cliente..."] dropdown_options = client_names + ["+ Nuovo cliente..."]
selected_client_option = st.selectbox("Nome del cliente *", options=dropdown_options) selected_client_option = st.selectbox("Nome del cliente *", options=dropdown_options)
@ -197,12 +420,14 @@ edited_df = st.data_editor(
st.session_state.ingredient_df, st.session_state.ingredient_df,
column_config=column_config, column_config=column_config,
num_rows="dynamic", num_rows="dynamic",
width="stretch", use_container_width=True,
hide_index=True, hide_index=True,
key="ingredients_editor", key="ingredients_editor",
) )
_aqua_count = sum(is_water_inci(v) for v in edited_df["inci"].fillna("")) # Info per l'utente sulla rilevazione automatica AQUA/Water
_inci_col = edited_df["inci"].fillna("")
_aqua_count = sum(is_water_inci(v) for v in _inci_col)
if _aqua_count > 0: if _aqua_count > 0:
st.info(f"{_aqua_count} ingrediente/i rilevato/i come AQUA/Water: tossicologia saltata automaticamente.") st.info(f"{_aqua_count} ingrediente/i rilevato/i come AQUA/Water: tossicologia saltata automaticamente.")
@ -232,7 +457,6 @@ if st.button("Invia Ordine", type="primary", disabled=not can_submit):
resp = requests.post( resp = requests.post(
f"{API_BASE}/orders/create", f"{API_BASE}/orders/create",
json=payload, json=payload,
headers=_auth_headers(),
timeout=30, timeout=30,
) )
result = resp.json() result = resp.json()
@ -246,7 +470,7 @@ if st.button("Invia Ordine", type="primary", disabled=not can_submit):
st.error(result.get("error") or result.get("detail", "Errore nella creazione dell'ordine")) st.error(result.get("error") or result.get("detail", "Errore nella creazione dell'ordine"))
except requests.ConnectionError: except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.") st.error("Impossibile connettersi all'API. Verifica che il server sia attivo su localhost:8000.")
except Exception as e: except Exception as e:
st.error(f"Errore: {e}") st.error(f"Errore: {e}")
@ -264,7 +488,14 @@ st.markdown("---")
if st.button("Nuovo Ordine", key="reset_order"): if st.button("Nuovo Ordine", key="reset_order"):
st.session_state.order_submitted = False st.session_state.order_submitted = False
st.session_state.order_result = None st.session_state.order_result = None
st.session_state.ingredient_df = make_empty_ingredient_df() st.session_state.ingredient_df = pd.DataFrame({
"inci": [""] * 5,
"cas": [""] * 5,
"percentage": [0.0] * 5,
"is_colorante": [False] * 5,
"skip_tox": [False] * 5,
})
# Pulisci lo stato interno del widget data_editor
if "ingredients_editor" in st.session_state: if "ingredients_editor" in st.session_state:
del st.session_state["ingredients_editor"] del st.session_state["ingredients_editor"]
st.rerun() st.rerun()

View file

@ -1,159 +0,0 @@
import pandas as pd
import streamlit as st
from functions import (
add_tox_indicator,
delete_client,
fetch_all_ingredients,
fetch_clients,
)
st.title("Impostazioni")
tab_tox, tab_ingredienti, tab_clienti = st.tabs([
"Indicatore Tox Custom",
"Inventario Ingredienti",
"Gestione Clienti",
])
# ---------------------------------------------------------------------------
# TAB 1 — Indicatore Tox Custom
# ---------------------------------------------------------------------------
with tab_tox:
st.subheader("Aggiungi indicatore tossicologico custom")
with st.form("form_tox_indicator"):
cas = st.text_input("CAS Number", placeholder="es. 56-81-5")
col1, col2 = st.columns(2)
with col1:
indicator = st.selectbox("Indicatore", ["NOAEL", "LOAEL", "LD50"])
value = st.number_input("Valore", min_value=0.0, step=0.1)
unit = st.text_input("Unità", placeholder="es. mg/kg bw/day")
with col2:
route = st.text_input("Via di esposizione", placeholder="es. oral, dermal")
toxicity_type = st.selectbox(
"Tipo tossicità",
["", "repeated_dose_toxicity", "acute_toxicity"],
format_func=lambda x: x if x else "— non specificato —"
)
ref = st.text_input("Riferimento / Fonte", placeholder="es. Studio interno 2024")
submitted = st.form_submit_button("Aggiungi indicatore", type="primary")
if submitted:
if not cas or not unit or not route:
st.error("CAS, unità e via di esposizione sono obbligatori.")
else:
payload = {
"cas": cas,
"indicator": indicator,
"value": value,
"unit": unit,
"route": route,
"toxicity_type": toxicity_type or None,
"ref": ref or None,
}
try:
resp = add_tox_indicator(payload)
if resp.status_code == 200:
data = resp.json()
tox = data.get("data", {}).get("toxicity", {})
best = tox.get("best_case")
st.success(f"Indicatore aggiunto per CAS {cas}.")
if best:
st.info(f"Best case aggiornato: **{best['indicator']}** = {best['value']} {best['unit']} ({best['route']})")
elif resp.status_code == 404:
st.error(f"CAS {cas} non trovato in cache. Esegui prima una ricerca nella pagina Ingredienti.")
else:
st.error(f"Errore {resp.status_code}: {resp.json().get('detail', 'errore sconosciuto')}")
except Exception as e:
st.error(f"Errore: {e}")
# ---------------------------------------------------------------------------
# TAB 2 — Inventario Ingredienti
# ---------------------------------------------------------------------------
with tab_ingredienti:
st.subheader("Ingredienti nel database")
if st.button("Aggiorna", key="refresh_ingredienti"):
st.rerun()
try:
result = fetch_all_ingredients()
if result.get("success") and result.get("data"):
st.caption(f"{result['total']} ingredienti totali")
df = pd.DataFrame(result["data"])
df["dap"] = df["dap"].map({True: "OK", False: "-"})
df["cosing"] = df["cosing"].map({True: "OK", False: "-"})
df["tox"] = df["tox"].map({True: "OK", False: "-"})
df = df.rename(columns={
"cas": "CAS",
"dap": "DAP",
"cosing": "COSING",
"tox": "TOX",
"created_at": "Data Acquisizione",
})
st.dataframe(
df[["CAS", "DAP", "COSING", "TOX", "Data Acquisizione"]],
use_container_width=True,
hide_index=True,
)
else:
st.info("Nessun ingrediente trovato nel database.")
except Exception as e:
st.error(f"Errore nel caricamento: {e}")
# ---------------------------------------------------------------------------
# TAB 3 — Gestione Clienti
# ---------------------------------------------------------------------------
with tab_clienti:
st.subheader("Clienti registrati")
if st.button("Aggiorna", key="refresh_clienti"):
st.rerun()
clienti = fetch_clients()
if not clienti:
st.info("Nessun cliente trovato.")
else:
for cliente in clienti:
col_nome, col_id, col_btn = st.columns([4, 1, 1])
with col_nome:
st.write(cliente["nome_cliente"])
with col_id:
st.caption(f"id: {cliente['id_cliente']}")
with col_btn:
key = f"del_{cliente['id_cliente']}"
if st.button("Elimina", key=key, type="secondary"):
st.session_state[f"confirm_{cliente['id_cliente']}"] = True
if st.session_state.get(f"confirm_{cliente['id_cliente']}"):
nome = cliente["nome_cliente"]
st.warning(f"Confermi l'eliminazione di **{nome}**?")
col_si, col_no, _ = st.columns([1, 1, 6])
with col_si:
if st.button("Sì, elimina", key=f"yes_{cliente['id_cliente']}", type="primary"):
try:
r = delete_client(nome)
if r.status_code == 200:
st.success(f"Cliente '{nome}' eliminato.")
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
st.rerun()
elif r.status_code == 409:
st.error(r.json().get("detail", "Il cliente ha ordini collegati."))
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
else:
st.error(f"Errore {r.status_code}: {r.json().get('detail', '')}")
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
except Exception as e:
st.error(f"Errore: {e}")
with col_no:
if st.button("Annulla", key=f"no_{cliente['id_cliente']}"):
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
st.rerun()

View file

@ -1,56 +0,0 @@
import streamlit as st
from functions import send_segnalazione
st.set_page_config(page_title="Segnala un problema", layout="centered")
st.title("Segnala un problema")
st.caption("Usa questo form per segnalare un errore, un dato mancante o qualsiasi problema riscontrato.")
with st.form("ticket_form", clear_on_submit=True):
page = st.selectbox(
"Pagina *",
options=["Cerca", "Ingrediente", "Nuovo PIF", "Ordini PIF", "Esposizione", "Impostazioni", "ECHA", "Altro"],
)
cas = st.text_input(
"CAS (opzionale)",
value=st.session_state.get("selected_cas") or "",
placeholder="es. 56-81-5",
help="Inserire il CAS coinvolto nel problema, se applicabile.",
)
description = st.text_area(
"Descrizione *",
placeholder="Descrivi il problema in modo chiaro e dettagliato.",
height=120,
)
error = st.text_area(
"Messaggio di errore (opzionale)",
placeholder="Incolla qui il messaggio di errore visualizzato, se presente.",
height=80,
)
priority = st.radio(
"Priorità *",
options=["bassa", "media", "alta"],
index=1,
horizontal=True,
captions=["Problema minore", "Impatta l'utilizzo", "Blocca il lavoro"],
)
submitted = st.form_submit_button("Invia segnalazione", type="primary", use_container_width=True)
if submitted:
if not description.strip():
st.error("La descrizione è obbligatoria.")
else:
ok = send_segnalazione(
page=page,
description=description.strip(),
priority=priority,
cas=cas.strip() or None,
error=error.strip() or None,
)
if ok:
st.success("Segnalazione inviata. Grazie per il feedback!")

View file

@ -6,8 +6,6 @@ readme = "README.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"duckdb>=1.4.3", "duckdb>=1.4.3",
"extra-streamlit-components>=0.1.71", "marimo>=0.18.0",
"pandas>=2.0.0",
"requests>=2.32.5",
"streamlit>=1.51.0", "streamlit>=1.51.0",
] ]