"""Database helpers: MongoDB client/collection access and PostgreSQL queries."""
import os
from contextlib import closing
from typing import Optional
from urllib.parse import quote_plus

import psycopg2
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.collection import Collection

from pif_compiler.functions.common_log import get_logger
|
|
|
|
# Module-level setup: create the module logger, then call load_dotenv() so the
# os.getenv() lookups in the functions below can see values from a .env file.
logger = get_logger()
load_dotenv()
|
|
|
|
def get_client():
    """Build a MongoClient from the ADMIN_USER / ADMIN_PASSWORD / MONGO_HOST /
    MONGO_PORT environment variables.

    User name and password are percent-encoded before being placed in the
    connection URI. Authentication uses the ``admin`` database and server
    selection times out after 5000 ms.

    NOTE(review): a new MongoClient is created on every call, and a missing
    ADMIN_USER / ADMIN_PASSWORD makes ``quote_plus(None)`` raise TypeError.
    """
    host = os.getenv("MONGO_HOST")
    port = os.getenv("MONGO_PORT")
    raw_user = os.getenv("ADMIN_USER")
    raw_password = os.getenv("ADMIN_PASSWORD")

    # Same encoding order as before: password first, then user.
    password = quote_plus(raw_password)
    user = quote_plus(raw_user)

    uri = f"mongodb://{user}:{password}@{host}:{port}/?authSource=admin"
    client = MongoClient(uri, serverSelectionTimeoutMS=5000)

    if client:
        return client
    return None
|
|
|
|
def db_connect(
    db_name: str = 'toxinfo',
    collection_name: str = 'substance_index',
) -> "Optional[Collection]":
    """Return a MongoDB collection handle, or None when it cannot be obtained.

    Args:
        db_name: Name of the database to open.
        collection_name: Name of the collection inside that database.

    Returns:
        The pymongo collection object, or None if building the client or
        resolving the database/collection raised (the error is logged).
    """
    try:
        client = get_client()
        db = client.get_database(name=db_name)
        collection = db.get_collection(collection_name)
    except Exception as e:
        logger.error(f"Error connecting to MongoDB: {e}")
        return None

    return collection
|
|
|
|
def postgres_connect():
    """Open and return a new psycopg2 connection using DATABASE_URL.

    The caller owns the returned connection and must close it.

    The previous ``with psycopg2.connect(...) as conn: return conn`` form was
    misleading: psycopg2's connection context manager only ends the
    transaction on exit and never closes the connection, so the ``with`` had
    no useful effect here.
    """
    database_url = os.getenv("DATABASE_URL")
    return psycopg2.connect(database_url)
|
|
|
|
def insert_compilatore(nome_compilatore):
    """Insert a row into ``compilatori`` with the given name.

    Errors are logged and swallowed; nothing is returned. The connection is
    closed even when the statement or the commit fails.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO compilatori (nome_compilatore) VALUES (%s)",
                (nome_compilatore,),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error: {e}")
|
|
|
|
def aggiorna_stato_ordine(id_ordine, nuovo_stato):
    """Set ``stato_ordine`` on the order identified by ``id_ordine``.

    ``nuovo_stato`` is converted with ``int()`` before being sent to the
    database. Errors are logged and swallowed; the connection is always
    closed.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE ordini SET stato_ordine = %s WHERE id_ordine = %s",
                (int(nuovo_stato), id_ordine),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error updating stato ordine {id_ordine}: {e}")
|
|
|
|
def upsert_ingrediente(cas, mongo_id, dap=False, cosing=False, tox=False):
    """Insert or update an ingredient row in the PostgreSQL ``ingredienti`` table.

    On a conflict on ``cas`` the existing row's ``mongo_id``, ``dap``,
    ``cosing`` and ``tox`` columns are overwritten with the supplied values.

    Returns:
        The row ``id`` returned by the statement, or None on error (logged).
        The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute("""
                INSERT INTO ingredienti (cas, mongo_id, dap, cosing, tox)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (cas) DO UPDATE SET
                    mongo_id = EXCLUDED.mongo_id,
                    dap = EXCLUDED.dap,
                    cosing = EXCLUDED.cosing,
                    tox = EXCLUDED.tox
                RETURNING id;
            """, (cas, mongo_id, dap, cosing, tox))
            result = cur.fetchone()
            conn.commit()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore upsert ingrediente {cas}: {e}")
        return None
|
|
|
|
def get_ingrediente_by_cas(cas):
    """Fetch one ingredient row from PostgreSQL ``ingredienti`` by CAS.

    Returns:
        The row as returned by ``fetchone()`` with columns
        (id, cas, mongo_id, dap, cosing, tox), or None when no row matches or
        an error occurs (logged). The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT id, cas, mongo_id, dap, cosing, tox FROM ingredienti WHERE cas = %s",
                (cas,),
            )
            result = cur.fetchone()
        return result
    except Exception as e:
        logger.error(f"Errore recupero ingrediente {cas}: {e}")
        return None
|
|
|
|
def get_all_ingredienti():
    """Fetch every row of PostgreSQL ``ingredienti``, newest ``created_at`` first.

    Returns:
        A list of rows (id, cas, mongo_id, dap, cosing, tox, created_at);
        an empty list when the table is empty or an error occurs (logged).
        The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute("SELECT id, cas, mongo_id, dap, cosing, tox, created_at FROM ingredienti ORDER BY created_at DESC")
            results = cur.fetchall()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero ingredienti: {e}")
        return []
|
|
|
|
def upsert_cliente(nome_cliente):
    """Insert a customer if missing, then return its ``id_cliente``.

    The INSERT uses ``ON CONFLICT (nome_cliente) DO NOTHING`` and is
    committed before the id is read back with a SELECT.

    Returns:
        The ``id_cliente`` value, or None if the row cannot be read back or
        an error occurs (logged). The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO clienti (nome_cliente) VALUES (%s) ON CONFLICT (nome_cliente) DO NOTHING",
                (nome_cliente,),
            )
            conn.commit()
            cur.execute("SELECT id_cliente FROM clienti WHERE nome_cliente = %s", (nome_cliente,))
            result = cur.fetchone()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore upsert cliente {nome_cliente}: {e}")
        return None
|
|
|
|
def upsert_compilatore(nome_compilatore):
    """Insert a compiler if missing, then return its ``id_compilatore``.

    The INSERT uses ``ON CONFLICT (nome_compilatore) DO NOTHING`` and is
    committed before the id is read back with a SELECT.

    Returns:
        The ``id_compilatore`` value, or None if the row cannot be read back
        or an error occurs (logged). The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO compilatori (nome_compilatore) VALUES (%s) ON CONFLICT (nome_compilatore) DO NOTHING",
                (nome_compilatore,),
            )
            conn.commit()
            cur.execute("SELECT id_compilatore FROM compilatori WHERE nome_compilatore = %s", (nome_compilatore,))
            result = cur.fetchone()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore upsert compilatore {nome_compilatore}: {e}")
        return None
|
|
|
|
def get_all_clienti():
    """Fetch all customers from ``clienti`` ordered by name.

    Returns:
        A list of (id_cliente, nome_cliente) rows; an empty list when there
        are none or an error occurs (logged). The connection is closed on
        every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute("SELECT id_cliente, nome_cliente FROM clienti ORDER BY nome_cliente")
            results = cur.fetchall()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero clienti: {e}")
        return []
|
|
|
|
def get_all_compilatori():
    """Fetch all compilers from ``compilatori`` ordered by name.

    Returns:
        A list of (id_compilatore, nome_compilatore) rows; an empty list when
        there are none or an error occurs (logged). The connection is closed
        on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute("SELECT id_compilatore, nome_compilatore FROM compilatori ORDER BY nome_compilatore")
            results = cur.fetchall()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero compilatori: {e}")
        return []
|
|
|
|
|
|
def delete_cliente(nome_cliente: str) -> Optional[bool]:
    """Delete a customer by name unless orders still reference it.

    Returns:
        None  - the customer has at least one linked order and was not deleted.
        True  - a row was deleted.
        False - no row matched, or an error occurred (logged).

    The connection is closed on every path, including the early return.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT COUNT(*) FROM ordini o JOIN clienti c ON o.id_cliente = c.id_cliente WHERE c.nome_cliente = %s",
                (nome_cliente,),
            )
            count = cur.fetchone()[0]
            if count > 0:
                return None  # signal: customer has orders, cannot be deleted

            cur.execute("DELETE FROM clienti WHERE nome_cliente = %s RETURNING id_cliente", (nome_cliente,))
            deleted = cur.fetchone()
            conn.commit()
        return deleted is not None
    except Exception as e:
        logger.error(f"Errore eliminazione cliente '{nome_cliente}': {e}")
        return False
|
|
|
|
def log_ricerche(cas, target, esito):
    """Record one search in ``logs.search_history``.

    Args:
        cas: Value stored in ``cas_ricercato``.
        target: Value stored in ``target``.
        esito: Value stored in ``esito``.

    Errors are logged and swallowed; nothing is returned. The connection is
    closed even when the statement or the commit fails.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO logs.search_history (cas_ricercato, target, esito) VALUES (%s, %s, %s)",
                (cas, target, esito),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error: {e}")
|
|
|
|
def insert_ordine(uuid_ordine, id_cliente=None, id_compilatore=None):
    """Insert a new order into ``ordini`` and return its ``id_ordine``.

    The order is stamped with the current local time and the
    ``StatoOrdine.RICEVUTO`` state.

    Returns:
        The new ``id_ordine``, or None on error (logged). The connection is
        closed on every path.
    """
    # Imported here as in the original (presumably to avoid an import cycle
    # with the models module; verify before moving to module level).
    from datetime import datetime as dt
    from pif_compiler.classes.models import StatoOrdine

    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                """INSERT INTO ordini (uuid_ordine, id_cliente, id_compilatore, data_ordine, stato_ordine)
                   VALUES (%s, %s, %s, %s, %s) RETURNING id_ordine;""",
                # NOTE(review): dt.now() is timezone-naive; confirm this
                # matches the data_ordine column type.
                (uuid_ordine, id_cliente, id_compilatore, dt.now(), int(StatoOrdine.RICEVUTO)),
            )
            result = cur.fetchone()
            conn.commit()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore inserimento ordine: {e}")
        return None
|
|
|
|
def get_oldest_pending_order():
    """Fetch the oldest order whose ``stato_ordine`` is ``StatoOrdine.RICEVUTO``.

    Returns:
        One row (id_ordine, id_cliente, id_compilatore, uuid_ordine,
        uuid_progetto, data_ordine, stato_ordine, note), or None when no such
        order exists or an error occurs (logged). The connection is closed on
        every path.
    """
    from pif_compiler.classes.models import StatoOrdine

    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                """SELECT id_ordine, id_cliente, id_compilatore, uuid_ordine,
                          uuid_progetto, data_ordine, stato_ordine, note
                   FROM ordini
                   WHERE stato_ordine = %s
                   ORDER BY data_ordine ASC
                   LIMIT 1""",
                (int(StatoOrdine.RICEVUTO),),
            )
            result = cur.fetchone()
        return result
    except Exception as e:
        logger.error(f"Errore recupero ordine pendente: {e}")
        return None
|
|
|
|
def update_ordine_compilatore(id_ordine, id_compilatore):
    """Set ``id_compilatore`` on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the connection is always closed.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE ordini SET id_compilatore = %s WHERE id_ordine = %s",
                (id_compilatore, id_ordine),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Errore aggiornamento compilatore ordine {id_ordine}: {e}")
|
|
|
|
|
|
def update_ordine_cliente(id_ordine, id_cliente):
    """Set ``id_cliente`` on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the connection is always closed.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE ordini SET id_cliente = %s WHERE id_ordine = %s",
                (id_cliente, id_ordine),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Errore aggiornamento cliente ordine {id_ordine}: {e}")
|
|
|
|
def update_ordine_progetto(id_ordine, uuid_progetto):
    """Set ``uuid_progetto`` on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the connection is always closed.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE ordini SET uuid_progetto = %s WHERE id_ordine = %s",
                (uuid_progetto, id_ordine),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Errore aggiornamento progetto ordine {id_ordine}: {e}")
|
|
|
|
def update_ordine_note(id_ordine, note):
    """Set the ``note`` column on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the connection is always closed.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE ordini SET note = %s WHERE id_ordine = %s",
                (note, id_ordine),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Errore aggiornamento note ordine {id_ordine}: {e}")
|
|
|
|
def get_ordine_by_id(id_ordine):
    """Fetch one order row by ``id_ordine``.

    Returns:
        One row (id_ordine, id_cliente, id_compilatore, uuid_ordine,
        uuid_progetto, data_ordine, stato_ordine, note), or None when no row
        matches or an error occurs (logged). The connection is closed on
        every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                """SELECT id_ordine, id_cliente, id_compilatore, uuid_ordine,
                          uuid_progetto, data_ordine, stato_ordine, note
                   FROM ordini WHERE id_ordine = %s""",
                (id_ordine,),
            )
            result = cur.fetchone()
        return result
    except Exception as e:
        logger.error(f"Errore recupero ordine {id_ordine}: {e}")
        return None
|
|
|
|
def reset_ordine_per_retry(id_ordine):
    """Move an order from ``StatoOrdine.ERRORE`` back to ``StatoOrdine.RICEVUTO``.

    Also clears ``note`` and ``uuid_progetto``. The UPDATE only matches when
    the order is currently in the ERRORE state.

    Returns:
        The ``id_ordine`` of the updated row, or None when nothing matched or
        an error occurs (logged). The connection is closed on every path.
    """
    from pif_compiler.classes.models import StatoOrdine

    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                """UPDATE ordini
                   SET stato_ordine = %s, note = NULL, uuid_progetto = NULL
                   WHERE id_ordine = %s AND stato_ordine = %s
                   RETURNING id_ordine;""",
                (int(StatoOrdine.RICEVUTO), id_ordine, int(StatoOrdine.ERRORE)),
            )
            result = cur.fetchone()
            conn.commit()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore reset ordine {id_ordine}: {e}")
        return None
|
|
|
|
def get_preset_id_by_name(preset_name):
    """Look up ``id_preset`` in ``tipi_prodotti`` by ``preset_name``.

    Returns:
        The ``id_preset`` value, or None when no row matches or an error
        occurs (logged). The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT id_preset FROM tipi_prodotti WHERE preset_name = %s",
                (preset_name,),
            )
            result = cur.fetchone()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore recupero id preset '{preset_name}': {e}")
        return None
|
|
|
|
def insert_progetto(mongo_id, id_preset):
    """Insert a new project into ``progetti`` and return its ``id``.

    Args:
        mongo_id: Value stored in the ``mongo_id`` column.
        id_preset: Value stored in the ``preset_tipo_prodotto`` column.

    Returns:
        The new row ``id``, or None on error (logged). The connection is
        closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                """INSERT INTO progetti (mongo_id, preset_tipo_prodotto)
                   VALUES (%s, %s) RETURNING id;""",
                (mongo_id, id_preset),
            )
            result = cur.fetchone()
            conn.commit()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore inserimento progetto: {e}")
        return None
|
|
|
|
def insert_ingredient_lineage(id_progetto, id_ingrediente):
    """Insert a project-ingredient link into ``ingredients_lineage``.

    Errors are logged and swallowed; nothing is returned. The connection is
    closed even when the statement or the commit fails.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute(
                """INSERT INTO ingredients_lineage (id_progetto, id_ingrediente)
                   VALUES (%s, %s);""",
                (id_progetto, id_ingrediente),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Errore inserimento lineage progetto={id_progetto}, ingrediente={id_ingrediente}: {e}")
|
|
|
|
def get_all_ordini():
    """Fetch all orders, LEFT JOINed to clienti, compilatori and stati_ordini.

    Rows are ordered by ``data_ordine`` descending and contain
    (id_ordine, uuid_ordine, uuid_progetto, data_ordine, stato_ordine, note,
    nome_cliente, nome_compilatore, nome_stato).

    Returns:
        A list of rows; an empty list when there are none or an error occurs
        (logged). The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute("""
                SELECT o.id_ordine, o.uuid_ordine, o.uuid_progetto, o.data_ordine,
                       o.stato_ordine, o.note, c.nome_cliente, comp.nome_compilatore, s.nome_stato
                FROM ordini o
                LEFT JOIN clienti c ON o.id_cliente = c.id_cliente
                LEFT JOIN compilatori comp ON o.id_compilatore = comp.id_compilatore
                LEFT JOIN stati_ordini s ON o.stato_ordine = s.id_stato
                ORDER BY o.data_ordine DESC
            """)
            results = cur.fetchall()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero ordini: {e}")
        return []
|
|
|
|
|
|
def delete_ordine(id_ordine):
    """Delete an order and its related data from PostgreSQL and MongoDB.

    Steps:
      1. Load the order row; return False if it does not exist.
      2. In PostgreSQL, if the order has a ``uuid_progetto`` with a matching
         ``progetti`` row, delete its ``ingredients_lineage`` rows and the
         project, then delete the order. All in one commit.
      3. Best-effort removal of the MongoDB ``orders`` and ``projects``
         documents; failures there only produce a warning.

    Returns:
        True when the PostgreSQL delete was committed, False when the order
        was not found or an error occurred (logged).
    """
    try:
        row = get_ordine_by_id(id_ordine)
        if not row:
            logger.warning(f"Ordine {id_ordine} non trovato per eliminazione")
            return False

        # Column positions follow the SELECT in get_ordine_by_id.
        uuid_ordine = row[3]
        uuid_progetto = row[4]

        # closing() guarantees conn.close() even if one of the deletes or the
        # commit raises; the connection is released before the Mongo phase.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            if uuid_progetto:
                cur.execute("SELECT id FROM progetti WHERE mongo_id = %s", (uuid_progetto,))
                prog_row = cur.fetchone()
                if prog_row:
                    cur.execute("DELETE FROM ingredients_lineage WHERE id_progetto = %s", (prog_row[0],))
                    cur.execute("DELETE FROM progetti WHERE id = %s", (prog_row[0],))

            cur.execute("DELETE FROM ordini WHERE id_ordine = %s", (id_ordine,))
            conn.commit()

        from bson import ObjectId

        # NOTE(review): the warnings below also fire when db_connect()
        # returned None (AttributeError) or the id is not a valid ObjectId,
        # not only when the document is missing.
        orders_col = db_connect(collection_name='orders')
        if uuid_ordine:
            try:
                orders_col.delete_one({"_id": ObjectId(uuid_ordine)})
            except Exception:
                logger.warning(f"Documento MongoDB ordine non trovato: {uuid_ordine}")

        if uuid_progetto:
            projects_col = db_connect(collection_name='projects')
            try:
                projects_col.delete_one({"_id": ObjectId(uuid_progetto)})
            except Exception:
                logger.warning(f"Documento MongoDB progetto non trovato: {uuid_progetto}")

        logger.info(f"Ordine {id_ordine} eliminato completamente")
        return True
    except Exception as e:
        logger.error(f"Errore eliminazione ordine {id_ordine}: {e}")
        return False
|
|
|
|
|
|
def get_ingrediente_id_by_cas(cas):
    """Look up the PostgreSQL ``id`` of an ingredient by CAS.

    Returns:
        The ``id`` value, or None when no row matches or an error occurs
        (logged). The connection is closed on every path.
    """
    try:
        # closing() guarantees conn.close() on every exit path.
        with closing(postgres_connect()) as conn, conn.cursor() as cur:
            cur.execute("SELECT id FROM ingredienti WHERE cas = %s", (cas,))
            result = cur.fetchone()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore recupero id ingrediente {cas}: {e}")
        return None
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: writes one sample row to logs.search_history, so it
    # needs a reachable PostgreSQL configured through DATABASE_URL.
    log_ricerche("123-45-6", "ECHA", True)