631 lines
22 KiB
Python
631 lines
22 KiB
Python
import re
|
|
import time
|
|
from typing import Any, Dict, Optional
|
|
|
|
import extra_streamlit_components as stx
|
|
import pandas as pd
|
|
import requests
|
|
import streamlit as st
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
API_BASE = "http://localhost:8000/api/v1"
|
|
AUTH_BASE = "http://localhost:8000/api/v1"
|
|
|
|
CAS_PATTERN = re.compile(r"^\d{2,7}-\d{2}-\d$")
|
|
PERCENTAGE_TARGET = 100.0
|
|
PERCENTAGE_TOLERANCE = 0.01
|
|
|
|
WATER_INCI = {"aqua", "water", "eau", "aqua/water", "water/aqua", "aqua/eau", "acqua"}
|
|
|
|
STATUS_MAP = {
|
|
1: ("Ricevuto", "🔵"),
|
|
2: ("Validato", "🟡"),
|
|
3: ("Compilazione", "🟠"),
|
|
5: ("Arricchito", "🟢"),
|
|
6: ("Calcolo", "🔵"),
|
|
8: ("Completato", "✅"),
|
|
9: ("Errore", "🔴"),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# General utilities
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_water_inci(inci_value: str) -> bool:
|
|
"""Controlla se l'INCI corrisponde ad acqua o varianti."""
|
|
if not inci_value or not isinstance(inci_value, str):
|
|
return False
|
|
return inci_value.strip().lower() in WATER_INCI
|
|
|
|
|
|
def status_label(stato_ordine: int) -> str:
|
|
"""Ritorna label con emoji per lo stato."""
|
|
name, emoji = STATUS_MAP.get(stato_ordine, ("Sconosciuto", "⚪"))
|
|
return f"{emoji} {name}"
|
|
|
|
|
|
def make_empty_ingredient_df(n: int = 5) -> pd.DataFrame:
|
|
"""Crea un DataFrame vuoto per la tabella ingredienti."""
|
|
return pd.DataFrame({
|
|
"inci": [""] * n,
|
|
"cas": [""] * n,
|
|
"percentage": [0.0] * n,
|
|
"is_colorante": [False] * n,
|
|
"skip_tox": [False] * n,
|
|
})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cookie
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_COOKIE_RT = "pif_rt"
|
|
_COOKIE_MAX_AGE = 7 * 24 * 3600 # 7 giorni in secondi
|
|
|
|
|
|
def get_cookie_manager() -> stx.CookieManager:
|
|
return stx.CookieManager(key="pif_cookies")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def do_login(email: str, password: str) -> bool:
|
|
"""Chiama l'endpoint di login e salva i token in session_state. Ritorna True se ok."""
|
|
try:
|
|
resp = requests.post(
|
|
f"{AUTH_BASE}/auth/login",
|
|
json={"email": email, "password": password},
|
|
timeout=10,
|
|
)
|
|
except requests.RequestException as e:
|
|
st.error(f"Errore di rete: {e}")
|
|
return False
|
|
|
|
if resp.status_code == 401:
|
|
st.error("Credenziali non valide.")
|
|
return False
|
|
if resp.status_code != 200:
|
|
st.error(f"Errore del server ({resp.status_code}). Riprova più tardi.")
|
|
return False
|
|
|
|
data = resp.json()
|
|
st.session_state["access_token"] = data["access_token"]
|
|
st.session_state["refresh_token"] = data["refresh_token"]
|
|
st.session_state["expires_at"] = time.time() + data["expires_in"] - 30
|
|
get_cookie_manager().set(_COOKIE_RT, data["refresh_token"], max_age=_COOKIE_MAX_AGE, key="set_rt_login")
|
|
_fetch_user_info()
|
|
return True
|
|
|
|
|
|
def do_refresh() -> bool:
|
|
"""Rinnova l'access_token. Ritorna False se il refresh fallisce (forza re-login)."""
|
|
try:
|
|
resp = requests.post(
|
|
f"{AUTH_BASE}/auth/refresh",
|
|
json={"refresh_token": st.session_state.get("refresh_token", "")},
|
|
timeout=10,
|
|
)
|
|
except requests.RequestException:
|
|
return False
|
|
|
|
if resp.status_code != 200:
|
|
return False
|
|
|
|
data = resp.json()
|
|
st.session_state["access_token"] = data["access_token"]
|
|
st.session_state["refresh_token"] = data["refresh_token"]
|
|
st.session_state["expires_at"] = time.time() + data["expires_in"] - 30
|
|
get_cookie_manager().set(_COOKIE_RT, data["refresh_token"], max_age=_COOKIE_MAX_AGE, key="set_rt_refresh")
|
|
return True
|
|
|
|
|
|
def do_logout():
|
|
"""Chiama il logout sull'API e pulisce la session_state."""
|
|
refresh_token = st.session_state.get("refresh_token", "")
|
|
if refresh_token:
|
|
try:
|
|
requests.post(
|
|
f"{AUTH_BASE}/auth/logout",
|
|
json={"refresh_token": refresh_token},
|
|
timeout=5,
|
|
)
|
|
except requests.RequestException:
|
|
pass
|
|
for key in ["access_token", "refresh_token", "expires_at", "user_name", "user_email", "user_id"]:
|
|
st.session_state.pop(key, None)
|
|
get_cookie_manager().delete(_COOKIE_RT, key="del_rt_logout")
|
|
|
|
|
|
def check_auth() -> bool:
|
|
"""Ritorna True se l'utente è autenticato e il token è valido.
|
|
Se il token è assente dalla session_state tenta il restore dal cookie.
|
|
Se il token è scaduto tenta il refresh."""
|
|
if "access_token" in st.session_state:
|
|
if time.time() < st.session_state.get("expires_at", 0):
|
|
return True
|
|
if do_refresh():
|
|
return True
|
|
do_logout()
|
|
return False
|
|
|
|
# Nessun token in memoria: prova il restore dal cookie
|
|
cm = get_cookie_manager()
|
|
rt = cm.get(_COOKIE_RT)
|
|
if rt:
|
|
st.session_state["refresh_token"] = rt
|
|
if do_refresh():
|
|
_fetch_user_info()
|
|
return True
|
|
# Cookie scaduto o invalidato: rimuovilo
|
|
cm.delete(_COOKIE_RT, key="del_rt_expired")
|
|
|
|
return False
|
|
|
|
|
|
def _auth_headers() -> dict:
|
|
"""Ritorna gli headers Authorization con il Bearer token corrente."""
|
|
token = st.session_state.get("access_token", "")
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
def _fetch_user_info() -> None:
|
|
"""Chiama /auth/me e salva email e nome in session_state."""
|
|
try:
|
|
resp = requests.get(f"{AUTH_BASE}/auth/me", headers=_auth_headers(), timeout=5)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
st.session_state["user_email"] = data.get("email")
|
|
st.session_state["user_name"] = data.get("name")
|
|
st.session_state["user_id"] = data.get("id")
|
|
except requests.RequestException:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Order validation and building
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def validate_order(
|
|
client_name: str,
|
|
product_name: str,
|
|
selected_preset: str | None,
|
|
df: pd.DataFrame,
|
|
) -> list[str]:
|
|
"""Valida tutti i campi dell'ordine. Ritorna lista di errori (vuota = valido)."""
|
|
errors = []
|
|
|
|
if not client_name or not client_name.strip():
|
|
errors.append("Il campo 'Nome del cliente' e obbligatorio.")
|
|
if not product_name or not product_name.strip():
|
|
errors.append("Il campo 'Nome del prodotto' e obbligatorio.")
|
|
if not selected_preset:
|
|
errors.append("Selezionare un preset di esposizione.")
|
|
|
|
str_df = df.copy()
|
|
str_df["inci"] = str_df["inci"].fillna("")
|
|
str_df["cas"] = str_df["cas"].fillna("")
|
|
|
|
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
|
|
|
|
if active.empty:
|
|
errors.append("Inserire almeno un ingrediente.")
|
|
return errors
|
|
|
|
for idx, row in active.iterrows():
|
|
cas_val = row["cas"].strip()
|
|
inci_val = row["inci"].strip()
|
|
is_col = bool(row["is_colorante"])
|
|
|
|
if is_water_inci(inci_val) and cas_val == "":
|
|
continue
|
|
if is_col:
|
|
continue
|
|
if cas_val == "":
|
|
errors.append(f"Riga {idx + 1}: inserire un CAS number oppure selezionare 'Colorante'.")
|
|
continue
|
|
if not CAS_PATTERN.match(cas_val):
|
|
hint = f" ({inci_val})" if inci_val else ""
|
|
errors.append(
|
|
f"Formato CAS non valido alla riga {idx + 1}{hint}: '{cas_val}'. "
|
|
"Formato atteso: XX-XX-X (es. 56-81-5)."
|
|
)
|
|
|
|
total_pct = active["percentage"].sum()
|
|
if abs(total_pct - PERCENTAGE_TARGET) > PERCENTAGE_TOLERANCE:
|
|
errors.append(
|
|
f"La somma delle percentuali e {total_pct:.6f}%, "
|
|
f"ma deve essere 100% (tolleranza +/- {PERCENTAGE_TOLERANCE}%)."
|
|
)
|
|
|
|
return errors
|
|
|
|
|
|
def build_order_payload(
|
|
client_name: str,
|
|
product_name: str,
|
|
preset_name: str,
|
|
df: pd.DataFrame,
|
|
) -> dict:
|
|
"""Costruisce il JSON dell'ordine a partire dai dati del form."""
|
|
str_df = df.copy()
|
|
str_df["inci"] = str_df["inci"].fillna("")
|
|
str_df["cas"] = str_df["cas"].fillna("")
|
|
|
|
active = str_df[(str_df["cas"].str.strip() != "") | (str_df["percentage"] > 0)]
|
|
|
|
ingredients = []
|
|
for _, row in active.iterrows():
|
|
inci_val = row["inci"].strip()
|
|
skip = bool(row["skip_tox"]) or is_water_inci(inci_val)
|
|
ingredients.append({
|
|
"inci": inci_val if inci_val else None,
|
|
"cas": row["cas"].strip(),
|
|
"percentage": round(float(row["percentage"]), 6),
|
|
"is_colorante": bool(row["is_colorante"]),
|
|
"skip_tox": skip,
|
|
})
|
|
|
|
return {
|
|
"client_name": client_name.strip(),
|
|
"product_name": product_name.strip(),
|
|
"preset_esposizione": preset_name,
|
|
"ingredients": ingredients,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ingredients API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_ingredient(cas: str, force: bool = False) -> tuple[bool, Any]:
|
|
"""Cerca un ingrediente per CAS. Ritorna (success, data) oppure (False, error_message)."""
|
|
try:
|
|
resp = requests.post(
|
|
f"{API_BASE}/ingredients/search",
|
|
json={"cas": cas, "force": force},
|
|
headers=_auth_headers(),
|
|
timeout=120,
|
|
)
|
|
result = resp.json()
|
|
if result.get("success") and result.get("data"):
|
|
return True, result["data"]
|
|
return False, result.get("error", f"Nessun dato per CAS {cas}")
|
|
except requests.ConnectionError:
|
|
return False, "Impossibile connettersi all'API. Verifica che il server sia attivo."
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def add_tox_indicator(payload: dict) -> requests.Response:
|
|
"""Aggiunge un indicatore tossicologico custom."""
|
|
return requests.post(f"{API_BASE}/ingredients/add-tox-indicator", json=payload, headers=_auth_headers(), timeout=30)
|
|
|
|
|
|
def fetch_all_ingredients() -> dict:
|
|
"""Recupera tutti gli ingredienti dal database. Ritorna il JSON grezzo."""
|
|
resp = requests.get(f"{API_BASE}/ingredients/list", headers=_auth_headers(), timeout=10)
|
|
return resp.json()
|
|
|
|
|
|
def fetch_clients() -> list[dict]:
|
|
"""Recupera la lista clienti. Ritorna lista di {id_cliente, nome_cliente}."""
|
|
try:
|
|
resp = requests.get(f"{API_BASE}/ingredients/clients", headers=_auth_headers(), timeout=10)
|
|
data = resp.json()
|
|
if data.get("success") and data.get("data"):
|
|
return data["data"]
|
|
return []
|
|
except requests.ConnectionError:
|
|
return []
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def create_client(nome_cliente: str) -> bool:
|
|
"""Crea un nuovo cliente via API. Ritorna True se riuscito."""
|
|
try:
|
|
resp = requests.post(
|
|
f"{API_BASE}/ingredients/clients",
|
|
json={"nome_cliente": nome_cliente},
|
|
headers=_auth_headers(),
|
|
timeout=10,
|
|
)
|
|
return resp.json().get("success", False)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def delete_client(nome: str) -> requests.Response:
|
|
"""Elimina un cliente per nome. Ritorna la Response grezza."""
|
|
return requests.delete(f"{API_BASE}/ingredients/clients/{nome}", headers=_auth_headers(), timeout=10)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Esposizione / Presets API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_presets() -> list[str]:
|
|
"""Recupera i nomi dei preset di esposizione dall'API."""
|
|
try:
|
|
resp = requests.get(f"{API_BASE}/esposition/presets", headers=_auth_headers(), timeout=10)
|
|
data = resp.json()
|
|
if data.get("success") and data.get("data"):
|
|
return [p["preset_name"] for p in data["data"]]
|
|
return []
|
|
except requests.ConnectionError:
|
|
st.error("Impossibile connettersi all'API per caricare i preset. Verifica che il server sia attivo.")
|
|
return []
|
|
except Exception as e:
|
|
st.error(f"Errore nel caricamento dei preset: {e}")
|
|
return []
|
|
|
|
|
|
def fetch_all_presets() -> dict:
|
|
"""Recupera tutti i preset con dati completi. Ritorna il JSON grezzo."""
|
|
resp = requests.get(f"{API_BASE}/esposition/presets", headers=_auth_headers(), timeout=10)
|
|
return resp.json()
|
|
|
|
|
|
def create_exposition_preset(payload: dict) -> dict:
|
|
"""Crea un nuovo preset di esposizione. Ritorna il JSON della risposta."""
|
|
resp = requests.post(f"{API_BASE}/esposition/create", json=payload, headers=_auth_headers(), timeout=10)
|
|
return resp.json()
|
|
|
|
|
|
def delete_exposition_preset(name: str) -> dict:
|
|
"""Elimina un preset di esposizione per nome. Ritorna il JSON della risposta."""
|
|
resp = requests.delete(f"{API_BASE}/esposition/delete/{name}", headers=_auth_headers(), timeout=10)
|
|
return resp.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Orders API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_orders() -> list:
|
|
"""Recupera la lista ordini dall'API."""
|
|
try:
|
|
resp = requests.get(f"{API_BASE}/orders/list", headers=_auth_headers(), timeout=15)
|
|
data = resp.json()
|
|
if data.get("success"):
|
|
return data.get("data", [])
|
|
return []
|
|
except requests.ConnectionError:
|
|
st.error("Impossibile connettersi all'API. Verifica che il server sia attivo.")
|
|
return []
|
|
except Exception as e:
|
|
st.error(f"Errore nel caricamento degli ordini: {e}")
|
|
return []
|
|
|
|
|
|
def fetch_order_detail(id_ordine: int) -> Optional[dict]:
|
|
"""Recupera il dettaglio completo di un ordine."""
|
|
try:
|
|
resp = requests.get(f"{API_BASE}/orders/detail/{id_ordine}", headers=_auth_headers(), timeout=15)
|
|
data = resp.json()
|
|
if data.get("success"):
|
|
return data.get("order")
|
|
st.error(data.get("detail", "Errore nel recupero dettaglio ordine"))
|
|
return None
|
|
except requests.ConnectionError:
|
|
st.error("Impossibile connettersi all'API.")
|
|
return None
|
|
except Exception as e:
|
|
st.error(f"Errore: {e}")
|
|
return None
|
|
|
|
|
|
def api_retry_order(id_ordine: int) -> dict:
|
|
"""Chiama POST /orders/retry/{id_ordine}."""
|
|
try:
|
|
resp = requests.post(f"{API_BASE}/orders/retry/{id_ordine}", headers=_auth_headers(), timeout=15)
|
|
return resp.json()
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
def api_delete_order(id_ordine: int) -> dict:
|
|
"""Chiama DELETE /orders/{id_ordine}."""
|
|
try:
|
|
resp = requests.delete(f"{API_BASE}/orders/{id_ordine}", headers=_auth_headers(), timeout=15)
|
|
return resp.json()
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
def download_excel(id_ordine: int) -> Optional[bytes]:
|
|
"""Scarica il file Excel per un ordine."""
|
|
try:
|
|
resp = requests.get(f"{API_BASE}/orders/export/{id_ordine}", headers=_auth_headers(), timeout=120)
|
|
if resp.status_code == 200:
|
|
return resp.content
|
|
st.error(f"Errore download Excel: {resp.json().get('detail', resp.status_code)}")
|
|
return None
|
|
except Exception as e:
|
|
st.error(f"Errore download: {e}")
|
|
return None
|
|
|
|
|
|
def download_sources(id_ordine: int) -> Optional[bytes]:
|
|
"""Scarica lo ZIP delle fonti PDF per un ordine."""
|
|
try:
|
|
resp = requests.get(f"{API_BASE}/orders/export-sources/{id_ordine}", headers=_auth_headers(), timeout=300)
|
|
if resp.status_code == 200:
|
|
return resp.content
|
|
st.error(f"Errore download fonti: {resp.json().get('detail', resp.status_code)}")
|
|
return None
|
|
except Exception as e:
|
|
st.error(f"Errore download: {e}")
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Segnalazioni (ticket)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def send_segnalazione(
|
|
page: str,
|
|
description: str,
|
|
priority: str,
|
|
cas: Optional[str] = None,
|
|
error: Optional[str] = None,
|
|
) -> bool:
|
|
"""Invia una segnalazione all'API. Ritorna True se salvata con successo."""
|
|
payload = {
|
|
"page": page,
|
|
"description": description,
|
|
"priority": priority,
|
|
}
|
|
if cas:
|
|
payload["cas"] = cas
|
|
if error:
|
|
payload["error"] = error
|
|
try:
|
|
resp = requests.post(
|
|
f"{API_BASE}/common/segnalazione",
|
|
json=payload,
|
|
headers=_auth_headers(),
|
|
timeout=10,
|
|
)
|
|
if resp.status_code == 200 and resp.json().get("success"):
|
|
return True
|
|
st.error(resp.json().get("detail", "Errore nell'invio della segnalazione"))
|
|
return False
|
|
except Exception as e:
|
|
st.error(f"Errore di connessione: {e}")
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ECHA data extraction (for echa.py page)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract_tox_info_values(data: dict) -> list:
|
|
"""Extract DNEL values from toxicological information."""
|
|
rows = []
|
|
sections = data.get("toxicological_information", {}).get("sections", [])
|
|
for section in sections:
|
|
label = section.get("label", "")
|
|
if "subsections" in section:
|
|
for subsec in section["subsections"]:
|
|
effect_type = subsec.get("label", "")
|
|
if "subsections" in subsec:
|
|
for sub2 in subsec["subsections"]:
|
|
dose = sub2.get("StDose", {})
|
|
if isinstance(dose, dict) and dose.get("value"):
|
|
rows.append({
|
|
"Population/Route": label,
|
|
"Effect Type": effect_type,
|
|
"Exposure": sub2.get("label", ""),
|
|
"Assessment": sub2.get("HazardAssessment", ""),
|
|
"Value numerical": dose.get("value", ""),
|
|
"Unit": dose.get("unit", ""),
|
|
"Endpoint": sub2.get("MostSensitiveEndpoint", "")
|
|
})
|
|
return rows
|
|
|
|
|
|
def extract_acute_values(data: dict) -> list:
|
|
"""Extract acute toxicity values."""
|
|
rows = []
|
|
sections = data.get("acute_toxicity", {}).get("sections", [])
|
|
for section in sections:
|
|
if section.get("label") == "Key value for assessment":
|
|
for subsec in section.get("subsections", []):
|
|
if subsec.get("EffectLevelValue"):
|
|
rows.append({
|
|
"Route": subsec.get("label", "").replace("Acute toxicity: ", ""),
|
|
"Endpoint": subsec.get("EffectLevelUnit", ""),
|
|
"Value": subsec.get("EffectLevelValue", ""),
|
|
"Conclusion": subsec.get("EndpointConclusion", "")
|
|
})
|
|
return rows
|
|
|
|
|
|
def extract_repeated_values(data: dict) -> list:
|
|
"""Extract repeated dose toxicity values."""
|
|
rows = []
|
|
sections = data.get("repeated_dose_toxicity", {}).get("sections", [])
|
|
for section in sections:
|
|
if section.get("label") == "Key value for assessment":
|
|
for subsec in section.get("subsections", []):
|
|
study_type = subsec.get("label", "")
|
|
for sub2 in subsec.get("subsections", []):
|
|
if sub2.get("EffectLevelValue"):
|
|
rows.append({
|
|
"Study Type": study_type,
|
|
"Route": sub2.get("label", ""),
|
|
"Endpoint": sub2.get("EffectLevelUnit", ""),
|
|
"Value": sub2.get("EffectLevelValue", ""),
|
|
"Species": sub2.get("Species", "-")
|
|
})
|
|
return rows
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Legacy / original API functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def echa_request(cas_num: str) -> Dict[str, Any]:
|
|
"""Recupera i dati ECHA per un numero CAS."""
|
|
response = requests.post(f"{API_BASE}/echa/search", json={"cas": cas_num}, headers=_auth_headers())
|
|
data = response.json()
|
|
return data["data"] if data["success"] else data["error"]
|
|
|
|
|
|
def cosing_request(cas_num: str) -> Dict[str, Any]:
|
|
"""Recupera i dati COSING per un numero CAS."""
|
|
response = requests.post(
|
|
f"{API_BASE}/cosing/search",
|
|
json={"full": True, "mode": "cas", "text": cas_num},
|
|
headers=_auth_headers(),
|
|
)
|
|
data = response.json()
|
|
return data["data"] if data["success"] else data["error"]
|
|
|
|
|
|
def generate_pdf_download(cas, origin, link):
|
|
"""Genera e scarica un PDF tramite l'API."""
|
|
name = f"{cas}_{origin}"
|
|
if link is not None:
|
|
response = requests.post(
|
|
f"{API_BASE}/common/generate-pdf",
|
|
json={"link": link, "name": name},
|
|
headers=_auth_headers(),
|
|
)
|
|
data = response.json()
|
|
else:
|
|
data = {"success": False, "error": "No dossier exists for this origin."}
|
|
|
|
if data["success"]:
|
|
response = requests.get(f"{API_BASE}/common/download-pdf/{name}", headers=_auth_headers())
|
|
response.raise_for_status()
|
|
return response.content
|
|
else:
|
|
return data["error"]
|
|
|
|
|
|
def cosing_download(ref_no: str):
|
|
"""Scarica il PDF ufficiale COSING per un numero di riferimento."""
|
|
url = f"https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf"
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0",
|
|
"Accept": "application/json, text/plain, */*",
|
|
"Accept-Language": "it-IT,it;q=0.9",
|
|
"Cache-Control": "No-Cache",
|
|
"Origin": "https://ec.europa.eu",
|
|
"Referer": "https://ec.europa.eu/",
|
|
"Sec-Fetch-Dest": "empty",
|
|
"Sec-Fetch-Mode": "cors",
|
|
"Sec-Fetch-Site": "same-site",
|
|
}
|
|
response = requests.get(url, headers=headers)
|
|
if response.status_code == 200:
|
|
return response.content
|
|
return f"Error: {response.status_code} - {response.text}"
|