Importare dati in FalkorDB da MariaDB con Python

Mattepuffo's logo
Importare dati in FalkorDB da MariaDB con Python

Importare dati in FalkorDB da MariaDB con Python

FalkorDB è un graph database compatibile con Neo4j.

In questo articolo vediamo come importare i dati prendendoli da MariaDB usando Python.

Io ho installato FalkorDB in Docker:

docker run -p 6379:6379 -p 3000:3000 -v $(pwd)/falkordb_data:/var/lib/falkordb/data --name falkor-test -d falkordb/falkordb:latest

Usando pip installiamo queste due librerie:

pip install mysql-connector-python falkordb

Qui sotto il codice:

import mysql.connector
from falkordb import FalkorDB

MARIADB_CONFIG = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "9211",
    "database": "Sql592686_5",
}

FALKOR_CONFIG = {
    "host": "localhost",
    "port": 6379,
}

GRAPH_NAME = "movimenti_graph"

MARIADB_QUERY = """
SELECT mov_tipo,
       mov_valore,
       YEAR(mov_data) AS anno,
       causale_nome,
       conto_nome,
       conto_iban
FROM movimenti
         INNER JOIN causali ON causale_id = mov_causale_fk
         INNER JOIN conti ON conto_id = mov_conto_fk;
"""

def fetch_from_mariadb():
    print("Connessione a MariaDB...")

    conn = mysql.connector.connect(**MARIADB_CONFIG)
    cursor = conn.cursor(dictionary=True)
    cursor.execute(MARIADB_QUERY)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()

    print(f"Recuperati {len(rows)} record da MariaDB")

    return rows

def load_into_falkor(rows: list[dict]):
    print("Connessione a FalkorDB...")

    db = FalkorDB(**FALKOR_CONFIG)
    graph = db.select_graph(GRAPH_NAME)

    try:
        graph.delete()
        print("Grafo precedente eliminato")
    except Exception:
        pass

    graph = db.select_graph(GRAPH_NAME)

    print("Caricamento nodi e relazioni...")

    for row in rows:
        causale_nome = row["causale_nome"].replace("'", "\'")
        conto_nome = row["conto_nome"].replace("'", "\'")
        conto_iban = row["conto_iban"].replace("'", "\'") if row["conto_iban"] else ""
        mov_tipo = row["mov_tipo"].replace("'", "\'") if row["mov_tipo"] else ""
        mov_valore = float(row["mov_valore"])
        anno = int(row["anno"])

        graph.query(f"""
            MERGE (c:Causale {{nome: '{causale_nome}'}})
        """)

        graph.query(f"""
            MERGE (ct:Conto {{nome: '{conto_nome}', iban: '{conto_iban}'}})
        """)

        graph.query(f"""
            MATCH (c:Causale {{nome: '{causale_nome}'}})
            MATCH (ct:Conto {{nome: '{conto_nome}'}})
            CREATE (m:Movimento {{
                tipo: '{mov_tipo}',
                valore: {mov_valore},
                anno: {anno}
            }})
            CREATE (m)-[:HA_CAUSALE]->(c)
            CREATE (m)-[:SU_CONTO]->(ct)
        """)

    print(f"Caricati {len(rows)} movimenti nel grafo '{GRAPH_NAME}'")

def query_falkor():
    db = FalkorDB(**FALKOR_CONFIG)
    graph = db.select_graph(GRAPH_NAME)

    print("Query su FalkorDB:")

    print("── Totale movimenti per causale ──")

    result = graph.query("""
        MATCH (m:Movimento)-[:HA_CAUSALE]->(c:Causale)
        RETURN c.nome AS causale,
               COUNT(m) AS num_movimenti,
               SUM(m.valore) AS totale
        ORDER BY totale DESC
    """)
    for record in result.result_set:
        print(f"  {record[0]:30s}  n={record[1]:4d}  totale={record[2]:.2f}")

    print("── Movimenti per conto e anno ──")

    result = graph.query("""
        MATCH (m:Movimento)-[:SU_CONTO]->(ct:Conto)
        RETURN ct.nome AS conto, m.anno AS anno, SUM(m.valore) AS totale
        ORDER BY ct.nome, m.anno
    """)

    for record in result.result_set:
        print(f"  {record[0]:30s}  {record[1]}  totale={record[2]:.2f}")

    print("── Conti collegati alla causale 'Stipendio' (esempio) ──")
    
    result = graph.query("""
        MATCH (m:Movimento)-[:HA_CAUSALE]->(c:Causale {nome: 'Stipendio'}),
              (m)-[:SU_CONTO]->(ct:Conto)
        RETURN ct.nome AS conto, ct.iban AS iban, SUM(m.valore) AS totale
    """)

    if result.result_set:
        for record in result.result_set:
            print(f"  {record[0]:30s}  {record[1]}  totale={record[2]:.2f}")
    else:
        print("  (nessun risultato — adatta il nome della causale)")

if __name__ == "__main__":
    rows = fetch_from_mariadb()
    load_into_falkor(rows)
    query_falkor()

Visto che sto facendo un pò di test, ogni volta ripulisco il grafo; vedete voi se farlo o no.

Enjoy!


Condividi

Commentami!