Compare commits

...

10 commits

Author SHA1 Message Date
adish-rmr
2da0f315c6 fix 2026-03-14 00:01:51 +01:00
adish-rmr
feb58b7821 update fronted big 2026-03-13 23:54:16 +01:00
adish-rmr
83c28bff2f added settings 2026-03-01 00:29:58 +01:00
adish-rmr
44bcb38488 modifica pagina esposizioni 2026-02-27 23:29:19 +01:00
adish-rmr
72121c5293 Merge branches 'main' and 'main' of https://github.com/adish-rmr/cosmoguard_frontend 2026-02-26 16:09:45 +01:00
adish-rmr
9151537d16 fix ingredient page 2026-02-26 16:09:40 +01:00
Adish
445a66df65 ingredient 2026-02-22 20:00:48 +01:00
Adish
d334a554c9 fix 2026-02-22 19:58:10 +01:00
Adish
54dc962229 allign 2026-02-22 19:56:51 +01:00
adish-rmr
584de757bb fix 2026-02-22 19:44:27 +01:00
14 changed files with 13613 additions and 956 deletions

119
app.py
View file

@ -1,8 +1,10 @@
import re import re
import extra_streamlit_components as stx
import streamlit as st import streamlit as st
from functions_ui import search_cas_inci from functions import check_auth, do_logout
from functions_ui import search_cas_inci, search_cir, show_login_page
# Configure page # Configure page
st.set_page_config( st.set_page_config(
@ -11,69 +13,45 @@ st.set_page_config(
layout="wide" layout="wide"
) )
# Password protection st.session_state["_cm"] = stx.CookieManager(key="pif_cookies") # una sola volta per rerun
def check_password():
"""Returns `True` if the user had the correct password."""
def password_entered(): if not check_auth():
"""Checks whether a password entered by the user is correct.""" show_login_page()
if st.session_state["password"] == st.secrets["passwords"]["app_password"]:
st.session_state["password_correct"] = True
del st.session_state["password"] # Don't store password
else:
st.session_state["password_correct"] = False
# First run, show input for password
if "password_correct" not in st.session_state:
st.text_input(
"Password", type="password", on_change=password_entered, key="password"
)
return False
# Password not correct, show input + error
elif not st.session_state["password_correct"]:
st.text_input(
"Password", type="password", on_change=password_entered, key="password"
)
st.error("😕 Password incorrect")
return False
# Password correct
else:
return True
if not check_password():
st.stop() st.stop()
# Define home page function # Define home page function
def home(): def home():
name = st.session_state.get("user_name") or st.session_state.get("user_email") or ""
greeting = f"Benvenuto, {name}" if name else "Benvenuto"
st.title("LMB App: PIF & Database Tossicologico") st.title("LMB App: PIF & Database Tossicologico")
st.caption(greeting)
# Inizializza session_state per il CAS number se non esiste if "selected_cas" not in st.session_state:
if 'selected_cas' not in st.session_state:
st.session_state.selected_cas = None st.session_state.selected_cas = None
if "selected_inci" not in st.session_state:
st.session_state.selected_inci = None
with st.container(border=True): with st.container(border=True):
col_left, col_right = st.columns([2, 1]) col_left, col_right = st.columns([2, 1])
with col_left: with col_left:
type = st.radio("Cerca per:", ("CAS", "INCI"), index=0, key="search_mode", horizontal=True) search_type = st.radio("Cerca per:", ("CAS", "INCI"), index=0, key="search_mode", horizontal=True)
input = st.text_input("Inserisci:", "") search_input = st.text_input("Inserisci:", "")
with col_right: with col_right:
real_time = st.checkbox("Ricerca online", value=False, key="real_time_search") real_time = st.checkbox("Ricerca online", value=False, key="real_time_search")
force_refresh = st.checkbox("Aggiorna Ingrediente", value=False) force_refresh = st.checkbox("Aggiorna Ingrediente", value=False)
search_cir_enabled = st.checkbox("Cerca in CIR", value=False, key="search_cir")
if input: if search_input:
st.caption(f"Ricerca per {input}: trovati i seguenti ingredienti.") st.caption(f"Ricerca per {search_input}: trovati i seguenti ingredienti.")
if type == "CAS": results = search_cas_inci(search_input, type="cas" if search_type == "CAS" else "inci")
results = search_cas_inci(input, type='cas')
else:
results = search_cas_inci(input, type='inci')
if results: if results:
display_options = [f"{cas} - {inci}" for cas, inci in results] display_options = [f"{cas} - {inci}" for cas, inci in results]
selected_display = st.selectbox("Risultati", options=[""] + display_options, key="cas_selectbox") selected_display = st.selectbox("Risultati", options=[""] + display_options, key="cas_selectbox")
if selected_display and selected_display != "": if selected_display:
cas_pattern = r'\b\d{2,7}-\d{2}-\d\b' found_cas = re.findall(r"\b\d{2,7}-\d{2}-\d\b", selected_display)
found_cas = re.findall(cas_pattern, selected_display)
unique_cas = list(dict.fromkeys(found_cas)) unique_cas = list(dict.fromkeys(found_cas))
if not unique_cas: if not unique_cas:
@ -81,20 +59,22 @@ def home():
selected_cas = None selected_cas = None
elif len(unique_cas) == 1: elif len(unique_cas) == 1:
selected_cas = unique_cas[0] selected_cas = unique_cas[0]
#st.info(f"CAS rilevato: {selected_cas}")
else: else:
selected_cas = st.selectbox( selected_cas = st.selectbox(
label="Sono stati rilevati più CAS. Selezionane uno:", label="Sono stati rilevati più CAS. Selezionane uno:",
options=unique_cas options=unique_cas
) )
if selected_cas: if selected_cas:
st.session_state.selected_cas = selected_cas st.session_state.selected_cas = selected_cas
#st.success(f"CAS selezionato: {selected_cas}") match = next((inci for cas, inci in results if cas == selected_cas), None)
st.session_state.selected_inci = match
else: else:
st.warning("Nessun risultato trovato nel database.") st.warning("Nessun risultato trovato nel database.")
if st.button("Usa questo CAS") and type == "CAS": if st.button("Usa questo CAS") and search_type == "CAS":
st.session_state.selected_cas = input.strip() st.session_state.selected_cas = search_input.strip()
st.success(f"CAS salvato: {input}") st.session_state.selected_inci = None
st.success(f"CAS salvato: {search_input}")
else: else:
st.info("INCI non trovato, cerca per CAS o modifica l'input.") st.info("INCI non trovato, cerca per CAS o modifica l'input.")
@ -108,7 +88,17 @@ def home():
st.session_state.force_refresh = force_refresh st.session_state.force_refresh = force_refresh
# CIR search results
if search_cir_enabled and search_input:
st.markdown("---")
st.markdown("**Rapporti CIR:**")
cir_results = search_cir(search_input)
if cir_results:
for name, inci, url in cir_results:
label = name if name == inci else f"{name} ({inci})"
st.markdown(f"- [{label}]({url})")
else:
st.caption("Nessun rapporto CIR trovato.")
# Guide section # Guide section
st.divider() st.divider()
@ -168,8 +158,24 @@ def home():
# Changelog section # Changelog section
with st.expander("📝 Registro degli aggiornamenti"): with st.expander("📝 Registro degli aggiornamenti"):
# Placeholder for future versions
st.markdown(""" st.markdown("""
### v1.0
- Aggiunta autenticazione con mail e password
- Modificato il backend per maggiore sicurezza
- Ottimizzazioni generali e refactoring del codice
- Aggiunto form per segnalare errori
- Migliorato l'algoritmo per scegliere l'indicatore tossicologico migliore:
* Ora si preferiscono indicatori con route più rilevante per il MoS (es. NOAEL dermal > NOAEL orale)
* In caso di più indicatori con stesso endpoint, si sceglie quello con la dose più bassa (solo per NOAEL/LOAEL)
### v0.9 - 2026-03-01
- Ottimizzazione della pagina 'Ingrediente', risolto un bug che impediva di caricare i dati CosIng
- Modificato la validazione dei dati in esposizione
- Creata una pagina di impostazione per:
* Creazione di parametri custom di tossicologia per un ingrediente
* Cancellazione di clienti (e relativi dati)
* Visualizzare gli ingredienti salvati nel database
### v0.8 ### v0.8
*v0.8.0 - 2026-02-22 *v0.8.0 - 2026-02-22
- Versione iniziale (v0.8.0) rilasciata per test interno e feedback - Versione iniziale (v0.8.0) rilasciata per test interno e feedback
@ -223,18 +229,23 @@ def home():
# Navigation # Navigation
home_page = st.Page(home, title="Cerca", icon="🏠", default=True) home_page = st.Page(home, title="Cerca", icon="🏠", default=True)
echa_page = st.Page("pages/echa.py", title="ECHA Database", icon="🧪") echa_page = st.Page("pages/echa.py", title="ECHA (legacy)", icon="🧪")
#cosing_page = st.Page("pages/cosing.py", title="CosIng", icon="💄")
#dap_page = st.Page("pages/dap.py", title="DAP", icon="🧬")
exposition_page = st.Page("pages/exposition_page.py", title="Esposizione", icon="☀️") exposition_page = st.Page("pages/exposition_page.py", title="Esposizione", icon="☀️")
ingredients_page = st.Page("pages/ingredients_page.py", title="Ingrediente", icon="📋") ingredients_page = st.Page("pages/ingredients_page.py", title="Ingrediente", icon="📋")
order_page = st.Page("pages/order_page.py", title="Nuovo PIF", icon="🛒") order_page = st.Page("pages/order_page.py", title="Nuovo PIF", icon="🛒")
list_orders = st.Page("pages/list_orders.py", title="Ordini PIF", icon="📦") list_orders = st.Page("pages/list_orders.py", title="Ordini PIF", icon="📦")
settings_page = st.Page("pages/settings_page.py", title="Impostazioni", icon="⚙️")
ticket_page = st.Page("pages/ticket.py", title="Segnala problema", icon="🚩")
pg = st.navigation({ pg = st.navigation({
"Ricerca": [home_page, ingredients_page], "Ricerca": [home_page, ingredients_page],
"PIF": [list_orders, order_page, exposition_page], "PIF": [list_orders, order_page, exposition_page],
"Online": [echa_page], "Utilità": [echa_page, settings_page, ticket_page],
}) })
with st.sidebar:
if st.button("Esci", use_container_width=True):
do_logout()
st.rerun()
pg.run() pg.run()

12480
cir-reports.csv Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,71 +1,632 @@
import json import re
from typing import Any, Dict, List, Union import time
import requests from typing import Any, Dict, Optional
import extra_streamlit_components as stx
import pandas as pd import pandas as pd
import requests
import streamlit as st import streamlit as st
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
API_BASE = "http://localhost:8000/api/v1"
AUTH_BASE = "http://localhost:8000/api/v1"
CAS_PATTERN = re.compile(r"^\d{2,7}-\d{2}-\d$")
PERCENTAGE_TARGET = 100.0
PERCENTAGE_TOLERANCE = 0.01
WATER_INCI = {"aqua", "water", "eau", "aqua/water", "water/aqua", "aqua/eau", "acqua"}
STATUS_MAP = {
1: ("Ricevuto", "🔵"),
2: ("Validato", "🟡"),
3: ("Compilazione", "🟠"),
5: ("Arricchito", "🟢"),
6: ("Calcolo", "🔵"),
8: ("Completato", ""),
9: ("Errore", "🔴"),
}
# ---------------------------------------------------------------------------
# General utilities
# ---------------------------------------------------------------------------
def is_water_inci(inci_value: str) -> bool:
"""Controlla se l'INCI corrisponde ad acqua o varianti."""
if not inci_value or not isinstance(inci_value, str):
return False
return inci_value.strip().lower() in WATER_INCI
def status_label(stato_ordine: int) -> str:
"""Ritorna label con emoji per lo stato."""
name, emoji = STATUS_MAP.get(stato_ordine, ("Sconosciuto", ""))
return f"{emoji} {name}"
def make_empty_ingredient_df(n: int = 5) -> pd.DataFrame:
"""Crea un DataFrame vuoto per la tabella ingredienti."""
return pd.DataFrame({
"inci": [""] * n,
"cas": [""] * n,
"percentage": [0.0] * n,
"is_colorante": [False] * n,
"skip_tox": [False] * n,
})
# ---------------------------------------------------------------------------
# Cookie
# ---------------------------------------------------------------------------
_COOKIE_RT = "pif_rt"
_COOKIE_MAX_AGE = 7 * 24 * 3600 # 7 giorni in secondi
def get_cookie_manager() -> stx.CookieManager:
"""Ritorna il CookieManager della sessione corrente (creato una volta sola per rerun in app.py)."""
return st.session_state.get("_cm")
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
def do_login(email: str, password: str) -> bool:
"""Chiama l'endpoint di login e salva i token in session_state. Ritorna True se ok."""
try:
resp = requests.post(
f"{AUTH_BASE}/auth/login",
json={"email": email, "password": password},
timeout=10,
)
except requests.RequestException as e:
st.error(f"Errore di rete: {e}")
return False
if resp.status_code == 401:
st.error("Credenziali non valide.")
return False
if resp.status_code != 200:
st.error(f"Errore del server ({resp.status_code}). Riprova più tardi.")
return False
data = resp.json()
st.session_state["access_token"] = data["access_token"]
st.session_state["refresh_token"] = data["refresh_token"]
st.session_state["expires_at"] = time.time() + data["expires_in"] - 30
get_cookie_manager().set(_COOKIE_RT, data["refresh_token"], max_age=_COOKIE_MAX_AGE, key="set_rt_login")
_fetch_user_info()
return True
def do_refresh() -> bool:
"""Rinnova l'access_token. Ritorna False se il refresh fallisce (forza re-login)."""
try:
resp = requests.post(
f"{AUTH_BASE}/auth/refresh",
json={"refresh_token": st.session_state.get("refresh_token", "")},
timeout=10,
)
except requests.RequestException:
return False
if resp.status_code != 200:
return False
data = resp.json()
st.session_state["access_token"] = data["access_token"]
st.session_state["refresh_token"] = data["refresh_token"]
st.session_state["expires_at"] = time.time() + data["expires_in"] - 30
get_cookie_manager().set(_COOKIE_RT, data["refresh_token"], max_age=_COOKIE_MAX_AGE, key="set_rt_refresh")
return True
def do_logout():
"""Chiama il logout sull'API e pulisce la session_state."""
refresh_token = st.session_state.get("refresh_token", "")
if refresh_token:
try:
requests.post(
f"{AUTH_BASE}/auth/logout",
json={"refresh_token": refresh_token},
timeout=5,
)
except requests.RequestException:
pass
for key in ["access_token", "refresh_token", "expires_at", "user_name", "user_email", "user_id"]:
st.session_state.pop(key, None)
get_cookie_manager().delete(_COOKIE_RT, key="del_rt_logout")
def check_auth() -> bool:
"""Ritorna True se l'utente è autenticato e il token è valido.
Se il token è assente dalla session_state tenta il restore dal cookie.
Se il token è scaduto tenta il refresh."""
if "access_token" in st.session_state:
if time.time() < st.session_state.get("expires_at", 0):
return True
if do_refresh():
return True
do_logout()
return False
# Nessun token in memoria: prova il restore dal cookie
cm = get_cookie_manager()
rt = cm.get(_COOKIE_RT)
if rt:
st.session_state["refresh_token"] = rt
if do_refresh():
_fetch_user_info()
return True
# Cookie scaduto o invalidato: rimuovilo
cm.delete(_COOKIE_RT, key="del_rt_expired")
return False
def _auth_headers() -> dict:
"""Ritorna gli headers Authorization con il Bearer token corrente."""
token = st.session_state.get("access_token", "")
return {"Authorization": f"Bearer {token}"}
def _fetch_user_info() -> None:
"""Chiama /auth/me e salva email e nome in session_state."""
try:
resp = requests.get(f"{AUTH_BASE}/auth/me", headers=_auth_headers(), timeout=5)
if resp.status_code == 200:
data = resp.json()
st.session_state["user_email"] = data.get("email")
st.session_state["user_name"] = data.get("name")
st.session_state["user_id"] = data.get("id")
except requests.RequestException:
pass
# ---------------------------------------------------------------------------
# Order validation and building
# ---------------------------------------------------------------------------
def validate_order(
client_name: str,
product_name: str,
selected_preset: str | None,
df: pd.DataFrame,
) -> list[str]:
"""Valida tutti i campi dell'ordine. Ritorna lista di errori (vuota = valido)."""
errors = []
if not client_name or not client_name.strip():
errors.append("Il campo 'Nome del cliente' e obbligatorio.")
if not product_name or not product_name.strip():
errors.append("Il campo 'Nome del prodotto' e obbligatorio.")
if not selected_preset:
errors.append("Selezionare un preset di esposizione.")
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
if active.empty:
errors.append("Inserire almeno un ingrediente.")
return errors
for idx, row in active.iterrows():
cas_val = row["cas"].strip()
inci_val = row["inci"].strip()
is_col = bool(row["is_colorante"])
if is_water_inci(inci_val) and cas_val == "":
continue
if is_col:
continue
if cas_val == "":
errors.append(f"Riga {idx + 1}: inserire un CAS number oppure selezionare 'Colorante'.")
continue
if not CAS_PATTERN.match(cas_val):
hint = f" ({inci_val})" if inci_val else ""
errors.append(
f"Formato CAS non valido alla riga {idx + 1}{hint}: '{cas_val}'. "
"Formato atteso: XX-XX-X (es. 56-81-5)."
)
total_pct = active["percentage"].sum()
if abs(total_pct - PERCENTAGE_TARGET) > PERCENTAGE_TOLERANCE:
errors.append(
f"La somma delle percentuali e {total_pct:.6f}%, "
f"ma deve essere 100% (tolleranza +/- {PERCENTAGE_TOLERANCE}%)."
)
return errors
def build_order_payload(
client_name: str,
product_name: str,
preset_name: str,
df: pd.DataFrame,
) -> dict:
"""Costruisce il JSON dell'ordine a partire dai dati del form."""
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
ingredients = []
for _, row in active.iterrows():
inci_val = row["inci"].strip()
skip = bool(row["skip_tox"]) or is_water_inci(inci_val)
ingredients.append({
"inci": inci_val if inci_val else None,
"cas": row["cas"].strip(),
"percentage": round(float(row["percentage"]), 6),
"is_colorante": bool(row["is_colorante"]),
"skip_tox": skip,
})
return {
"client_name": client_name.strip(),
"product_name": product_name.strip(),
"preset_esposizione": preset_name,
"ingredients": ingredients,
}
# ---------------------------------------------------------------------------
# Ingredients API
# ---------------------------------------------------------------------------
def fetch_ingredient(cas: str, force: bool = False) -> tuple[bool, Any]:
"""Cerca un ingrediente per CAS. Ritorna (success, data) oppure (False, error_message)."""
try:
resp = requests.post(
f"{API_BASE}/ingredients/search",
json={"cas": cas, "force": force},
headers=_auth_headers(),
timeout=120,
)
result = resp.json()
if result.get("success") and result.get("data"):
return True, result["data"]
return False, result.get("error", f"Nessun dato per CAS {cas}")
except requests.ConnectionError:
return False, "Impossibile connettersi all'API. Verifica che il server sia attivo."
except Exception as e:
return False, str(e)
def add_tox_indicator(payload: dict) -> requests.Response:
"""Aggiunge un indicatore tossicologico custom."""
return requests.post(f"{API_BASE}/ingredients/add-tox-indicator", json=payload, headers=_auth_headers(), timeout=30)
def fetch_all_ingredients() -> dict:
"""Recupera tutti gli ingredienti dal database. Ritorna il JSON grezzo."""
resp = requests.get(f"{API_BASE}/ingredients/list", headers=_auth_headers(), timeout=10)
return resp.json()
def fetch_clients() -> list[dict]:
"""Recupera la lista clienti. Ritorna lista di {id_cliente, nome_cliente}."""
try:
resp = requests.get(f"{API_BASE}/ingredients/clients", headers=_auth_headers(), timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return data["data"]
return []
except requests.ConnectionError:
return []
except Exception:
return []
def create_client(nome_cliente: str) -> bool:
"""Crea un nuovo cliente via API. Ritorna True se riuscito."""
try:
resp = requests.post(
f"{API_BASE}/ingredients/clients",
json={"nome_cliente": nome_cliente},
headers=_auth_headers(),
timeout=10,
)
return resp.json().get("success", False)
except Exception:
return False
def delete_client(nome: str) -> requests.Response:
"""Elimina un cliente per nome. Ritorna la Response grezza."""
return requests.delete(f"{API_BASE}/ingredients/clients/{nome}", headers=_auth_headers(), timeout=10)
# ---------------------------------------------------------------------------
# Esposizione / Presets API
# ---------------------------------------------------------------------------
def fetch_presets() -> list[str]:
"""Recupera i nomi dei preset di esposizione dall'API."""
try:
resp = requests.get(f"{API_BASE}/esposition/presets", headers=_auth_headers(), timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return [p["preset_name"] for p in data["data"]]
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API per caricare i preset. Verifica che il server sia attivo.")
return []
except Exception as e:
st.error(f"Errore nel caricamento dei preset: {e}")
return []
def fetch_all_presets() -> dict:
"""Recupera tutti i preset con dati completi. Ritorna il JSON grezzo."""
resp = requests.get(f"{API_BASE}/esposition/presets", headers=_auth_headers(), timeout=10)
return resp.json()
def create_exposition_preset(payload: dict) -> dict:
"""Crea un nuovo preset di esposizione. Ritorna il JSON della risposta."""
resp = requests.post(f"{API_BASE}/esposition/create", json=payload, headers=_auth_headers(), timeout=10)
return resp.json()
def delete_exposition_preset(name: str) -> dict:
"""Elimina un preset di esposizione per nome. Ritorna il JSON della risposta."""
resp = requests.delete(f"{API_BASE}/esposition/delete/{name}", headers=_auth_headers(), timeout=10)
return resp.json()
# ---------------------------------------------------------------------------
# Orders API
# ---------------------------------------------------------------------------
def fetch_orders() -> list:
"""Recupera la lista ordini dall'API."""
try:
resp = requests.get(f"{API_BASE}/orders/list", headers=_auth_headers(), timeout=15)
data = resp.json()
if data.get("success"):
return data.get("data", [])
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
return []
except Exception as e:
st.error(f"Errore nel caricamento degli ordini: {e}")
return []
def fetch_order_detail(id_ordine: int) -> Optional[dict]:
"""Recupera il dettaglio completo di un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/detail/{id_ordine}", headers=_auth_headers(), timeout=15)
data = resp.json()
if data.get("success"):
return data.get("order")
st.error(data.get("detail", "Errore nel recupero dettaglio ordine"))
return None
except requests.ConnectionError:
st.error("Impossibile connettersi all'API.")
return None
except Exception as e:
st.error(f"Errore: {e}")
return None
def api_retry_order(id_ordine: int) -> dict:
"""Chiama POST /orders/retry/{id_ordine}."""
try:
resp = requests.post(f"{API_BASE}/orders/retry/{id_ordine}", headers=_auth_headers(), timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def api_delete_order(id_ordine: int) -> dict:
"""Chiama DELETE /orders/{id_ordine}."""
try:
resp = requests.delete(f"{API_BASE}/orders/{id_ordine}", headers=_auth_headers(), timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def download_excel(id_ordine: int) -> Optional[bytes]:
"""Scarica il file Excel per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export/{id_ordine}", headers=_auth_headers(), timeout=120)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download Excel: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
def download_sources(id_ordine: int) -> Optional[bytes]:
"""Scarica lo ZIP delle fonti PDF per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export-sources/{id_ordine}", headers=_auth_headers(), timeout=300)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download fonti: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
# ---------------------------------------------------------------------------
# Segnalazioni (ticket)
# ---------------------------------------------------------------------------
def send_segnalazione(
page: str,
description: str,
priority: str,
cas: Optional[str] = None,
error: Optional[str] = None,
) -> bool:
"""Invia una segnalazione all'API. Ritorna True se salvata con successo."""
payload = {
"page": page,
"description": description,
"priority": priority,
}
if cas:
payload["cas"] = cas
if error:
payload["error"] = error
try:
resp = requests.post(
f"{API_BASE}/common/segnalazione",
json=payload,
headers=_auth_headers(),
timeout=10,
)
if resp.status_code == 200 and resp.json().get("success"):
return True
st.error(resp.json().get("detail", "Errore nell'invio della segnalazione"))
return False
except Exception as e:
st.error(f"Errore di connessione: {e}")
return False
# ---------------------------------------------------------------------------
# ECHA data extraction (for echa.py page)
# ---------------------------------------------------------------------------
def extract_tox_info_values(data: dict) -> list:
"""Extract DNEL values from toxicological information."""
rows = []
sections = data.get("toxicological_information", {}).get("sections", [])
for section in sections:
label = section.get("label", "")
if "subsections" in section:
for subsec in section["subsections"]:
effect_type = subsec.get("label", "")
if "subsections" in subsec:
for sub2 in subsec["subsections"]:
dose = sub2.get("StDose", {})
if isinstance(dose, dict) and dose.get("value"):
rows.append({
"Population/Route": label,
"Effect Type": effect_type,
"Exposure": sub2.get("label", ""),
"Assessment": sub2.get("HazardAssessment", ""),
"Value numerical": dose.get("value", ""),
"Unit": dose.get("unit", ""),
"Endpoint": sub2.get("MostSensitiveEndpoint", "")
})
return rows
def extract_acute_values(data: dict) -> list:
"""Extract acute toxicity values."""
rows = []
sections = data.get("acute_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
if subsec.get("EffectLevelValue"):
rows.append({
"Route": subsec.get("label", "").replace("Acute toxicity: ", ""),
"Endpoint": subsec.get("EffectLevelUnit", ""),
"Value": subsec.get("EffectLevelValue", ""),
"Conclusion": subsec.get("EndpointConclusion", "")
})
return rows
def extract_repeated_values(data: dict) -> list:
"""Extract repeated dose toxicity values."""
rows = []
sections = data.get("repeated_dose_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
study_type = subsec.get("label", "")
for sub2 in subsec.get("subsections", []):
if sub2.get("EffectLevelValue"):
rows.append({
"Study Type": study_type,
"Route": sub2.get("label", ""),
"Endpoint": sub2.get("EffectLevelUnit", ""),
"Value": sub2.get("EffectLevelValue", ""),
"Species": sub2.get("Species", "-")
})
return rows
# ---------------------------------------------------------------------------
# Legacy / original API functions
# ---------------------------------------------------------------------------
def echa_request(cas_num: str) -> Dict[str, Any]: def echa_request(cas_num: str) -> Dict[str, Any]:
url = 'https://api.cosmoguard.it/api/v1/echa/search' """Recupera i dati ECHA per un numero CAS."""
response = requests.post(url, json={'cas': cas_num}) response = requests.post(f"{API_BASE}/echa/search", json={"cas": cas_num}, headers=_auth_headers())
data = response.json() data = response.json()
if data['success'] == True: return data["data"] if data["success"] else data["error"]
return data['data']
else:
return data['error']
def cosing_request(cas_num: str) -> Dict[str, Any]: def cosing_request(cas_num: str) -> Dict[str, Any]:
url = 'https://api.cosmoguard.it/api/v1/cosing/search' """Recupera i dati COSING per un numero CAS."""
response = requests.post(url, json={ response = requests.post(
"full": True, f"{API_BASE}/cosing/search",
"mode": "cas", json={"full": True, "mode": "cas", "text": cas_num},
"text": cas_num headers=_auth_headers(),
}) )
data = response.json() data = response.json()
if data['success'] == True: return data["data"] if data["success"] else data["error"]
return data['data']
else:
return data['error']
def generate_pdf_download(cas, origin, link): def generate_pdf_download(cas, origin, link):
url = 'https://api.cosmoguard.it/api/v1/common/generate-pdf' """Genera e scarica un PDF tramite l'API."""
name = f'{cas}_{origin}' name = f"{cas}_{origin}"
if link is not None: if link is not None:
response = requests.post( response = requests.post(
url, f"{API_BASE}/common/generate-pdf",
json = { json={"link": link, "name": name},
'link': link, headers=_auth_headers(),
'name': name
}
) )
data = response.json() data = response.json()
else: else:
data = { data = {"success": False, "error": "No dossier exists for this origin."}
'success': False,
'error': 'No dossier exists for this origin.' if data["success"]:
} response = requests.get(f"{API_BASE}/common/download-pdf/{name}", headers=_auth_headers())
if data['success'] == True:
url = f'https://api.cosmoguard.it/api/v1/common/download-pdf/{name}'
response = requests.get(url)
response.raise_for_status() response.raise_for_status()
return response.content return response.content
else: else:
return data['error'] return data["error"]
def cosing_download(ref_no: str): def cosing_download(ref_no: str):
url = f'https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf' """Scarica il PDF ufficiale COSING per un numero di riferimento."""
url = f"https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf"
headers = { headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0', "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
'Accept': 'application/json, text/plain, */*', "Accept": "application/json, text/plain, */*",
'Accept-Language': 'it-IT,it;q=0.9', "Accept-Language": "it-IT,it;q=0.9",
'Cache-Control': 'No-Cache', "Cache-Control": "No-Cache",
'Origin': 'https://ec.europa.eu', "Origin": "https://ec.europa.eu",
'Referer': 'https://ec.europa.eu/', "Referer": "https://ec.europa.eu/",
'Sec-Fetch-Dest': 'empty', "Sec-Fetch-Dest": "empty",
'Sec-Fetch-Mode': 'cors', "Sec-Fetch-Mode": "cors",
'Sec-Fetch-Site': 'same-site', "Sec-Fetch-Site": "same-site",
} }
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
if response.status_code == 200: if response.status_code == 200:
return response.content return response.content
else:
return f"Error: {response.status_code} - {response.text}" return f"Error: {response.status_code} - {response.text}"

View file

@ -1,51 +1,134 @@
import pandas as pd
import streamlit as st import streamlit as st
import json
import uuid
from functions import generate_pdf_download from functions import do_login, generate_pdf_download
def open_csv_file(file_path): def open_csv_file(file_path):
"""Apre un file CSV e restituisce i dati come dataframe.""" """Apre un file CSV e restituisce una connessione DuckDB in-memory."""
import duckdb import duckdb
con = duckdb.connect(database=":memory:")
# Usa DuckDB per leggere il file CSV con.execute(f"CREATE TABLE index AS SELECT * FROM read_csv_auto('{file_path}')")
con = duckdb.connect(database=':memory:')
query = f"CREATE TABLE index AS SELECT * FROM read_csv_auto('{file_path}')"
con.execute(query)
return con return con
def search_cas_inci(input, type = 'cas'):
"""Cerca un numero CAS nei dati forniti e restituisce CAS e INCI.""" def search_cas_inci(input, type="cas"):
con = open_csv_file('data.csv') """Cerca un numero CAS o INCI nel file CSV. Ritorna lista di tuple (casNo, inciName)."""
if type == 'cas': con = open_csv_file("streamlit\data.csv")
if type == "cas":
query = f"SELECT * FROM index WHERE casNo LIKE '%{input}%'" query = f"SELECT * FROM index WHERE casNo LIKE '%{input}%'"
else: else:
query = f"SELECT * FROM index WHERE inciName ILIKE '%{input}%'" query = f"SELECT * FROM index WHERE inciName ILIKE '%{input}%'"
results = con.execute(query).fetchdf() results = con.execute(query).fetchdf()
# Restituisce una lista di tuple (casNo, inciName)
if not results.empty: if not results.empty:
return list(zip(results['casNo'].tolist(), results['inciName'].tolist())) return list(zip(results["casNo"].tolist(), results["inciName"].tolist()))
return [] return []
def search_cir(input_text: str) -> list[tuple]:
"""Cerca nel CIR database per nome ingrediente o INCI. Ritorna lista di (name, inci, url)."""
con = open_csv_file("streamlit\cir-reports.csv")
query = f"""
SELECT
"tablescraper-selected-row" AS ingredient_name,
"tablescraper-selected-row 2" AS inci_name,
"tablescraper-selected-row href" AS url
FROM index
WHERE (
"tablescraper-selected-row" ILIKE '%{input_text}%'
OR "tablescraper-selected-row 2" ILIKE '%{input_text}%'
)
AND "tablescraper-selected-row" != 'Ingredient Name as Used:'
AND "tablescraper-selected-row href" != ''
LIMIT 20
"""
results = con.execute(query).fetchdf()
if not results.empty:
return list(zip(
results["ingredient_name"].tolist(),
results["inci_name"].tolist(),
results["url"].tolist(),
))
return []
def download_pdf(casNo, origin, link): def download_pdf(casNo, origin, link):
"""Scarica un PDF generato dall'API.""" """Mostra i pulsanti per generare e scaricare un PDF dall'API."""
if st.button("Generate PDF", key=f"gen_{casNo}_{origin}"): if st.button("Generate PDF", key=f"gen_{casNo}_{origin}"):
with st.spinner("Fetching PDF..."): with st.spinner("Fetching PDF..."):
pdf_data = generate_pdf_download( pdf_data = generate_pdf_download(cas=casNo, origin=origin, link=link)
cas=casNo,
origin=origin,
link=link
)
st.download_button( st.download_button(
label="Download PDF", label="Download PDF",
data=pdf_data, data=pdf_data,
file_name=f"{casNo}_{origin}.pdf", file_name=f"{casNo}_{origin}.pdf",
mime="application/pdf", mime="application/pdf",
key=f"dl_{casNo}_{origin}" key=f"dl_{casNo}_{origin}",
) )
def show_login_page():
"""Mostra il form di login."""
_, col, _ = st.columns([1, 1, 1])
with col:
st.title("LMB App")
st.caption("Accedi per continuare")
with st.form("login_form"):
email = st.text_input("Email", placeholder="utente@esempio.it")
password = st.text_input("Password", type="password")
submitted = st.form_submit_button("Accedi", use_container_width=True, type="primary")
if submitted:
if not email or not password:
st.warning("Inserisci email e password.")
else:
with st.spinner("Autenticazione in corso..."):
if do_login(email, password):
st.rerun()
def display_orderData(order_data: dict):
"""Mostra un riepilogo leggibile dell'ordine (non JSON)."""
st.markdown("### Riepilogo Ordine")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Cliente", order_data["client_name"])
with col2:
st.metric("Prodotto", order_data["product_name"])
with col3:
st.metric("Preset Esposizione", order_data["preset_esposizione"])
ingredients = order_data.get("ingredients", [])
total_pct = sum(i["percentage"] for i in ingredients)
col4, col5, col6 = st.columns(3)
with col4:
st.metric("Numero ingredienti", len(ingredients))
with col5:
st.metric("Percentuale totale", f"{total_pct:.6f}%")
with col6:
n_col = sum(1 for i in ingredients if i["is_colorante"])
n_skip = sum(1 for i in ingredients if i["skip_tox"])
parts = []
if n_col > 0:
parts.append(f"{n_col} colorante/i")
if n_skip > 0:
parts.append(f"{n_skip} senza tox")
st.metric("Flag attivi", ", ".join(parts) if parts else "Nessuno")
st.markdown("**Ingredienti:**")
display_rows = [
{
"INCI": i["inci"] if i["inci"] else "-",
"CAS": i["cas"] if i["cas"] else "-",
"Percentuale (%)": f"{i['percentage']:.6f}",
"Colorante": "Si" if i["is_colorante"] else "-",
"Salta Tox": "Si" if i["skip_tox"] else "-",
}
for i in ingredients
]
st.dataframe(pd.DataFrame(display_rows), width="stretch", hide_index=True)
if __name__ == "__main__": if __name__ == "__main__":
data = search_cas_inci('102242-62-6') data = search_cas_inci("102242-62-6")
print(data) print(data)

View file

@ -1,138 +0,0 @@
import streamlit as st
import json
from functions import cosing_request
from functions_ui import download_pdf
st.title("CosIng Database Viewer")
st.set_page_config(
page_title="CosIng",
page_icon="🧪",
layout="wide"
)
def display_ingredient(data: dict, level: int = 0):
"""Display ingredient information using containers and metrics."""
# Get names
name = data.get("commonName") or data.get("inciName") or "Unknown"
item_type = data.get("itemType", "ingredient")
# Header
if level == 0:
st.title(f"🧪 {name}")
else:
st.markdown(f"### {name}")
# Type badge and chemical name
col_header1, col_header2 = st.columns([1, 3])
with col_header1:
if item_type == "substance":
st.caption("🔬 Substance")
else:
st.caption("🧴 Ingredient")
# Identifiers container
cas_numbers = data.get("casNo", [])
ec_numbers = data.get("ecNo", [])
ref_no = data.get("refNo", "")
if cas_numbers or ec_numbers or ref_no:
with st.container(border=True):
cols = st.columns(3)
with cols[0]:
if cas_numbers:
st.metric("CAS", ", ".join(cas_numbers))
else:
st.metric("CAS", "")
with cols[1]:
if ec_numbers:
st.metric("EC", ", ".join(ec_numbers))
else:
st.metric("EC", "")
# Functions
functions = data.get("functionName", [])
if functions:
with st.container(border=True):
st.markdown("**Functions**")
func_cols = st.columns(len(functions))
for i, func in enumerate(functions):
with func_cols[i]:
st.success(func.title())
# Regulatory info
restrictions = data.get("otherRestrictions", [])
annex = data.get("annexNo", [])
regulations = data.get("otherRegulations", [])
opinions = data.get("sccsOpinion", [])
opinion_urls = data.get("sccsOpinionUrls", [])
if restrictions or annex or regulations or opinions or opinion_urls:
with st.container(border=True):
st.markdown("**Regulatory Information**")
if annex:
st.info(f"📋 **Annex {', '.join(annex)}**")
if restrictions:
for r in restrictions:
st.warning(f"⚠️ {r}")
if regulations:
st.write("Other regulations: " + "; ".join(regulations))
if opinions:
for opinion in opinions:
st.write(f"📄 {opinion}")
if opinion_urls:
for url in opinion_urls:
st.link_button("View SCCS Opinion", url)
# Source link
cosing_url = data.get("cosingUrl", "")
if cosing_url:
st.link_button("🔗 View on CosIng", cosing_url)
# Identified Ingredients (recursive)
identified = data.get("identifiedIngredient", [])
if identified:
st.divider()
# Check if it's a list of IDs or full objects
if identified and isinstance(identified[0], dict):
st.subheader(f"🔬 Related Substances ({len(identified)})")
for idx, ing in enumerate(identified):
ing_name = ing.get("commonName") or ing.get("inciName") or f"Substance {idx + 1}"
with st.expander(ing_name):
display_ingredient(ing, level=level + 1)
else:
# List of IDs only
st.subheader(f"🔬 Related Substances ({len(identified)} IDs)")
with st.expander("Show substance IDs"):
# Display in a grid
id_text = ", ".join(str(i) for i in identified[:20])
if len(identified) > 20:
id_text += f"... and {len(identified) - 20} more"
st.code(id_text)
def main():
if st.session_state.get('selected_cas', None) is None:
st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.")
st.stop()
else:
cas_number = st.session_state.selected_cas
DATA = cosing_request(cas_number)
display_ingredient(DATA)
if __name__ == "__main__":
main()

View file

@ -1,121 +0,0 @@
import streamlit as st
import requests
import json
st.title("PubChem Data Viewer")
if st.session_state.get('selected_cas', None) is None:
st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.")
st.stop()
else:
cas_number = st.session_state.selected_cas
# Make API request
with st.spinner("Fetching data from PubChem..."):
try:
response = requests.post(
"https://api.cosmoguard.it/api/v1/common/pubchem",
json={"cas": cas_number}
)
response.raise_for_status()
result = response.json()
except requests.exceptions.RequestException as e:
st.error(f"Error fetching data: {e}")
st.stop()
# Check if request was successful
if not result.get("success", False):
st.error(f"API Error: {result.get('error', 'Unknown error')}")
st.stop()
data = result.get("data", {})
# Display substance header
st.subheader(f"{data.get('first_pubchem_name', 'Unknown').title()}")
# Basic info container
with st.container(border=True):
col1, col2, col3 = st.columns(3)
with col1:
st.caption("CAS Number")
st.write(data.get("CAS", ""))
with col2:
st.caption("PubChem CID")
st.write(data.get("CID", ""))
with col3:
if data.get("pubchem_link"):
st.link_button("View on PubChem", data.get("pubchem_link"))
st.divider()
# Physical/Chemical Properties
st.subheader("Physical & Chemical Properties")
with st.container(border=True):
prop_col1, prop_col2, prop_col3, prop_col4 = st.columns(4)
with prop_col1:
st.metric("Molecular Weight", f"{data.get('MolecularWeight', '')} g/mol" if data.get('MolecularWeight') else "")
with prop_col2:
st.metric("XLogP", data.get('XLogP', ''))
with prop_col3:
st.metric("Exact Mass", data.get('ExactMass', ''))
with prop_col4:
st.metric("TPSA", f"{data.get('TPSA', '')} Ų" if data.get('TPSA') else "")
st.divider()
# Melting Point
melting_points = data.get("Melting Point", [])
if melting_points:
st.subheader("Melting Point")
with st.expander(f"View {len(melting_points)} reference(s)", expanded=True):
for idx, mp in enumerate(melting_points):
with st.container(border=True):
st.markdown(f"**Reference {idx + 1}**")
if mp.get("Value"):
st.info(mp.get("Value"))
if mp.get("Reference"):
st.caption(f"📚 {mp.get('Reference')}")
if mp.get("Description"):
st.caption(f" {mp.get('Description')}")
if mp.get("ReferenceNumber"):
st.caption(f"Ref #: {mp.get('ReferenceNumber')}")
# Dissociation Constants
dissociation_constants = data.get("Dissociation Constants", [])
if dissociation_constants:
st.divider()
st.subheader("Dissociation Constants (pKa)")
with st.expander(f"View {len(dissociation_constants)} reference(s)", expanded=True):
for idx, dc in enumerate(dissociation_constants):
with st.container(border=True):
st.markdown(f"**Reference {idx + 1}**")
if dc.get("Value"):
# Check if it's a dictionary or string
value = dc.get("Value")
if isinstance(value, dict):
st.code(json.dumps(value, indent=2))
else:
st.info(value)
if dc.get("Reference"):
st.caption(f"📚 {dc.get('Reference')}")
if dc.get("ReferenceNumber"):
st.caption(f"Ref #: {dc.get('ReferenceNumber')}")
# Raw JSON viewer
st.divider()
with st.expander("View Raw JSON Response"):
st.json(result)

View file

@ -1,99 +1,36 @@
import streamlit as st import streamlit as st
import json
from functions import echa_request from functions import (
echa_request,
extract_acute_values,
extract_repeated_values,
extract_tox_info_values,
)
from functions_ui import download_pdf from functions_ui import download_pdf
st.set_page_config( st.set_page_config(
page_title="ECHA Toxicological Data Viewer", page_title="ECHA Toxicological Data Viewer",
page_icon="🧊", page_icon="🧊",
layout="wide", layout="wide",
initial_sidebar_state="expanded" initial_sidebar_state="expanded",
) )
st.info("Questa pagina mostra i dati ECHA in modo più dettagliato. È consigliato però utilizzare la pagina **Ingrediente** per avere informazioni più complete sull'ingrediente.") st.info("Questa pagina mostra i dati ECHA in modo più dettagliato. È consigliato però utilizzare la pagina **Ingrediente** per avere informazioni più complete sull'ingrediente.")
if st.session_state.get('selected_cas', None) is None: if st.session_state.get("selected_cas") is None:
st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.") st.warning("Nessun CAS Number selezionato. Torna alla pagina principale per effettuare una ricerca.")
st.stop() st.stop()
else:
cas_number = st.session_state.selected_cas
cas_number = st.session_state.selected_cas
DATA = echa_request(cas_number) DATA = echa_request(cas_number)
if 'substance' not in DATA: if "substance" not in DATA:
st.error(DATA) st.error(DATA)
st.stop() st.stop()
def extract_tox_info_values(data):
"""Extract DNEL values from toxicological information."""
rows = []
sections = data.get("toxicological_information", {}).get("sections", [])
for section in sections:
label = section.get("label", "")
if "subsections" in section:
for subsec in section["subsections"]:
effect_type = subsec.get("label", "")
if "subsections" in subsec:
for sub2 in subsec["subsections"]:
dose = sub2.get("StDose", {})
if isinstance(dose, dict) and dose.get("value"):
rows.append({
"Population/Route": label,
"Effect Type": effect_type,
"Exposure": sub2.get("label", ""),
"Assessment": sub2.get("HazardAssessment", ""),
"Value numerical": dose.get("value", ""),
"Unit": dose.get("unit", ""),
"Endpoint": sub2.get("MostSensitiveEndpoint", "")
})
return rows
def extract_acute_values(data):
"""Extract acute toxicity values."""
rows = []
sections = data.get("acute_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
if subsec.get("EffectLevelValue"):
rows.append({
"Route": subsec.get("label", "").replace("Acute toxicity: ", ""),
"Endpoint": subsec.get("EffectLevelUnit", ""),
"Value": subsec.get("EffectLevelValue", ""),
"Conclusion": subsec.get("EndpointConclusion", "")
})
return rows
def extract_repeated_values(data):
"""Extract repeated dose toxicity values."""
rows = []
sections = data.get("repeated_dose_toxicity", {}).get("sections", [])
for section in sections:
if section.get("label") == "Key value for assessment":
for subsec in section.get("subsections", []):
study_type = subsec.get("label", "")
for sub2 in subsec.get("subsections", []):
if sub2.get("EffectLevelValue"):
rows.append({
"Study Type": study_type,
"Route": sub2.get("label", ""),
"Endpoint": sub2.get("EffectLevelUnit", ""),
"Value": sub2.get("EffectLevelValue", ""),
"Species": sub2.get("Species", "-")
})
return rows
# App # App
st.title("Toxicological Data Viewer") st.title("Toxicological Data Viewer")
# Substance info # Substance info
substance = DATA["substance"] substance = DATA["substance"]
dossier = DATA["dossier_info"] dossier = DATA["dossier_info"]
@ -104,16 +41,16 @@ with st.container(border=True):
col1, col2, col3, col4 = st.columns(4) col1, col2, col3, col4 = st.columns(4)
with col1: with col1:
st.caption("CAS Number") st.caption("CAS Number")
st.write(substance['rmlCas']) st.write(substance["rmlCas"])
with col2: with col2:
st.caption("EC Number") st.caption("EC Number")
st.write(substance['rmlEc']) st.write(substance["rmlEc"])
with col3: with col3:
st.caption("Status") st.caption("Status")
st.write(dossier['registrationStatus']) st.write(dossier["registrationStatus"])
with col4: with col4:
st.caption("Last Updated") st.caption("Last Updated")
st.write(dossier['lastUpdatedDate']) st.write(dossier["lastUpdatedDate"])
# Tabs # Tabs
tab1, tab2, tab3 = st.tabs(["Toxicological Information", "Acute Toxicity", "Repeated Dose Toxicity"]) tab1, tab2, tab3 = st.tabs(["Toxicological Information", "Acute Toxicity", "Repeated Dose Toxicity"])
@ -127,9 +64,9 @@ with tab1:
st.info("No DNEL values found.") st.info("No DNEL values found.")
download_pdf( download_pdf(
casNo=substance['rmlCas'], casNo=substance["rmlCas"],
origin='echa_tox_info', origin="echa_tox_info",
link=DATA['index']['toxicological_information_link'] link=DATA["index"]["toxicological_information_link"],
) )
with tab2: with tab2:
@ -141,9 +78,9 @@ with tab2:
st.info("No acute toxicity values found.") st.info("No acute toxicity values found.")
download_pdf( download_pdf(
casNo=substance['rmlCas'], casNo=substance["rmlCas"],
origin='echa_acute_tox', origin="echa_acute_tox",
link=DATA['index']['acute_toxicity_link'] link=DATA["index"]["acute_toxicity_link"],
) )
with tab3: with tab3:
@ -155,13 +92,12 @@ with tab3:
st.info("No repeated dose toxicity values found.") st.info("No repeated dose toxicity values found.")
download_pdf( download_pdf(
casNo=substance['rmlCas'], casNo=substance["rmlCas"],
origin='echa_repeated_tox', origin="echa_repeated_tox",
link=DATA['index']['repeated_dose_toxicity_link'] link=DATA["index"]["repeated_dose_toxicity_link"],
) )
# Key Information sections # Key Information sections
st.divider() st.divider()
st.subheader("Key Information") st.subheader("Key Information")

View file

@ -1,19 +1,14 @@
import streamlit as st
import requests
import pandas as pd import pandas as pd
import streamlit as st
API_BASE = "http://localhost:8000/api/v1" from functions import (
create_exposition_preset,
delete_exposition_preset,
fetch_all_presets,
)
EXPOSURE_ROUTES = ["Dermal", "Oral", "Inhalation", "Ocular"] EXPOSURE_ROUTES = ["Dermal", "Oral", "Inhalation", "Ocular"]
RITENZIONE_PRESETS = {
"Leave-on (1.0)": 1.0,
"Rinse-off (0.01)": 0.01,
"Dentifricio (0.05)": 0.05,
"Collutorio (0.10)": 0.10,
"Tintura (0.10)": 0.10,
}
st.set_page_config(page_title="Gestione Esposizione", layout="wide") st.set_page_config(page_title="Gestione Esposizione", layout="wide")
st.title("Gestione Preset Esposizione") st.title("Gestione Preset Esposizione")
@ -51,10 +46,8 @@ with tab_crea:
esp_nano = st.multiselect("Nano", EXPOSURE_ROUTES, default=["Dermal"], key="esp_nano") esp_nano = st.multiselect("Nano", EXPOSURE_ROUTES, default=["Dermal"], key="esp_nano")
st.markdown("---") st.markdown("---")
ritenzione_label = st.selectbox("Fattore di ritenzione", list(RITENZIONE_PRESETS.keys())) ritenzione = st.number_input("Fattore di ritenzione", min_value=0.01, max_value=1.00, value=1.0, step=0.01, format="%.2f")
ritenzione = RITENZIONE_PRESETS[ritenzione_label]
# Preview payload
payload = { payload = {
"preset_name": preset_name, "preset_name": preset_name,
"tipo_prodotto": tipo_prodotto, "tipo_prodotto": tipo_prodotto,
@ -73,14 +66,11 @@ with tab_crea:
if st.button("Crea Preset", type="primary", disabled=not preset_name): if st.button("Crea Preset", type="primary", disabled=not preset_name):
try: try:
resp = requests.post(f"{API_BASE}/esposition/create", json=payload, timeout=10) data = create_exposition_preset(payload)
data = resp.json()
if data.get("success"): if data.get("success"):
st.success(f"Preset '{preset_name}' creato con successo (id: {data['data']['id_preset']})") st.success(f"Preset '{preset_name}' creato con successo (id: {data['data']['id_preset']})")
else: else:
st.error(f"Errore: {data.get('error', 'Sconosciuto')}") st.error(f"Errore: {data.get('error', 'Sconosciuto')}")
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e: except Exception as e:
st.error(f"Errore: {e}") st.error(f"Errore: {e}")
@ -97,8 +87,7 @@ with tab_lista:
st.rerun() st.rerun()
try: try:
resp = requests.get(f"{API_BASE}/esposition/presets", timeout=10) data = fetch_all_presets()
data = resp.json()
if data.get("success") and data.get("data"): if data.get("success") and data.get("data"):
st.info(f"Trovati {data['total']} preset") st.info(f"Trovati {data['total']} preset")
@ -110,7 +99,7 @@ with tab_lista:
"ritenzione", "esposizione_calcolata", "esposizione_relativa", "ritenzione", "esposizione_calcolata", "esposizione_relativa",
] ]
existing = [c for c in display_cols if c in df.columns] existing = [c for c in display_cols if c in df.columns]
st.dataframe(df[existing], use_container_width=True, hide_index=True) st.dataframe(df[existing], width="stretch", hide_index=True)
st.divider() st.divider()
st.subheader("Elimina Preset") st.subheader("Elimina Preset")
@ -124,10 +113,9 @@ with tab_lista:
with col_btn: with col_btn:
sub1, sub2 = st.columns(2) sub1, sub2 = st.columns(2)
with sub1: with sub1:
if st.button("Si", key=f"confirm_{name}", type="primary", use_container_width=True): if st.button("Si", key=f"confirm_{name}", type="primary", width="stretch"):
try: try:
del_resp = requests.delete(f"{API_BASE}/esposition/delete/{name}", timeout=10) del_data = delete_exposition_preset(name)
del_data = del_resp.json()
if del_data.get("success"): if del_data.get("success"):
st.success(f"Preset '{name}' eliminato") st.success(f"Preset '{name}' eliminato")
st.session_state.confirm_delete_preset = None st.session_state.confirm_delete_preset = None
@ -137,18 +125,16 @@ with tab_lista:
except Exception as e: except Exception as e:
st.error(f"Errore: {e}") st.error(f"Errore: {e}")
with sub2: with sub2:
if st.button("No", key=f"cancel_{name}", use_container_width=True): if st.button("No", key=f"cancel_{name}", width="stretch"):
st.session_state.confirm_delete_preset = None st.session_state.confirm_delete_preset = None
st.rerun() st.rerun()
else: else:
with col_btn: with col_btn:
if st.button("Elimina", key=f"del_{name}", use_container_width=True): if st.button("Elimina", key=f"del_{name}", width="stretch"):
st.session_state.confirm_delete_preset = name st.session_state.confirm_delete_preset = name
st.rerun() st.rerun()
else: else:
st.warning("Nessun preset trovato.") st.warning("Nessun preset trovato.")
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e: except Exception as e:
st.error(f"Errore nel caricamento: {e}") st.error(f"Errore nel caricamento: {e}")

View file

@ -1,13 +1,9 @@
import streamlit as st
import requests
import pandas as pd import pandas as pd
import streamlit as st
from functions import cosing_download from functions import cosing_download, fetch_ingredient
from functions_ui import download_pdf from functions_ui import download_pdf
#API_BASE = "https://api.cosmoguard.it/api/v1"
API_BASE = "http://localhost:8000/api/v1"
st.set_page_config(page_title="Ricerca Ingredienti", layout="wide") st.set_page_config(page_title="Ricerca Ingredienti", layout="wide")
st.title("Ricerca Ingredienti per CAS") st.title("Ricerca Ingredienti per CAS")
@ -17,30 +13,18 @@ if "ingredient_cas" not in st.session_state:
st.session_state.ingredient_cas = "" st.session_state.ingredient_cas = ""
cas_input = st.session_state.selected_cas cas_input = st.session_state.selected_cas
force_refresh = st.session_state.force_refresh if "force_refresh" in st.session_state else False force_refresh = st.session_state.get("force_refresh", False)
if cas_input: if cas_input:
with st.spinner(f"Ricerca in corso per {cas_input}..."): with st.spinner(f"Ricerca in corso per {cas_input}..."):
try: success, result = fetch_ingredient(cas_input, force=force_refresh)
resp = requests.post( if success:
f"{API_BASE}/ingredients/search", st.session_state.ingredient_data = result
json={"cas": cas_input, "force": force_refresh},
timeout=120,
)
result = resp.json()
if result.get("success") and result.get("data"):
st.session_state.ingredient_data = result["data"]
st.session_state.ingredient_cas = cas_input st.session_state.ingredient_cas = cas_input
st.success(f"Ingrediente {cas_input} trovato") st.success(f"Ingrediente {cas_input} trovato")
else: else:
st.session_state.ingredient_data = None st.session_state.ingredient_data = None
st.error(result.get("error", f"Nessun dato per CAS {cas_input}")) st.error(result)
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e:
st.error(f"Errore: {e}")
data = st.session_state.ingredient_data data = st.session_state.ingredient_data
@ -54,8 +38,8 @@ else:
# --- Header con INCI e data --- # --- Header con INCI e data ---
col_h1, col_h2 = st.columns(2) col_h1, col_h2 = st.columns(2)
with col_h1: with col_h1:
inci = data.get("inci") or [] inci = st.session_state.selected_inci or "Non disponibile"
st.markdown(f"**INCI:** {', '.join(inci) if inci else 'N/A'}") st.markdown(f"**INCI:** {inci if inci else 'N/A'}")
with col_h2: with col_h2:
st.markdown(f"**Data creazione:** {data.get('creation_date', 'N/A')}") st.markdown(f"**Data creazione:** {data.get('creation_date', 'N/A')}")
@ -105,18 +89,17 @@ else:
ref_no = cosing.get("reference", "") ref_no = cosing.get("reference", "")
if ref_no: if ref_no:
pdf_bytes = cosing_download(ref_no) pdf_bytes = cosing_download(ref_no)
if isinstance(pdf_bytes, bytes): if isinstance(pdf_bytes, bytes):
st.download_button( st.download_button(
label="Download CosIng PDF", label="Download CosIng PDF",
data=pdf_bytes, data=pdf_bytes,
file_name=f"{cas_input}_cosing.pdf", file_name=f"{cas_input}_cosing.pdf",
mime="application/pdf" mime="application/pdf",
key=f"download_cosing_{ref_no}",
) )
else: else:
st.error(pdf_bytes) st.error(pdf_bytes)
with col_c2: with col_c2:
annex = cosing.get("annex", []) annex = cosing.get("annex", [])
if annex: if annex:
@ -140,7 +123,7 @@ else:
if link_opinions: if link_opinions:
st.markdown("**SCCS Opinions:**") st.markdown("**SCCS Opinions:**")
for url in link_opinions: for url in link_opinions:
st.link_button("View SCCS Opinion", url) st.markdown(f"[View SCCS Opinion]({url})")
if i < len(cosing_list) - 1: if i < len(cosing_list) - 1:
st.divider() st.divider()
@ -153,21 +136,18 @@ else:
st.subheader("Tossicologia") st.subheader("Tossicologia")
tox = data.get("toxicity") tox = data.get("toxicity")
if tox: if tox:
# Best case highlight
best = tox.get("best_case") best = tox.get("best_case")
if best: if best:
st.success( st.success(
f"Miglior indicatore: **{best['indicator']}** = {best['value']} {best['unit']} " f"Miglior indicatore: **{best['indicator']}** = {best['value']} {best['unit']} "
f"({best['route']}) — Fattore: {tox.get('factor', 'N/A')}" f"({best['route']}) — Fattore: {tox.get('factor', 'N/A')}"
) )
download_pdf( download_pdf(
casNo=cas_input, casNo=cas_input,
origin=best.get("source"), origin=best.get("source"),
link=best.get("ref") link=best.get("ref"),
) )
# Indicators table
indicators = tox.get("indicators", []) indicators = tox.get("indicators", [])
if indicators: if indicators:
df = pd.DataFrame(indicators) df = pd.DataFrame(indicators)

View file

@ -1,107 +1,17 @@
import streamlit as st
import requests
import pandas as pd import pandas as pd
import streamlit as st
from datetime import datetime, date from datetime import datetime, date
API_BASE = "http://localhost:8000/api/v1" from functions import (
STATUS_MAP,
STATUS_MAP = { api_delete_order,
1: ("Ricevuto", "🔵"), api_retry_order,
2: ("Validato", "🟡"), download_excel,
3: ("Compilazione", "🟠"), download_sources,
5: ("Arricchito", "🟢"), fetch_order_detail,
6: ("Calcolo", "🔵"), fetch_orders,
8: ("Completato", ""), status_label,
9: ("Errore", "🔴"), )
}
def status_label(stato_ordine):
"""Ritorna label con emoji per lo stato."""
name, emoji = STATUS_MAP.get(stato_ordine, ("Sconosciuto", ""))
return f"{emoji} {name}"
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def fetch_orders():
"""Recupera la lista ordini dall'API."""
try:
resp = requests.get(f"{API_BASE}/orders/list", timeout=15)
data = resp.json()
if data.get("success"):
return data.get("data", [])
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo su localhost:8000.")
return []
except Exception as e:
st.error(f"Errore nel caricamento degli ordini: {e}")
return []
def fetch_order_detail(id_ordine):
"""Recupera il dettaglio completo di un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/detail/{id_ordine}", timeout=15)
data = resp.json()
if data.get("success"):
return data.get("order")
st.error(data.get("detail", "Errore nel recupero dettaglio ordine"))
return None
except requests.ConnectionError:
st.error("Impossibile connettersi all'API.")
return None
except Exception as e:
st.error(f"Errore: {e}")
return None
def api_retry_order(id_ordine):
"""Chiama POST /orders/retry/{id_ordine}."""
try:
resp = requests.post(f"{API_BASE}/orders/retry/{id_ordine}", timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def api_delete_order(id_ordine):
"""Chiama DELETE /orders/{id_ordine}."""
try:
resp = requests.delete(f"{API_BASE}/orders/{id_ordine}", timeout=15)
return resp.json()
except Exception as e:
return {"success": False, "error": str(e)}
def download_excel(id_ordine):
"""Scarica il file Excel per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export/{id_ordine}", timeout=120)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download Excel: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
def download_sources(id_ordine):
"""Scarica lo ZIP delle fonti PDF per un ordine."""
try:
resp = requests.get(f"{API_BASE}/orders/export-sources/{id_ordine}", timeout=300)
if resp.status_code == 200:
return resp.content
st.error(f"Errore download fonti: {resp.json().get('detail', resp.status_code)}")
return None
except Exception as e:
st.error(f"Errore download: {e}")
return None
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Session state init # Session state init
@ -114,7 +24,6 @@ if "confirm_delete" not in st.session_state:
if "orders_cache" not in st.session_state: if "orders_cache" not in st.session_state:
st.session_state.orders_cache = None st.session_state.orders_cache = None
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Page config # Page config
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -149,7 +58,6 @@ with st.expander(" Come usare questa pagina"):
""") """)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Detail View # Detail View
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -157,7 +65,6 @@ with st.expander(" Come usare questa pagina"):
def show_order_detail(id_ordine): def show_order_detail(id_ordine):
"""Mostra il dettaglio di un ordine selezionato.""" """Mostra il dettaglio di un ordine selezionato."""
# Bottone per tornare alla lista
if st.button("← Torna alla lista", key="back_btn"): if st.button("← Torna alla lista", key="back_btn"):
st.session_state.selected_order_id = None st.session_state.selected_order_id = None
st.session_state.confirm_delete = False st.session_state.confirm_delete = False
@ -217,12 +124,11 @@ def show_order_detail(id_ordine):
df_display = df[display_cols].rename(columns=col_rename) df_display = df[display_cols].rename(columns=col_rename)
# Mappa booleani a Si/No
for col_name in ["Colorante", "Skip Tox"]: for col_name in ["Colorante", "Skip Tox"]:
if col_name in df_display.columns: if col_name in df_display.columns:
df_display[col_name] = df_display[col_name].map({True: "", False: "-"}) df_display[col_name] = df_display[col_name].map({True: "", False: "-"})
st.dataframe(df_display, use_container_width=True, hide_index=True) st.dataframe(df_display, width="stretch", hide_index=True)
else: else:
st.info("Nessun ingrediente disponibile") st.info("Nessun ingrediente disponibile")
@ -232,15 +138,13 @@ def show_order_detail(id_ordine):
st.subheader("Azioni") st.subheader("Azioni")
btn_cols = st.columns(5) btn_cols = st.columns(5)
# 1. Aggiorna stato
with btn_cols[0]: with btn_cols[0]:
if st.button("🔄 Aggiorna", key="refresh_btn", use_container_width=True): if st.button("🔄 Aggiorna", key="refresh_btn", width="stretch"):
st.rerun() st.rerun()
# 2. Retry (solo se ERRORE)
with btn_cols[1]: with btn_cols[1]:
if stato == 9: if stato == 9:
if st.button("🔁 Retry", key="retry_btn", type="primary", use_container_width=True): if st.button("🔁 Retry", key="retry_btn", type="primary", width="stretch"):
with st.spinner("Retry in corso..."): with st.spinner("Retry in corso..."):
result = api_retry_order(id_ordine) result = api_retry_order(id_ordine)
if result.get("success"): if result.get("success"):
@ -249,13 +153,12 @@ def show_order_detail(id_ordine):
else: else:
st.error(result.get("error") or result.get("detail", "Errore retry")) st.error(result.get("error") or result.get("detail", "Errore retry"))
else: else:
st.button("🔁 Retry", key="retry_btn_disabled", disabled=True, use_container_width=True) st.button("🔁 Retry", key="retry_btn_disabled", disabled=True, width="stretch")
# 3. Scarica Excel
with btn_cols[2]: with btn_cols[2]:
if has_project: if has_project:
excel_data = None excel_data = None
if st.button("📊 Scarica Excel", key="excel_prep_btn", use_container_width=True): if st.button("📊 Scarica Excel", key="excel_prep_btn", width="stretch"):
with st.spinner("Generazione Excel..."): with st.spinner("Generazione Excel..."):
excel_data = download_excel(id_ordine) excel_data = download_excel(id_ordine)
if excel_data: if excel_data:
@ -264,16 +167,15 @@ def show_order_detail(id_ordine):
data=excel_data, data=excel_data,
file_name=f"progetto_{id_ordine}.xlsx", file_name=f"progetto_{id_ordine}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
key="excel_download" key="excel_download",
) )
else: else:
st.button("📊 Scarica Excel", key="excel_disabled", disabled=True, st.button("📊 Scarica Excel", key="excel_disabled", disabled=True,
use_container_width=True, help="Progetto non ancora disponibile") width="stretch", help="Progetto non ancora disponibile")
# 4. Scarica Fonti PDF
with btn_cols[3]: with btn_cols[3]:
if has_project: if has_project:
if st.button("📄 Scarica Fonti PDF", key="pdf_prep_btn", use_container_width=True): if st.button("📄 Scarica Fonti PDF", key="pdf_prep_btn", width="stretch"):
with st.spinner("Generazione PDF fonti (può richiedere tempo)..."): with st.spinner("Generazione PDF fonti (può richiedere tempo)..."):
zip_data = download_sources(id_ordine) zip_data = download_sources(id_ordine)
if zip_data: if zip_data:
@ -282,23 +184,22 @@ def show_order_detail(id_ordine):
data=zip_data, data=zip_data,
file_name=f"fonti_ordine_{id_ordine}.zip", file_name=f"fonti_ordine_{id_ordine}.zip",
mime="application/zip", mime="application/zip",
key="pdf_download" key="pdf_download",
) )
else: else:
st.button("📄 Scarica Fonti PDF", key="pdf_disabled", disabled=True, st.button("📄 Scarica Fonti PDF", key="pdf_disabled", disabled=True,
use_container_width=True, help="Progetto non ancora disponibile") width="stretch", help="Progetto non ancora disponibile")
# 5. Elimina ordine
with btn_cols[4]: with btn_cols[4]:
if not st.session_state.confirm_delete: if not st.session_state.confirm_delete:
if st.button("🗑️ Elimina", key="delete_btn", use_container_width=True): if st.button("🗑️ Elimina", key="delete_btn", width="stretch"):
st.session_state.confirm_delete = True st.session_state.confirm_delete = True
st.rerun() st.rerun()
else: else:
st.warning("Confermi l'eliminazione?") st.warning("Confermi l'eliminazione?")
sub_cols = st.columns(2) sub_cols = st.columns(2)
with sub_cols[0]: with sub_cols[0]:
if st.button("✅ Conferma", key="confirm_yes", type="primary", use_container_width=True): if st.button("✅ Conferma", key="confirm_yes", type="primary", width="stretch"):
with st.spinner("Eliminazione in corso..."): with st.spinner("Eliminazione in corso..."):
result = api_delete_order(id_ordine) result = api_delete_order(id_ordine)
if result.get("success"): if result.get("success"):
@ -310,7 +211,7 @@ def show_order_detail(id_ordine):
else: else:
st.error(result.get("detail") or result.get("error", "Errore eliminazione")) st.error(result.get("detail") or result.get("error", "Errore eliminazione"))
with sub_cols[1]: with sub_cols[1]:
if st.button("❌ Annulla", key="confirm_no", use_container_width=True): if st.button("❌ Annulla", key="confirm_no", width="stretch"):
st.session_state.confirm_delete = False st.session_state.confirm_delete = False
st.rerun() st.rerun()
@ -322,7 +223,6 @@ def show_order_detail(id_ordine):
def show_orders_list(): def show_orders_list():
"""Mostra la lista degli ordini con filtri.""" """Mostra la lista degli ordini con filtri."""
# Filtri
st.subheader("Filtri") st.subheader("Filtri")
filter_cols = st.columns([2, 2, 2, 1]) filter_cols = st.columns([2, 2, 2, 1])
@ -331,7 +231,7 @@ def show_orders_list():
"Intervallo date", "Intervallo date",
value=[], value=[],
key="date_filter", key="date_filter",
help="Filtra per data ordine" help="Filtra per data ordine",
) )
with filter_cols[1]: with filter_cols[1]:
@ -340,19 +240,17 @@ def show_orders_list():
with filter_cols[2]: with filter_cols[2]:
status_options = [f"{v[1]} {v[0]}" for v in STATUS_MAP.values()] status_options = [f"{v[1]} {v[0]}" for v in STATUS_MAP.values()]
status_keys = list(STATUS_MAP.keys())
selected_statuses = st.multiselect("Stato", status_options, key="status_filter") selected_statuses = st.multiselect("Stato", status_options, key="status_filter")
with filter_cols[3]: with filter_cols[3]:
st.write("") # spacer
st.write("") st.write("")
if st.button("🔄 Aggiorna", key="refresh_list", use_container_width=True): st.write("")
if st.button("🔄 Aggiorna", key="refresh_list", width="stretch"):
st.session_state.orders_cache = None st.session_state.orders_cache = None
st.rerun() st.rerun()
st.divider() st.divider()
# Carica ordini
if st.session_state.orders_cache is None: if st.session_state.orders_cache is None:
with st.spinner("Caricamento ordini..."): with st.spinner("Caricamento ordini..."):
st.session_state.orders_cache = fetch_orders() st.session_state.orders_cache = fetch_orders()
@ -366,7 +264,6 @@ def show_orders_list():
# Applica filtri # Applica filtri
filtered = orders filtered = orders
# Filtro data
if date_range and len(date_range) == 2: if date_range and len(date_range) == 2:
start_date, end_date = date_range start_date, end_date = date_range
filtered = [ filtered = [
@ -375,7 +272,6 @@ def show_orders_list():
start_date <= datetime.fromisoformat(o["data_ordine"]).date() <= end_date start_date <= datetime.fromisoformat(o["data_ordine"]).date() <= end_date
] ]
# Filtro cliente
if client_search: if client_search:
search_lower = client_search.lower() search_lower = client_search.lower()
filtered = [ filtered = [
@ -383,13 +279,11 @@ def show_orders_list():
if o.get("nome_cliente") and search_lower in o["nome_cliente"].lower() if o.get("nome_cliente") and search_lower in o["nome_cliente"].lower()
] ]
# Filtro stato
if selected_statuses: if selected_statuses:
selected_ids = [] selected_ids = [
for s in selected_statuses: k for k, v in STATUS_MAP.items()
for k, v in STATUS_MAP.items(): if f"{v[1]} {v[0]}" in selected_statuses
if f"{v[1]} {v[0]}" == s: ]
selected_ids.append(k)
filtered = [o for o in filtered if o.get("stato_ordine") in selected_ids] filtered = [o for o in filtered if o.get("stato_ordine") in selected_ids]
if not filtered: if not filtered:
@ -398,7 +292,6 @@ def show_orders_list():
st.caption(f"{len(filtered)} ordini trovati") st.caption(f"{len(filtered)} ordini trovati")
# Mostra lista
for order in filtered: for order in filtered:
id_ord = order.get("id_ordine") id_ord = order.get("id_ordine")
product = order.get("product_name", "N/D") or "N/D" product = order.get("product_name", "N/D") or "N/D"
@ -414,7 +307,7 @@ def show_orders_list():
col_btn, col_info = st.columns([1, 5]) col_btn, col_info = st.columns([1, 5])
with col_btn: with col_btn:
if st.button(f"#{id_ord}", key=f"order_{id_ord}", use_container_width=True): if st.button(f"#{id_ord}", key=f"order_{id_ord}", width="stretch"):
st.session_state.selected_order_id = id_ord st.session_state.selected_order_id = id_ord
st.session_state.confirm_delete = False st.session_state.confirm_delete = False
st.rerun() st.rerun()

View file

@ -1,234 +1,18 @@
import streamlit as st
import requests import requests
import pandas as pd import streamlit as st
import re
API_BASE = "http://localhost:8000/api/v1" from functions import (
API_BASE,
# CAS validation: 2-7 digits, dash, 2 digits, dash, 1 check digit _auth_headers,
CAS_PATTERN = re.compile(r"^\d{2,7}-\d{2}-\d$") build_order_payload,
create_client,
# Percentuale target e tolleranza fetch_clients,
PERCENTAGE_TARGET = 100.0 fetch_presets,
PERCENTAGE_TOLERANCE = 0.01 is_water_inci,
make_empty_ingredient_df,
# INCI che indicano acqua (case-insensitive) validate_order,
WATER_INCI = {"aqua", "water", "eau", "aqua/water", "water/aqua", "aqua/eau"}
def is_water_inci(inci_value: str) -> bool:
"""Controlla se l'INCI corrisponde ad acqua o varianti."""
if not inci_value or not isinstance(inci_value, str):
return False
return inci_value.strip().lower() in WATER_INCI
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def fetch_presets() -> list[str]:
"""Recupera i nomi dei preset di esposizione dall'API."""
try:
resp = requests.get(f"{API_BASE}/esposition/presets", timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return [p["preset_name"] for p in data["data"]]
return []
except requests.ConnectionError:
st.error("Impossibile connettersi all'API per caricare i preset. Verifica che il server sia attivo.")
return []
except Exception as e:
st.error(f"Errore nel caricamento dei preset: {e}")
return []
def fetch_clients() -> list[dict]:
"""Recupera la lista clienti dall'API. Ritorna lista di {id_cliente, nome_cliente}."""
try:
resp = requests.get(f"{API_BASE}/ingredients/clients", timeout=10)
data = resp.json()
if data.get("success") and data.get("data"):
return data["data"]
return []
except requests.ConnectionError:
return []
except Exception:
return []
def create_client(nome_cliente: str) -> bool:
"""Crea un nuovo cliente via API. Ritorna True se riuscito."""
try:
resp = requests.post(
f"{API_BASE}/ingredients/clients",
json={"nome_cliente": nome_cliente},
timeout=10,
) )
data = resp.json() from functions_ui import display_orderData
return data.get("success", False)
except Exception:
return False
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
def validate_order(
client_name: str,
product_name: str,
selected_preset: str | None,
df: pd.DataFrame,
) -> list[str]:
"""Valida tutti i campi dell'ordine. Ritorna lista di errori (vuota = valido)."""
errors = []
if not client_name or not client_name.strip():
errors.append("Il campo 'Nome del cliente' e obbligatorio.")
if not product_name or not product_name.strip():
errors.append("Il campo 'Nome del prodotto' e obbligatorio.")
if not selected_preset:
errors.append("Selezionare un preset di esposizione.")
# Normalizza colonne stringa (data_editor puo generare NaN)
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
# Righe attive: CAS compilato oppure percentuale > 0
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
if active.empty:
errors.append("Inserire almeno un ingrediente.")
return errors
# CAS format (solo righe non-colorante e non-acqua con CAS compilato)
for idx, row in active.iterrows():
cas_val = row["cas"].strip()
inci_val = row["inci"].strip()
is_col = bool(row["is_colorante"])
is_aqua = is_water_inci(inci_val)
# Se acqua, CAS puo essere vuoto
if is_aqua and cas_val == "":
continue
# Se colorante, skip validazione formato
if is_col:
continue
# CAS vuoto su riga non-acqua e non-colorante
if cas_val == "":
errors.append(f"Riga {idx + 1}: inserire un CAS number oppure selezionare 'Colorante'.")
continue
# Formato CAS
if not CAS_PATTERN.match(cas_val):
hint = f" ({inci_val})" if inci_val else ""
errors.append(
f"Formato CAS non valido alla riga {idx + 1}{hint}: '{cas_val}'. "
"Formato atteso: XX-XX-X (es. 56-81-5)."
)
# Somma percentuali
total_pct = active["percentage"].sum()
if abs(total_pct - PERCENTAGE_TARGET) > PERCENTAGE_TOLERANCE:
errors.append(
f"La somma delle percentuali e {total_pct:.6f}%, "
f"ma deve essere 100% (tolleranza +/- {PERCENTAGE_TOLERANCE}%)."
)
return errors
# ---------------------------------------------------------------------------
# Build payload
# ---------------------------------------------------------------------------
def build_order_payload(
client_name: str,
product_name: str,
preset_name: str,
df: pd.DataFrame,
) -> dict:
"""Costruisce il JSON dell'ordine a partire dai dati del form."""
str_df = df.copy()
str_df["inci"] = str_df["inci"].fillna("")
str_df["cas"] = str_df["cas"].fillna("")
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
ingredients = []
for _, row in active.iterrows():
inci_val = row["inci"].strip()
skip = bool(row["skip_tox"]) or is_water_inci(inci_val)
ingredients.append({
"inci": inci_val if inci_val else None,
"cas": row["cas"].strip(),
"percentage": round(float(row["percentage"]), 6),
"is_colorante": bool(row["is_colorante"]),
"skip_tox": skip,
})
return {
"client_name": client_name.strip(),
"product_name": product_name.strip(),
"preset_esposizione": preset_name,
"ingredients": ingredients,
}
# ---------------------------------------------------------------------------
# Display summary (reusable)
# ---------------------------------------------------------------------------
def display_orderData(order_data: dict):
"""Mostra un riepilogo leggibile dell'ordine (non JSON)."""
st.markdown("### Riepilogo Ordine")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Cliente", order_data["client_name"])
with col2:
st.metric("Prodotto", order_data["product_name"])
with col3:
st.metric("Preset Esposizione", order_data["preset_esposizione"])
ingredients = order_data.get("ingredients", [])
total_pct = sum(i["percentage"] for i in ingredients)
col4, col5, col6 = st.columns(3)
with col4:
st.metric("Numero ingredienti", len(ingredients))
with col5:
st.metric("Percentuale totale", f"{total_pct:.6f}%")
with col6:
n_col = sum(1 for i in ingredients if i["is_colorante"])
n_skip = sum(1 for i in ingredients if i["skip_tox"])
parts = []
if n_col > 0:
parts.append(f"{n_col} colorante/i")
if n_skip > 0:
parts.append(f"{n_skip} senza tox")
st.metric("Flag attivi", ", ".join(parts) if parts else "Nessuno")
st.markdown("**Ingredienti:**")
display_rows = []
for i in ingredients:
display_rows.append({
"INCI": i["inci"] if i["inci"] else "-",
"CAS": i["cas"] if i["cas"] else "-",
"Percentuale (%)": f"{i['percentage']:.6f}",
"Colorante": "Si" if i["is_colorante"] else "-",
"Salta Tox": "Si" if i["skip_tox"] else "-",
})
st.dataframe(pd.DataFrame(display_rows), use_container_width=True, hide_index=True)
# ===========================================================================
# PAGE
# ===========================================================================
st.set_page_config(page_title="Creazione Ordine", layout="wide") st.set_page_config(page_title="Creazione Ordine", layout="wide")
st.title("Creazione Ordine") st.title("Creazione Ordine")
@ -243,13 +27,7 @@ if "order_submitted" not in st.session_state:
if "order_result" not in st.session_state: if "order_result" not in st.session_state:
st.session_state.order_result = None st.session_state.order_result = None
if "ingredient_df" not in st.session_state: if "ingredient_df" not in st.session_state:
st.session_state.ingredient_df = pd.DataFrame({ st.session_state.ingredient_df = make_empty_ingredient_df()
"inci": [""] * 5,
"cas": [""] * 5,
"percentage": [0.0] * 5,
"is_colorante": [False] * 5,
"skip_tox": [False] * 5,
})
if "new_client_mode" not in st.session_state: if "new_client_mode" not in st.session_state:
st.session_state.new_client_mode = False st.session_state.new_client_mode = False
@ -335,7 +113,6 @@ st.subheader("Dati ordine")
col_left, col_right = st.columns(2) col_left, col_right = st.columns(2)
with col_left: with col_left:
# Client dropdown con opzione "Nuovo cliente"
dropdown_options = client_names + ["+ Nuovo cliente..."] dropdown_options = client_names + ["+ Nuovo cliente..."]
selected_client_option = st.selectbox("Nome del cliente *", options=dropdown_options) selected_client_option = st.selectbox("Nome del cliente *", options=dropdown_options)
@ -420,14 +197,12 @@ edited_df = st.data_editor(
st.session_state.ingredient_df, st.session_state.ingredient_df,
column_config=column_config, column_config=column_config,
num_rows="dynamic", num_rows="dynamic",
use_container_width=True, width="stretch",
hide_index=True, hide_index=True,
key="ingredients_editor", key="ingredients_editor",
) )
# Info per l'utente sulla rilevazione automatica AQUA/Water _aqua_count = sum(is_water_inci(v) for v in edited_df["inci"].fillna(""))
_inci_col = edited_df["inci"].fillna("")
_aqua_count = sum(is_water_inci(v) for v in _inci_col)
if _aqua_count > 0: if _aqua_count > 0:
st.info(f"{_aqua_count} ingrediente/i rilevato/i come AQUA/Water: tossicologia saltata automaticamente.") st.info(f"{_aqua_count} ingrediente/i rilevato/i come AQUA/Water: tossicologia saltata automaticamente.")
@ -457,6 +232,7 @@ if st.button("Invia Ordine", type="primary", disabled=not can_submit):
resp = requests.post( resp = requests.post(
f"{API_BASE}/orders/create", f"{API_BASE}/orders/create",
json=payload, json=payload,
headers=_auth_headers(),
timeout=30, timeout=30,
) )
result = resp.json() result = resp.json()
@ -470,7 +246,7 @@ if st.button("Invia Ordine", type="primary", disabled=not can_submit):
st.error(result.get("error") or result.get("detail", "Errore nella creazione dell'ordine")) st.error(result.get("error") or result.get("detail", "Errore nella creazione dell'ordine"))
except requests.ConnectionError: except requests.ConnectionError:
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo su localhost:8000.") st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
except Exception as e: except Exception as e:
st.error(f"Errore: {e}") st.error(f"Errore: {e}")
@ -488,14 +264,7 @@ st.markdown("---")
if st.button("Nuovo Ordine", key="reset_order"): if st.button("Nuovo Ordine", key="reset_order"):
st.session_state.order_submitted = False st.session_state.order_submitted = False
st.session_state.order_result = None st.session_state.order_result = None
st.session_state.ingredient_df = pd.DataFrame({ st.session_state.ingredient_df = make_empty_ingredient_df()
"inci": [""] * 5,
"cas": [""] * 5,
"percentage": [0.0] * 5,
"is_colorante": [False] * 5,
"skip_tox": [False] * 5,
})
# Pulisci lo stato interno del widget data_editor
if "ingredients_editor" in st.session_state: if "ingredients_editor" in st.session_state:
del st.session_state["ingredients_editor"] del st.session_state["ingredients_editor"]
st.rerun() st.rerun()

159
pages/settings_page.py Normal file
View file

@ -0,0 +1,159 @@
import pandas as pd
import streamlit as st
from functions import (
add_tox_indicator,
delete_client,
fetch_all_ingredients,
fetch_clients,
)
st.title("Impostazioni")
tab_tox, tab_ingredienti, tab_clienti = st.tabs([
"Indicatore Tox Custom",
"Inventario Ingredienti",
"Gestione Clienti",
])
# ---------------------------------------------------------------------------
# TAB 1 — Indicatore Tox Custom
# ---------------------------------------------------------------------------
with tab_tox:
st.subheader("Aggiungi indicatore tossicologico custom")
with st.form("form_tox_indicator"):
cas = st.text_input("CAS Number", placeholder="es. 56-81-5")
col1, col2 = st.columns(2)
with col1:
indicator = st.selectbox("Indicatore", ["NOAEL", "LOAEL", "LD50"])
value = st.number_input("Valore", min_value=0.0, step=0.1)
unit = st.text_input("Unità", placeholder="es. mg/kg bw/day")
with col2:
route = st.text_input("Via di esposizione", placeholder="es. oral, dermal")
toxicity_type = st.selectbox(
"Tipo tossicità",
["", "repeated_dose_toxicity", "acute_toxicity"],
format_func=lambda x: x if x else "— non specificato —"
)
ref = st.text_input("Riferimento / Fonte", placeholder="es. Studio interno 2024")
submitted = st.form_submit_button("Aggiungi indicatore", type="primary")
if submitted:
if not cas or not unit or not route:
st.error("CAS, unità e via di esposizione sono obbligatori.")
else:
payload = {
"cas": cas,
"indicator": indicator,
"value": value,
"unit": unit,
"route": route,
"toxicity_type": toxicity_type or None,
"ref": ref or None,
}
try:
resp = add_tox_indicator(payload)
if resp.status_code == 200:
data = resp.json()
tox = data.get("data", {}).get("toxicity", {})
best = tox.get("best_case")
st.success(f"Indicatore aggiunto per CAS {cas}.")
if best:
st.info(f"Best case aggiornato: **{best['indicator']}** = {best['value']} {best['unit']} ({best['route']})")
elif resp.status_code == 404:
st.error(f"CAS {cas} non trovato in cache. Esegui prima una ricerca nella pagina Ingredienti.")
else:
st.error(f"Errore {resp.status_code}: {resp.json().get('detail', 'errore sconosciuto')}")
except Exception as e:
st.error(f"Errore: {e}")
# ---------------------------------------------------------------------------
# TAB 2 — Inventario Ingredienti
# ---------------------------------------------------------------------------
with tab_ingredienti:
st.subheader("Ingredienti nel database")
if st.button("Aggiorna", key="refresh_ingredienti"):
st.rerun()
try:
result = fetch_all_ingredients()
if result.get("success") and result.get("data"):
st.caption(f"{result['total']} ingredienti totali")
df = pd.DataFrame(result["data"])
df["dap"] = df["dap"].map({True: "OK", False: "-"})
df["cosing"] = df["cosing"].map({True: "OK", False: "-"})
df["tox"] = df["tox"].map({True: "OK", False: "-"})
df = df.rename(columns={
"cas": "CAS",
"dap": "DAP",
"cosing": "COSING",
"tox": "TOX",
"created_at": "Data Acquisizione",
})
st.dataframe(
df[["CAS", "DAP", "COSING", "TOX", "Data Acquisizione"]],
use_container_width=True,
hide_index=True,
)
else:
st.info("Nessun ingrediente trovato nel database.")
except Exception as e:
st.error(f"Errore nel caricamento: {e}")
# ---------------------------------------------------------------------------
# TAB 3 — Gestione Clienti
# ---------------------------------------------------------------------------
with tab_clienti:
st.subheader("Clienti registrati")
if st.button("Aggiorna", key="refresh_clienti"):
st.rerun()
clienti = fetch_clients()
if not clienti:
st.info("Nessun cliente trovato.")
else:
for cliente in clienti:
col_nome, col_id, col_btn = st.columns([4, 1, 1])
with col_nome:
st.write(cliente["nome_cliente"])
with col_id:
st.caption(f"id: {cliente['id_cliente']}")
with col_btn:
key = f"del_{cliente['id_cliente']}"
if st.button("Elimina", key=key, type="secondary"):
st.session_state[f"confirm_{cliente['id_cliente']}"] = True
if st.session_state.get(f"confirm_{cliente['id_cliente']}"):
nome = cliente["nome_cliente"]
st.warning(f"Confermi l'eliminazione di **{nome}**?")
col_si, col_no, _ = st.columns([1, 1, 6])
with col_si:
if st.button("Sì, elimina", key=f"yes_{cliente['id_cliente']}", type="primary"):
try:
r = delete_client(nome)
if r.status_code == 200:
st.success(f"Cliente '{nome}' eliminato.")
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
st.rerun()
elif r.status_code == 409:
st.error(r.json().get("detail", "Il cliente ha ordini collegati."))
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
else:
st.error(f"Errore {r.status_code}: {r.json().get('detail', '')}")
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
except Exception as e:
st.error(f"Errore: {e}")
with col_no:
if st.button("Annulla", key=f"no_{cliente['id_cliente']}"):
st.session_state.pop(f"confirm_{cliente['id_cliente']}", None)
st.rerun()

56
pages/ticket.py Normal file
View file

@ -0,0 +1,56 @@
import streamlit as st
from functions import send_segnalazione
st.set_page_config(page_title="Segnala un problema", layout="centered")
st.title("Segnala un problema")
st.caption("Usa questo form per segnalare un errore, un dato mancante o qualsiasi problema riscontrato.")
with st.form("ticket_form", clear_on_submit=True):
page = st.selectbox(
"Pagina *",
options=["Cerca", "Ingrediente", "Nuovo PIF", "Ordini PIF", "Esposizione", "Impostazioni", "ECHA", "Altro"],
)
cas = st.text_input(
"CAS (opzionale)",
value=st.session_state.get("selected_cas") or "",
placeholder="es. 56-81-5",
help="Inserire il CAS coinvolto nel problema, se applicabile.",
)
description = st.text_area(
"Descrizione *",
placeholder="Descrivi il problema in modo chiaro e dettagliato.",
height=120,
)
error = st.text_area(
"Messaggio di errore (opzionale)",
placeholder="Incolla qui il messaggio di errore visualizzato, se presente.",
height=80,
)
priority = st.radio(
"Priorità *",
options=["bassa", "media", "alta"],
index=1,
horizontal=True,
captions=["Problema minore", "Impatta l'utilizzo", "Blocca il lavoro"],
)
submitted = st.form_submit_button("Invia segnalazione", type="primary", use_container_width=True)
if submitted:
if not description.strip():
st.error("La descrizione è obbligatoria.")
else:
ok = send_segnalazione(
page=page,
description=description.strip(),
priority=priority,
cas=cas.strip() or None,
error=error.strip() or None,
)
if ok:
st.success("Segnalazione inviata. Grazie per il feedback!")

View file

@ -6,6 +6,8 @@ readme = "README.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"duckdb>=1.4.3", "duckdb>=1.4.3",
"marimo>=0.18.0", "extra-streamlit-components>=0.1.71",
"pandas>=2.0.0",
"requests>=2.32.5",
"streamlit>=1.51.0", "streamlit>=1.51.0",
] ]