cosmoguard-bd/src/pif_compiler/functions/db_utils.py
2026-02-22 19:44:55 +01:00

439 lines
No EOL
16 KiB
Python

import os
from urllib.parse import quote_plus

import psycopg2
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.collection import Collection

from pif_compiler.functions.common_log import get_logger
# Module-level setup, executed once at import time: create the logger used by
# every helper in this module, then load variables from a .env file into the
# process environment so the os.getenv() calls below can see them.
# NOTE(review): get_logger() runs before load_dotenv(); if the logger reads
# its configuration from the environment, .env values are not visible to it
# yet - confirm this ordering is intended.
logger = get_logger()
load_dotenv()
def get_client():
    """Build a ``MongoClient`` from the connection settings in the environment.

    Reads ``ADMIN_USER``, ``ADMIN_PASSWORD``, ``MONGO_HOST`` and ``MONGO_PORT``
    and authenticates against the ``admin`` database (``authSource=admin``).

    Returns:
        A ``MongoClient`` configured with a 5000 ms server-selection timeout.

    Raises:
        ValueError: If one or more of the required environment variables is
            not set. The message names the missing variables.
    """
    required = ("ADMIN_USER", "ADMIN_PASSWORD", "MONGO_HOST", "MONGO_PORT")
    settings = {name: os.getenv(name) for name in required}

    # Fail early with an explicit message: quote_plus(None) would otherwise
    # raise an opaque TypeError that does not say which variable is missing.
    missing = [name for name, value in settings.items() if value is None]
    if missing:
        raise ValueError(
            "Missing MongoDB environment variable(s): " + ", ".join(missing)
        )

    # Credentials may contain characters that are reserved inside a URI.
    user = quote_plus(settings["ADMIN_USER"])
    password = quote_plus(settings["ADMIN_PASSWORD"])
    host = settings["MONGO_HOST"]
    port = settings["MONGO_PORT"]

    return MongoClient(
        f"mongodb://{user}:{password}@{host}:{port}/?authSource=admin",
        serverSelectionTimeoutMS=5000,
    )
def db_connect(db_name: str = 'toxinfo', collection_name: str = 'substance_index') -> "Collection | None":
    """Return a handle to a MongoDB collection.

    Args:
        db_name: Name of the database to open.
        collection_name: Name of the collection inside ``db_name``.

    Returns:
        The requested collection object, or ``None`` if building the client
        or resolving the database/collection raised. The error is logged.
    """
    try:
        client = get_client()
        database = client.get_database(name=db_name)
        collection = database.get_collection(collection_name)
    except Exception as e:
        logger.error(f"Error connecting to MongoDB: {e}")
        return None
    return collection
def postgres_connect():
    """Open a new PostgreSQL connection from the ``DATABASE_URL`` variable.

    Returns:
        An open psycopg2 connection. The caller is responsible for committing
        and for closing it.
    """
    # Deliberately not wrapped in ``with``: in psycopg2 the connection context
    # manager only ends the current transaction on exit, it does not close
    # the connection, so wrapping the return in it was misleading.
    return psycopg2.connect(os.getenv("DATABASE_URL"))
def insert_compilatore(nome_compilatore):
    """Insert a new row into the ``compilatori`` table.

    Errors are logged and swallowed; the function returns ``None`` either way.

    Args:
        nome_compilatore: Value stored in the ``nome_compilatore`` column.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO compilatori (nome_compilatore) VALUES (%s)",
                    (nome_compilatore,),
                )
            conn.commit()
        finally:
            # Close even when the statement fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Error: {e}")
def aggiorna_stato_ordine(id_ordine, nuovo_stato):
    """Set ``stato_ordine`` on the order identified by ``id_ordine``.

    Errors (including a ``nuovo_stato`` that cannot be converted with
    ``int()``) are logged and swallowed.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.
        nuovo_stato: New state; converted with ``int()`` before being stored.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE ordini SET stato_ordine = %s WHERE id_ordine = %s",
                    (int(nuovo_stato), id_ordine),
                )
            conn.commit()
        finally:
            # Close even when the update fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Error updating stato ordine {id_ordine}: {e}")
def upsert_ingrediente(cas, mongo_id, dap=False, cosing=False, tox=False):
    """Insert or update an ingredient in the PostgreSQL ``ingredienti`` table.

    The row is keyed on ``cas``; on conflict ``mongo_id``, ``dap``, ``cosing``
    and ``tox`` are overwritten with the supplied values.

    Args:
        cas: Value of the ``cas`` column (the conflict target).
        mongo_id: Value of the ``mongo_id`` column.
        dap: Value of the ``dap`` column.
        cosing: Value of the ``cosing`` column.
        tox: Value of the ``tox`` column.

    Returns:
        The ``id`` of the inserted/updated row, or ``None`` if no row was
        returned or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO ingredienti (cas, mongo_id, dap, cosing, tox)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (cas) DO UPDATE SET
                    mongo_id = EXCLUDED.mongo_id,
                    dap = EXCLUDED.dap,
                    cosing = EXCLUDED.cosing,
                    tox = EXCLUDED.tox
                    RETURNING id;
                    """,
                    (cas, mongo_id, dap, cosing, tox),
                )
                result = cur.fetchone()
            conn.commit()
        finally:
            # Close even when the statement fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore upsert ingrediente {cas}: {e}")
        return None
def get_ingrediente_by_cas(cas):
    """Fetch one ingredient from the PostgreSQL ``ingredienti`` table by CAS.

    Args:
        cas: Value matched against the ``cas`` column.

    Returns:
        The row as returned by the cursor, with columns in the order
        ``(id, cas, mongo_id, dap, cosing, tox)``, or ``None`` if no row
        matches or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT id, cas, mongo_id, dap, cosing, tox FROM ingredienti WHERE cas = %s",
                    (cas,),
                )
                result = cur.fetchone()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return result
    except Exception as e:
        logger.error(f"Errore recupero ingrediente {cas}: {e}")
        return None
def get_all_ingredienti():
    """Fetch every row of the PostgreSQL ``ingredienti`` table, newest first.

    Returns:
        A list of rows with columns in the order
        ``(id, cas, mongo_id, dap, cosing, tox, created_at)``, sorted by
        ``created_at`` descending. An empty list is returned when the table
        is empty or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT id, cas, mongo_id, dap, cosing, tox, created_at FROM ingredienti ORDER BY created_at DESC")
                results = cur.fetchall()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero ingredienti: {e}")
        return []
def upsert_cliente(nome_cliente):
    """Insert a customer if it does not exist yet and return its id.

    Args:
        nome_cliente: Value of ``clienti.nome_cliente`` (the conflict target).

    Returns:
        The ``id_cliente`` of the row with that name, or ``None`` if it could
        not be read back or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO clienti (nome_cliente) VALUES (%s) ON CONFLICT (nome_cliente) DO NOTHING",
                    (nome_cliente,),
                )
                conn.commit()
                # DO NOTHING returns no row on conflict, so the id is always
                # read back with a separate SELECT.
                cur.execute("SELECT id_cliente FROM clienti WHERE nome_cliente = %s", (nome_cliente,))
                result = cur.fetchone()
        finally:
            # Close even when a statement fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore upsert cliente {nome_cliente}: {e}")
        return None
def upsert_compilatore(nome_compilatore):
    """Insert a compiler if it does not exist yet and return its id.

    Args:
        nome_compilatore: Value of ``compilatori.nome_compilatore`` (the
            conflict target).

    Returns:
        The ``id_compilatore`` of the row with that name, or ``None`` if it
        could not be read back or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO compilatori (nome_compilatore) VALUES (%s) ON CONFLICT (nome_compilatore) DO NOTHING",
                    (nome_compilatore,),
                )
                conn.commit()
                # DO NOTHING returns no row on conflict, so the id is always
                # read back with a separate SELECT.
                cur.execute("SELECT id_compilatore FROM compilatori WHERE nome_compilatore = %s", (nome_compilatore,))
                result = cur.fetchone()
        finally:
            # Close even when a statement fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore upsert compilatore {nome_compilatore}: {e}")
        return None
def get_all_clienti():
    """Fetch every customer from the ``clienti`` table, sorted by name.

    Returns:
        A list of ``(id_cliente, nome_cliente)`` rows ordered by
        ``nome_cliente``. An empty list is returned when the table is empty
        or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT id_cliente, nome_cliente FROM clienti ORDER BY nome_cliente")
                results = cur.fetchall()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero clienti: {e}")
        return []
def log_ricerche(cas, target, esito):
    """Record one search in the ``logs.search_history`` table.

    Errors are logged and swallowed; the function returns ``None`` either way.

    Args:
        cas: Value of the ``cas_ricercato`` column.
        target: Value of the ``target`` column.
        esito: Value of the ``esito`` column.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO logs.search_history (cas_ricercato, target, esito) VALUES (%s, %s, %s)",
                    (cas, target, esito),
                )
            conn.commit()
        finally:
            # Close even when the insert fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Error: {e}")
def insert_ordine(uuid_ordine, id_cliente=None):
    """Insert a new order into the ``ordini`` table.

    The order is stored with ``data_ordine`` set to the current local time
    (``datetime.now()``) and ``stato_ordine`` set to ``StatoOrdine.RICEVUTO``.

    Args:
        uuid_ordine: Value of the ``uuid_ordine`` column.
        id_cliente: Optional value of the ``id_cliente`` column.

    Returns:
        The new ``id_ordine``, or ``None`` if no row was returned or an error
        occurred (the error is logged).
    """
    from datetime import datetime as dt
    from pif_compiler.classes.models import StatoOrdine

    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO ordini (uuid_ordine, id_cliente, data_ordine, stato_ordine)
                    VALUES (%s, %s, %s, %s) RETURNING id_ordine;""",
                    (uuid_ordine, id_cliente, dt.now(), int(StatoOrdine.RICEVUTO)),
                )
                result = cur.fetchone()
            conn.commit()
        finally:
            # Close even when the insert fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore inserimento ordine: {e}")
        return None
def get_oldest_pending_order():
    """Fetch the oldest order whose ``stato_ordine`` is ``StatoOrdine.RICEVUTO``.

    Returns:
        The row with the smallest ``data_ordine`` among matching orders, with
        columns in the order ``(id_ordine, id_cliente, id_compilatore,
        uuid_ordine, uuid_progetto, data_ordine, stato_ordine, note)``, or
        ``None`` if there is no such order or an error occurred (the error is
        logged).
    """
    from pif_compiler.classes.models import StatoOrdine

    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id_ordine, id_cliente, id_compilatore, uuid_ordine,
                    uuid_progetto, data_ordine, stato_ordine, note
                    FROM ordini
                    WHERE stato_ordine = %s
                    ORDER BY data_ordine ASC
                    LIMIT 1""",
                    (int(StatoOrdine.RICEVUTO),),
                )
                result = cur.fetchone()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return result
    except Exception as e:
        logger.error(f"Errore recupero ordine pendente: {e}")
        return None
def update_ordine_cliente(id_ordine, id_cliente):
    """Set ``id_cliente`` on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the function returns ``None`` either way.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.
        id_cliente: New value of the ``id_cliente`` column.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE ordini SET id_cliente = %s WHERE id_ordine = %s",
                    (id_cliente, id_ordine),
                )
            conn.commit()
        finally:
            # Close even when the update fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Errore aggiornamento cliente ordine {id_ordine}: {e}")
def update_ordine_progetto(id_ordine, uuid_progetto):
    """Set ``uuid_progetto`` on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the function returns ``None`` either way.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.
        uuid_progetto: New value of the ``uuid_progetto`` column.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE ordini SET uuid_progetto = %s WHERE id_ordine = %s",
                    (uuid_progetto, id_ordine),
                )
            conn.commit()
        finally:
            # Close even when the update fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Errore aggiornamento progetto ordine {id_ordine}: {e}")
def update_ordine_note(id_ordine, note):
    """Set the ``note`` field on the order identified by ``id_ordine``.

    Errors are logged and swallowed; the function returns ``None`` either way.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.
        note: New value of the ``note`` column.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE ordini SET note = %s WHERE id_ordine = %s",
                    (note, id_ordine),
                )
            conn.commit()
        finally:
            # Close even when the update fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Errore aggiornamento note ordine {id_ordine}: {e}")
def get_ordine_by_id(id_ordine):
    """Fetch one order by ``id_ordine``.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.

    Returns:
        The row with columns in the order ``(id_ordine, id_cliente,
        id_compilatore, uuid_ordine, uuid_progetto, data_ordine,
        stato_ordine, note)``, or ``None`` if no row matches or an error
        occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id_ordine, id_cliente, id_compilatore, uuid_ordine,
                    uuid_progetto, data_ordine, stato_ordine, note
                    FROM ordini WHERE id_ordine = %s""",
                    (id_ordine,),
                )
                result = cur.fetchone()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return result
    except Exception as e:
        logger.error(f"Errore recupero ordine {id_ordine}: {e}")
        return None
def reset_ordine_per_retry(id_ordine):
    """Move an order from ``ERRORE`` back to ``RICEVUTO`` so it can be retried.

    The update only applies when the order is currently in
    ``StatoOrdine.ERRORE``; it also clears ``note`` and ``uuid_progetto``.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.

    Returns:
        The ``id_ordine`` of the reset order, or ``None`` if no row was
        updated (order missing or not in ``ERRORE``) or an error occurred
        (the error is logged).
    """
    from pif_compiler.classes.models import StatoOrdine

    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """UPDATE ordini
                    SET stato_ordine = %s, note = NULL, uuid_progetto = NULL
                    WHERE id_ordine = %s AND stato_ordine = %s
                    RETURNING id_ordine;""",
                    (int(StatoOrdine.RICEVUTO), id_ordine, int(StatoOrdine.ERRORE)),
                )
                result = cur.fetchone()
            conn.commit()
        finally:
            # Close even when the update fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore reset ordine {id_ordine}: {e}")
        return None
def get_preset_id_by_name(preset_name):
    """Look up ``id_preset`` in the ``tipi_prodotti`` table by preset name.

    Args:
        preset_name: Value matched against ``tipi_prodotti.preset_name``.

    Returns:
        The ``id_preset`` of the first matching row, or ``None`` if no row
        matches or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT id_preset FROM tipi_prodotti WHERE preset_name = %s",
                    (preset_name,),
                )
                result = cur.fetchone()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore recupero id preset '{preset_name}': {e}")
        return None
def insert_progetto(mongo_id, id_preset):
    """Insert a new project into the ``progetti`` table.

    Args:
        mongo_id: Value of the ``mongo_id`` column.
        id_preset: Value of the ``preset_tipo_prodotto`` column.

    Returns:
        The ``id`` of the new row, or ``None`` if no row was returned or an
        error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO progetti (mongo_id, preset_tipo_prodotto)
                    VALUES (%s, %s) RETURNING id;""",
                    (mongo_id, id_preset),
                )
                result = cur.fetchone()
            conn.commit()
        finally:
            # Close even when the insert fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore inserimento progetto: {e}")
        return None
def insert_ingredient_lineage(id_progetto, id_ingrediente):
    """Insert a project-ingredient link into the ``ingredients_lineage`` table.

    Errors are logged and swallowed; the function returns ``None`` either way.

    Args:
        id_progetto: Value of the ``id_progetto`` column.
        id_ingrediente: Value of the ``id_ingrediente`` column.
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO ingredients_lineage (id_progetto, id_ingrediente)
                    VALUES (%s, %s);""",
                    (id_progetto, id_ingrediente),
                )
            conn.commit()
        finally:
            # Close even when the insert fails so the connection is not leaked.
            conn.close()
    except Exception as e:
        logger.error(f"Errore inserimento lineage progetto={id_progetto}, ingrediente={id_ingrediente}: {e}")
def get_all_ordini():
    """Fetch every order joined with its customer, compiler and state name.

    The joins to ``clienti``, ``compilatori`` and ``stati_ordini`` are LEFT
    JOINs, so orders without a matching row still appear with ``NULL`` in the
    joined columns.

    Returns:
        A list of rows with columns in the order ``(id_ordine, uuid_ordine,
        uuid_progetto, data_ordine, stato_ordine, note, nome_cliente,
        nome_compilatore, nome_stato)``, sorted by ``data_ordine``
        descending. An empty list is returned when there are no orders or an
        error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT o.id_ordine, o.uuid_ordine, o.uuid_progetto, o.data_ordine,
                    o.stato_ordine, o.note, c.nome_cliente, comp.nome_compilatore, s.nome_stato
                    FROM ordini o
                    LEFT JOIN clienti c ON o.id_cliente = c.id_cliente
                    LEFT JOIN compilatori comp ON o.id_compilatore = comp.id_compilatore
                    LEFT JOIN stati_ordini s ON o.stato_ordine = s.id_stato
                    ORDER BY o.data_ordine DESC
                    """
                )
                results = cur.fetchall()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return results if results else []
    except Exception as e:
        logger.error(f"Errore recupero ordini: {e}")
        return []
def delete_ordine(id_ordine):
    """Delete an order and its related data from PostgreSQL and MongoDB.

    PostgreSQL rows are removed first, in one transaction: the lineage rows
    and the project linked through ``uuid_progetto`` (if any), then the order
    itself. The MongoDB documents in the ``orders`` and ``projects``
    collections are removed afterwards on a best-effort basis: a failure
    there is logged as a warning and does not change the return value.

    Args:
        id_ordine: Value matched against ``ordini.id_ordine``.

    Returns:
        ``True`` once the PostgreSQL deletion has been committed. ``False``
        when ``get_ordine_by_id`` returns nothing (order missing, or the
        lookup itself failed) or when an unexpected error occurred (logged).
    """
    try:
        # Imported before anything is deleted so that a missing dependency
        # cannot fail the call after the PostgreSQL rows are already gone.
        from bson import ObjectId

        row = get_ordine_by_id(id_ordine)
        if not row:
            logger.warning(f"Ordine {id_ordine} non trovato per eliminazione")
            return False

        # Positions follow the column order of the SELECT in get_ordine_by_id.
        uuid_ordine = row[3]
        uuid_progetto = row[4]

        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                if uuid_progetto:
                    cur.execute("SELECT id FROM progetti WHERE mongo_id = %s", (uuid_progetto,))
                    prog_row = cur.fetchone()
                    if prog_row:
                        # Lineage rows reference the project, so they go first.
                        cur.execute("DELETE FROM ingredients_lineage WHERE id_progetto = %s", (prog_row[0],))
                        cur.execute("DELETE FROM progetti WHERE id = %s", (prog_row[0],))
                cur.execute("DELETE FROM ordini WHERE id_ordine = %s", (id_ordine,))
            conn.commit()
        finally:
            # Close even when a statement fails so the connection is not leaked.
            conn.close()

        if uuid_ordine:
            # Only open the collection when there is a document to delete.
            orders_col = db_connect(collection_name='orders')
            try:
                orders_col.delete_one({"_id": ObjectId(uuid_ordine)})
            except Exception:
                # Also reached when db_connect returned None or the id is not
                # a valid ObjectId, not only when the document is missing.
                logger.warning(f"Documento MongoDB ordine non trovato: {uuid_ordine}")

        if uuid_progetto:
            projects_col = db_connect(collection_name='projects')
            try:
                projects_col.delete_one({"_id": ObjectId(uuid_progetto)})
            except Exception:
                logger.warning(f"Documento MongoDB progetto non trovato: {uuid_progetto}")

        logger.info(f"Ordine {id_ordine} eliminato completamente")
        return True
    except Exception as e:
        logger.error(f"Errore eliminazione ordine {id_ordine}: {e}")
        return False
def get_ingrediente_id_by_cas(cas):
    """Look up the PostgreSQL ``id`` of an ingredient by CAS.

    Args:
        cas: Value matched against ``ingredienti.cas``.

    Returns:
        The ``id`` of the first matching row, or ``None`` if no row matches
        or an error occurred (the error is logged).
    """
    try:
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT id FROM ingredienti WHERE cas = %s", (cas,))
                result = cur.fetchone()
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        return result[0] if result else None
    except Exception as e:
        logger.error(f"Errore recupero id ingrediente {cas}: {e}")
        return None
if __name__ == "__main__":
    # Manual smoke test: running the module directly writes one sample row
    # to the search log.
    log_ricerche(cas="123-45-6", target="ECHA", esito=True)