Inviare dati a Elasticsearch con Ruby da MariaDB

Mattepuffo's logo
Inviare dati a Elasticsearch con Ruby da MariaDB

Inviare dati a Elasticsearch con Ruby da MariaDB

Avevo installato tutto lo stack ELK, ma non è affatto facile da configurare.

Inoltre così non si capisce bene il funzionamento di Elasticsearch, quindi ho deciso di usarlo in maniera indipendente.

In questo articolo vediamo come usare Ruby per inviargli dati da un db MariaDB; abbiamo anche una funzione per la ricerca.

Do per scontato che abbiate Elasticsearch installato; io ho usato Docker.

Prima di tutto le dipendenze; possiamo usare gem:

gem install elasticsearch mysql2

Qui sotto il codice:

require 'mysql2'
require 'elasticsearch'
require 'date'

def main
  es = Elasticsearch::Client.new(
    url: 'http://localhost:9200',
    transport_options: {
      headers: { 'Accept' => 'application/vnd.elasticsearch+json; compatible-with=8' }
    }
  )

  begin
    db = Mysql2::Client.new(
      host: "localhost",
      username: "root",
      password: "9211",
      database: "Sql592686_5",
      port: 3306
    )
  rescue Mysql2::Error => e
    puts "Errore connessione MariaDB: #{e.message}"
    exit 1
  end

  query = <<~SQL
    SELECT mov_tipo, mov_valore, mov_data, causale_nome 
    FROM movimenti 
    INNER JOIN causali ON causale_id = mov_causale_fk
  SQL

  puts "Estrazione dati da MariaDB..."
  results = db.query(query)

  bulk_data = []
  results.each do |row|
    bulk_data << { index: { _index: 'movimenti' } }
    bulk_data << {
      tipo: row['mov_tipo'],
      valore: row['mov_valore'].to_f,
      data: row['mov_data'].respond_to?(:iso8601) ? row['mov_data'].iso8601 : row['mov_data'].to_s,
      causale: row['causale_nome']
    }
  end

  if bulk_data.any?
    puts "Invio di #{results.count} documenti a Elasticsearch..."
    es.bulk(body: bulk_data)
    
    es.indices.refresh(index: 'movimenti')
  end

  puts "\n--- Risultati Ricerca ---"
  search_query = {
    query: {
      bool: {
        must: [{ match: { causale: 'Stipendio' } }],
        filter: [{ range: { valore: { gt: 1000 } } }]
      }
    }
  }

  response = es.search(index: 'movimenti', body: search_query)

  puts "Trovati #{response['hits']['total']['value']} risultati:"
  response['hits']['hits'].each do |hit|
    puts " - Documento: #{hit['_source']}"
  end

  db.close
  puts "\nProcesso completato."
end

main if __FILE__ == $0

Enjoy!


Condividi

Commentami!