Inviare dati a Elasticsearch con Python da MariaDB

Mattepuffo's logo
Inviare dati a Elasticsearch con Python da MariaDB

Inviare dati a Elasticsearch con Python da MariaDB

Avevo installato tutto lo stack ELK, ma non è affatto facile da configurare.

Inoltre così non si capisce bene il funzionamento di Elasticsearch, quindi ho deciso di usarlo in maniera indipendente.

In questo articolo vediamo come usare Pyhon per inviargli dati da un db MariaDB; abbiamo anche una funzione per la ricerca.

Do per scontato che abbiate Elasticsearch installato; io ho usato Docker.

Prima di tutto le dipendenze; possiamo usare pip:

pip install "elasticsearch<8.13.0" pymysql

Per fare funzionare tutto ho dovuto installare una versione più vecchia.

Qui sotto il codice:

import pymysql
from elasticsearch import Elasticsearch, helpers
import sys

def main():
    es = Elasticsearch(
        "http://localhost:9200",
        headers={"Accept": "application/vnd.elasticsearch+json; compatible-with=8"}
    )

    try:
        conn = pymysql.connect(
            host="172.17.0.6",
            user="root",
            password="9211",
            database="Sql592686_5",
            port=3306,
            cursorclass=pymysql.cursors.DictCursor
        )
    except Exception as e:
        print(f"Errore nella connessione a MariaDB: {e}")
        sys.exit(1)

    try:
        with conn.cursor() as cur:
            query = """
                SELECT mov_tipo, mov_valore, mov_data, causale_nome 
                FROM movimenti 
                INNER JOIN causali ON causale_id = mov_causale_fk
            """
            cur.execute(query)

            def generate_data():
                for row in cur:
                    yield {
                        "_index": "movimenti",
                        "_source": {
                            "tipo": row["mov_tipo"],
                            "valore": float(row["mov_valore"]),  # Assicuriamoci sia un float
                            "data": row["mov_data"].isoformat() if hasattr(row["mov_data"], 'isoformat') else str(
                                row["mov_data"]),
                            "causale": row["causale_nome"]
                        }
                    }

            print("Invio dati a Elasticsearch...")
            success, failed = helpers.bulk(es, generate_data())
            print(f"Operazione completata. Successi: {success}, Fallimenti: {failed}")

            es.indices.refresh(index="movimenti")

            print("n--- Eseguo ricerca di test ---")
            res = es.search(
                index="movimenti",
                query={
                    "bool": {
                        "must": [{"match": {"causale": "Stipendio"}}],
                        "filter": [{"range": {"valore": {"gt": 1000}}}]
                    }
                }
            )

            print(f"Trovati {res['hits']['total']['value']} risultati:")
            for hit in res['hits']['hits']:
                print(f" - Documento: {hit['_source']}")

    finally:
        conn.close()
        print("nConnessioni chiuse. Script terminato.")

if __name__ == "__main__":
    main()

Enjoy!


Condividi

Commentami!