lmb-fe/pages/list_orders.py
adish-rmr 584de757bb fix
2026-02-22 19:44:27 +01:00

439 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
import requests
import pandas as pd
from datetime import datetime, date
# Base URL of the orders REST API; the request helpers in this file build
# their endpoints from it.
API_BASE = "https://api.cosmoguard.it/api/v1"

# Maps the numeric order status code to a (label, emoji) pair shown in the UI.
# Codes 4 and 7 have no entry; lookups go through ``.get`` with a
# ("Sconosciuto", "") fallback.
STATUS_MAP = {
    1: ("Ricevuto", "🔵"),
    2: ("Validato", "🟡"),
    3: ("Compilazione", "🟠"),
    5: ("Arricchito", "🟢"),
    6: ("Calcolo", "🔵"),
    # Same marker the in-page help table uses for "Completato"; an empty emoji
    # here produced labels with a stray leading space.
    8: ("Completato", "✅"),
    9: ("Errore", "🔴"),
}
def status_label(stato_ordine):
    """Return the display label for an order status, prefixed by its emoji.

    Args:
        stato_ordine: Numeric status code looked up in ``STATUS_MAP``.

    Returns:
        ``"<emoji> <name>"`` when the status has an emoji, otherwise just the
        name. Codes missing from ``STATUS_MAP`` yield ``"Sconosciuto"``.
    """
    name, emoji = STATUS_MAP.get(stato_ordine, ("Sconosciuto", ""))
    # Skip the separator when there is no emoji so the label never starts
    # with a stray space.
    return f"{emoji} {name}" if emoji else name
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def fetch_orders():
    """Fetch the order list from the API.

    Issues ``GET {API_BASE}/orders/list`` and reads the JSON body.

    Returns:
        The content of the ``"data"`` field when the response carries a truthy
        ``"success"`` flag (presumably a list of order dicts, which is how the
        list view consumes it). Returns an empty list when the flag is falsy,
        when ``"data"`` is missing or null, or when the request fails; failures
        are reported to the user through ``st.error``.
    """
    try:
        resp = requests.get(f"{API_BASE}/orders/list", timeout=15)
        data = resp.json()
        if data.get("success"):
            # ``or []`` also covers an explicit ``"data": null`` in the payload.
            return data.get("data") or []
        return []
    except requests.ConnectionError:
        # Point at the configured endpoint rather than a hard-coded local address.
        st.error(f"Impossibile connettersi all'API. Verifica che il server sia attivo su {API_BASE}.")
        return []
    except Exception as e:
        # Page-level boundary: surface anything else (timeout, invalid JSON, ...)
        # instead of crashing the Streamlit script.
        st.error(f"Errore nel caricamento degli ordini: {e}")
        return []
def fetch_order_detail(id_ordine):
    """Fetch the full detail of a single order.

    Args:
        id_ordine: Identifier interpolated into ``/orders/detail/{id_ordine}``.

    Returns:
        The ``"order"`` field of the JSON response when ``"success"`` is truthy,
        otherwise ``None``. Every failure path shows a message via ``st.error``.
    """
    try:
        payload = requests.get(
            f"{API_BASE}/orders/detail/{id_ordine}", timeout=15
        ).json()
        if not payload.get("success"):
            # The API reported a failure: show its detail text or a generic message.
            st.error(payload.get("detail", "Errore nel recupero dettaglio ordine"))
            return None
        return payload.get("order")
    except requests.ConnectionError:
        st.error("Impossibile connettersi all'API.")
    except Exception as exc:
        st.error(f"Errore: {exc}")
    # Reached only after one of the handlers above has reported the problem.
    return None
def api_retry_order(id_ordine):
    """Call ``POST /orders/retry/{id_ordine}`` and return the decoded JSON body.

    Any exception is converted into ``{"success": False, "error": <message>}``
    so callers can always treat the result as a dict.
    """
    try:
        endpoint = f"{API_BASE}/orders/retry/{id_ordine}"
        response = requests.post(endpoint, timeout=15)
        return response.json()
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return failure
def api_delete_order(id_ordine):
    """Call ``DELETE /orders/{id_ordine}`` and return the decoded JSON body.

    Any exception is converted into ``{"success": False, "error": <message>}``
    so callers can always treat the result as a dict.
    """
    try:
        endpoint = f"{API_BASE}/orders/{id_ordine}"
        response = requests.delete(endpoint, timeout=15)
        return response.json()
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return failure
def download_excel(id_ordine):
    """Download the Excel export of an order.

    Args:
        id_ordine: Identifier interpolated into ``/orders/export/{id_ordine}``.

    Returns:
        The raw response bytes on HTTP 200, otherwise ``None`` after reporting
        the problem through ``st.error``.
    """
    try:
        resp = requests.get(f"{API_BASE}/orders/export/{id_ordine}", timeout=120)
        if resp.status_code == 200:
            return resp.content
        # An error response is not guaranteed to be a JSON object (it may be an
        # HTML page or plain text), so fall back to the status code instead of
        # letting a parsing failure replace the real error.
        detail = resp.status_code
        try:
            body = resp.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            detail = body.get("detail", resp.status_code)
        st.error(f"Errore download Excel: {detail}")
        return None
    except Exception as e:
        # Page-level boundary: connection errors, timeouts, etc.
        st.error(f"Errore download: {e}")
        return None
def download_sources(id_ordine):
    """Download the ZIP archive with the PDF sources of an order.

    Args:
        id_ordine: Identifier interpolated into
            ``/orders/export-sources/{id_ordine}``.

    Returns:
        The raw response bytes on HTTP 200, otherwise ``None`` after reporting
        the problem through ``st.error``.
    """
    try:
        resp = requests.get(f"{API_BASE}/orders/export-sources/{id_ordine}", timeout=300)
        if resp.status_code == 200:
            return resp.content
        # An error response is not guaranteed to be a JSON object (it may be an
        # HTML page or plain text), so fall back to the status code instead of
        # letting a parsing failure replace the real error.
        detail = resp.status_code
        try:
            body = resp.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            detail = body.get("detail", resp.status_code)
        st.error(f"Errore download fonti: {detail}")
        return None
    except Exception as e:
        # Page-level boundary: connection errors, timeouts, etc.
        st.error(f"Errore download: {e}")
        return None
# ---------------------------------------------------------------------------
# Session state init
# ---------------------------------------------------------------------------
# Session keys this page relies on, paired with the value each one starts from.
# Existing values are left alone so they survive Streamlit reruns.
for _state_key, _initial_value in (
    ("selected_order_id", None),
    ("confirm_delete", False),
    ("orders_cache", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# ---------------------------------------------------------------------------
# Page config
# ---------------------------------------------------------------------------
# Page chrome: browser tab metadata, wide layout and the visible title.
st.set_page_config(page_title="Gestione Ordini", page_icon="📋", layout="wide")
st.title("📋 Gestione Ordini")

# Collapsible usage guide; the Markdown is written straight into the expander.
_help_panel = st.expander(" Come usare questa pagina")
_help_panel.markdown("""
Questa pagina mostra tutti gli **ordini PIF** effettuati e il loro stato di avanzamento.
#### Cosa puoi fare
- **Visualizzare** la lista di tutti gli ordini con prodotto, cliente, data e stato
- **Aprire un ordine** cliccando sul suo numero (`#ID`) per vedere il dettaglio completo, inclusa la **lista QQ** degli ingredienti
- **Scaricare l'Excel** contenente il SED, il calcolo del MoS e la tabella QQ
- **Scaricare le Fonti PDF** — uno ZIP con tutti i PDF di ECHA e CosIng relativi all'ordine
- **Eliminare** un ordine dal dettaglio (richiede conferma)
#### Stati degli ordini
| Stato | Significato |
|---|---|
| 🔵 Ricevuto | Ordine appena creato, in attesa di elaborazione |
| 🟡 Validato | Dati validati, pronto per la compilazione |
| 🟠 Compilazione | Elaborazione in corso |
| 🟢 Arricchito | Dati arricchiti, pronto per il calcolo |
| 🔵 Calcolo | Calcolo MoS in corso |
| ✅ Completato | Ordine completato, Excel e PDF disponibili |
| 🔴 Errore | Errore durante l'elaborazione — usa **Retry** per riprovare |
#### Filtri
Usa i filtri in cima alla lista per restringere la ricerca per **data**, **cliente** o **stato**.
""")
# ---------------------------------------------------------------------------
# Detail View
# ---------------------------------------------------------------------------
def show_order_detail(id_ordine):
    """Render the detail view of the selected order.

    Shows a back button, the order header and metrics, the optional notes, the
    ingredient table and the action buttons (refresh, retry, Excel download,
    PDF sources download, delete).

    Args:
        id_ordine: Identifier of the order, loaded through
            ``fetch_order_detail``.
    """
    # Back to the list: clear the selection and any pending delete confirmation.
    if st.button("← Torna alla lista", key="back_btn"):
        st.session_state.selected_order_id = None
        st.session_state.confirm_delete = False
        st.rerun()
    st.divider()

    with st.spinner("Caricamento dettaglio ordine..."):
        order = fetch_order_detail(id_ordine)
    if not order:
        st.error(f"Impossibile caricare l'ordine {id_ordine}")
        return

    stato = order.get("stato_ordine", 0)
    nome_stato = order.get("nome_stato", "Sconosciuto")
    _, emoji = STATUS_MAP.get(stato, ("Sconosciuto", ""))
    # Downloads are offered only when the order is linked to a project.
    has_project = order.get("uuid_progetto") is not None

    # Header. ``or "N/D"`` also covers fields that are present but null.
    product_name = order.get("product_name") or "N/D"
    st.header(f"{product_name} — #{id_ordine}")
    st.markdown(f"**Stato:** {emoji} {nome_stato}")

    # Metrics
    col1, col2, col3, col4 = st.columns(4)
    col1.metric("Cliente", order.get("nome_cliente") or "N/D")
    col2.metric("Compilatore", order.get("nome_compilatore") or "N/D")
    col3.metric("Preset", order.get("preset_esposizione") or "N/D")
    data_ordine = order.get("data_ordine")
    # Only the first 10 characters are shown (presumably the YYYY-MM-DD part).
    col4.metric("Data", data_ordine[:10] if data_ordine else "N/D")

    # Notes / log: expanded and shown as an error when the order is in status 9.
    note = order.get("note")
    if note:
        with st.expander("📝 Note / Log", expanded=(stato == 9)):
            if stato == 9:
                st.error(note)
            else:
                st.info(note)

    # Ingredients
    st.subheader("Ingredienti")
    ingredients = order.get("ingredients", [])
    if ingredients:
        df = pd.DataFrame(ingredients)
        # Source column -> displayed header, in display order.
        column_labels = {
            "inci": "INCI",
            "cas": "CAS",
            "percentage": "Percentuale (%)",
            "is_colorante": "Colorante",
            "skip_tox": "Skip Tox",
        }
        display_cols = [name for name in column_labels if name in df.columns]
        df_display = df[display_cols].rename(columns=column_labels)
        # Replace the boolean flags with display markers.
        for col_name in ["Colorante", "Skip Tox"]:
            if col_name in df_display.columns:
                df_display[col_name] = df_display[col_name].map({True: "", False: "-"})
        st.dataframe(df_display, width="stretch", hide_index=True)
    else:
        st.info("Nessun ingrediente disponibile")
    st.divider()

    # Action buttons
    st.subheader("Azioni")
    btn_cols = st.columns(5)

    # 1. Refresh: a rerun reloads the detail from the API.
    with btn_cols[0]:
        if st.button("🔄 Aggiorna", key="refresh_btn", width="stretch"):
            st.rerun()

    # 2. Retry (enabled only in the error status)
    with btn_cols[1]:
        if stato == 9:
            if st.button("🔁 Retry", key="retry_btn", type="primary", width="stretch"):
                with st.spinner("Retry in corso..."):
                    result = api_retry_order(id_ordine)
                if result.get("success"):
                    st.success(result.get("message", "Retry avviato"))
                    # The retry changes the order status, so drop the cached
                    # list; otherwise the list view keeps showing the old one.
                    st.session_state.orders_cache = None
                    st.rerun()
                else:
                    st.error(result.get("error") or result.get("detail", "Errore retry"))
        else:
            st.button("🔁 Retry", key="retry_btn_disabled", disabled=True, width="stretch")

    # 3. Excel download: first click fetches the file, then a save button appears.
    with btn_cols[2]:
        if has_project:
            excel_data = None
            if st.button("📊 Scarica Excel", key="excel_prep_btn", width="stretch"):
                with st.spinner("Generazione Excel..."):
                    excel_data = download_excel(id_ordine)
            if excel_data:
                st.download_button(
                    label="💾 Salva Excel",
                    data=excel_data,
                    file_name=f"progetto_{id_ordine}.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                    key="excel_download",
                )
        else:
            st.button("📊 Scarica Excel", key="excel_disabled", disabled=True,
                      width="stretch", help="Progetto non ancora disponibile")

    # 4. PDF sources download: same two-step flow as the Excel export.
    with btn_cols[3]:
        if has_project:
            if st.button("📄 Scarica Fonti PDF", key="pdf_prep_btn", width="stretch"):
                with st.spinner("Generazione PDF fonti (può richiedere tempo)..."):
                    zip_data = download_sources(id_ordine)
                if zip_data:
                    st.download_button(
                        label="💾 Salva ZIP",
                        data=zip_data,
                        file_name=f"fonti_ordine_{id_ordine}.zip",
                        mime="application/zip",
                        key="pdf_download",
                    )
        else:
            st.button("📄 Scarica Fonti PDF", key="pdf_disabled", disabled=True,
                      width="stretch", help="Progetto non ancora disponibile")

    # 5. Delete: the first click only arms the confirmation kept in session state.
    with btn_cols[4]:
        if not st.session_state.confirm_delete:
            if st.button("🗑️ Elimina", key="delete_btn", width="stretch"):
                st.session_state.confirm_delete = True
                st.rerun()
        else:
            st.warning("Confermi l'eliminazione?")
            sub_cols = st.columns(2)
            with sub_cols[0]:
                if st.button("✅ Conferma", key="confirm_yes", type="primary", width="stretch"):
                    with st.spinner("Eliminazione in corso..."):
                        result = api_delete_order(id_ordine)
                    if result.get("success"):
                        st.success("Ordine eliminato")
                        # Go back to the list and force it to reload.
                        st.session_state.selected_order_id = None
                        st.session_state.confirm_delete = False
                        st.session_state.orders_cache = None
                        st.rerun()
                    else:
                        st.error(result.get("detail") or result.get("error", "Errore eliminazione"))
            with sub_cols[1]:
                if st.button("❌ Annulla", key="confirm_no", width="stretch"):
                    st.session_state.confirm_delete = False
                    st.rerun()
# ---------------------------------------------------------------------------
# List View
# ---------------------------------------------------------------------------
def _parse_order_date(raw):
    """Return the calendar date of an ISO-format order timestamp, or None if unparsable."""
    if not raw:
        return None
    try:
        return datetime.fromisoformat(raw).date()
    except (TypeError, ValueError):
        return None


def show_orders_list():
    """Render the order list together with its date, client and status filters.

    Orders are loaded once through ``fetch_orders`` and kept in
    ``st.session_state.orders_cache`` until the refresh button (or another
    view) resets it to ``None``. Clicking an order number stores its id in
    ``st.session_state.selected_order_id`` and reruns the script.
    """
    # Filters
    st.subheader("Filtri")
    filter_cols = st.columns([2, 2, 2, 1])
    with filter_cols[0]:
        date_range = st.date_input(
            "Intervallo date",
            value=[],
            key="date_filter",
            help="Filtra per data ordine",
        )
    with filter_cols[1]:
        client_search = st.text_input("Cerca cliente", key="client_filter",
                                      placeholder="Nome cliente...")
    with filter_cols[2]:
        status_options = [f"{v[1]} {v[0]}" for v in STATUS_MAP.values()]
        selected_statuses = st.multiselect("Stato", status_options, key="status_filter")
    with filter_cols[3]:
        st.write("")  # spacer to line the button up with the inputs
        st.write("")
        if st.button("🔄 Aggiorna", key="refresh_list", width="stretch"):
            st.session_state.orders_cache = None
            st.rerun()
    st.divider()

    # Load orders (only when the cache is empty)
    if st.session_state.orders_cache is None:
        with st.spinner("Caricamento ordini..."):
            st.session_state.orders_cache = fetch_orders()
    orders = st.session_state.orders_cache
    if not orders:
        st.info("Nessun ordine trovato")
        return

    # Apply filters
    filtered = orders

    # Date filter: active only once both ends of the range have been picked.
    if date_range and len(date_range) == 2:
        start_date, end_date = date_range
        in_range = []
        for o in filtered:
            # Orders with a missing or malformed date are left out of a
            # date-filtered view instead of crashing the page.
            order_date = _parse_order_date(o.get("data_ordine"))
            if order_date is not None and start_date <= order_date <= end_date:
                in_range.append(o)
        filtered = in_range

    # Client filter: case-insensitive substring match.
    if client_search:
        search_lower = client_search.lower()
        filtered = [
            o for o in filtered
            if o.get("nome_cliente") and search_lower in o["nome_cliente"].lower()
        ]

    # Status filter: translate the selected labels back to status codes.
    if selected_statuses:
        chosen_labels = set(selected_statuses)
        selected_ids = {
            code for code, (name, emoji) in STATUS_MAP.items()
            if f"{emoji} {name}" in chosen_labels
        }
        filtered = [o for o in filtered if o.get("stato_ordine") in selected_ids]

    if not filtered:
        st.warning("Nessun ordine corrisponde ai filtri selezionati")
        return
    st.caption(f"{len(filtered)} ordini trovati")

    # Show the list: one row per order, with the id rendered as a button.
    for order in filtered:
        id_ord = order.get("id_ordine")
        product = order.get("product_name", "N/D") or "N/D"
        client = order.get("nome_cliente", "N/D") or "N/D"
        stato = order.get("stato_ordine", 0)
        data = order.get("data_ordine", "")
        note = order.get("note", "")
        data_display = data[:10] if data else "N/D"
        stato_display = status_label(stato)
        # Truncate long notes to a 60-character preview.
        note_preview = (note[:60] + "...") if note and len(note) > 60 else (note or "")
        col_btn, col_info = st.columns([1, 5])
        with col_btn:
            if st.button(f"#{id_ord}", key=f"order_{id_ord}", width="stretch"):
                st.session_state.selected_order_id = id_ord
                st.session_state.confirm_delete = False
                st.rerun()
        with col_info:
            info_cols = st.columns([3, 2, 1.5, 1.5, 3])
            info_cols[0].write(f"**{product}**")
            info_cols[1].write(client)
            info_cols[2].write(data_display)
            info_cols[3].write(stato_display)
            if note_preview:
                info_cols[4].caption(note_preview)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
# Route to the list when no order is selected, otherwise to that order's detail.
if st.session_state.selected_order_id is None:
    show_orders_list()
else:
    show_order_detail(st.session_state.selected_order_id)