Inviare dati a Elasticsearch con Java da MariaDB
Avevo installato tutto lo stack ELK, ma non è affatto facile da configurare.
Inoltre così non si capisce bene il funzionamento di Elasticsearch, quindi ho deciso di usarlo in maniera indipendente.
In questo articolo vediamo come usare Java per inviargli dati da un db MariaDB; abbiamo anche una funzione per la ricerca.
Do per scontato che abbiate Elasticsearch installato; io ho usato Docker.
Prima di tutto le dipendenze; se usate Maven:
<dependencies>
<!-- MariaDB JDBC driver: handles the jdbc:mariadb:// connection used to read the source rows -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Elasticsearch Java API client (ElasticsearchClient, BulkRequest, SearchResponse, ...) -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.12.0</version>
</dependency>
<!-- Jackson databind: backs the JacksonJsonpMapper passed to the Elasticsearch transport -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.0</version>
</dependency>
</dependencies>
Qui sotto un po' di codice:
package org.example;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import java.io.IOException;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
/**
 * Demo program that copies rows from a MariaDB database into an Elasticsearch
 * index using bulk requests, then runs a sample search against that index.
 *
 * <p>The database URL, user and password can be overridden with the
 * {@code DB_URL}, {@code DB_USER} and {@code DB_PASSWORD} environment variables;
 * when a variable is unset or empty the demo defaults below are used.
 */
public class Main {

    /** Elasticsearch index that receives the documents and is queried afterwards. */
    private static final String INDEX_NAME = "movimenti";

    /** Number of index operations accumulated before a bulk request is sent. */
    private static final int BATCH_SIZE = 500;

    /** Query producing one row per document to index. */
    private static final String QUERY =
            "SELECT mov_tipo, mov_valore, mov_data, causale_nome "
                    + "FROM movimenti INNER JOIN causali ON causale_id = mov_causale_fk";

    /**
     * Entry point: opens the Elasticsearch and MariaDB connections, indexes every
     * row returned by {@link #QUERY}, refreshes the index and runs a test search.
     *
     * <p>Failures are reported on standard error and do not propagate out of
     * {@code main}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Demo defaults only: do not keep real credentials in source code.
        String dbUrl = envOrDefault("DB_URL", "jdbc:mariadb://localhost:3306/Sql592686_5");
        String dbUser = envOrDefault("DB_USER", "root");
        String dbPassword = envOrDefault("DB_PASSWORD", "9211");

        // A single try-with-resources closes everything in reverse order, whatever fails.
        try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
             Connection conn = DriverManager.getConnection(dbUrl, dbUser, dbPassword);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(QUERY)) {

            RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
            ElasticsearchClient esClient = new ElasticsearchClient(transport);

            int indexed = indicizzaMovimenti(esClient, rs);
            System.out.println("Totale documenti indicizzati: " + indexed);

            // Make the freshly indexed documents visible to the search below.
            esClient.indices().refresh(r -> r.index(INDEX_NAME));

            System.out.println("--- Avvio Ricerca di Test ---");
            cercaMovimenti(esClient, "Stipendio", 1000.0);
        } catch (SQLException e) {
            reportError("Errore database", e);
        } catch (IOException e) {
            reportError("Errore di comunicazione con Elasticsearch", e);
        } catch (RuntimeException e) {
            // Covers unchecked failures raised by the Elasticsearch client.
            reportError("Errore inatteso", e);
        }
    }

    /**
     * Reads every remaining row of {@code rs} and sends it to Elasticsearch in
     * bulk requests of at most {@link #BATCH_SIZE} operations.
     *
     * @param esClient client used to send the bulk requests
     * @param rs       result set positioned before the first row to index
     * @return number of rows read minus the number of items Elasticsearch rejected
     * @throws SQLException if reading from the result set fails
     * @throws IOException  if a bulk request cannot be sent
     */
    private static int indicizzaMovimenti(ElasticsearchClient esClient, ResultSet rs)
            throws SQLException, IOException {
        BulkRequest.Builder br = new BulkRequest.Builder();
        int count = 0;
        int failed = 0;

        while (rs.next()) {
            Map<String, Object> doc = toDocument(rs);
            br.operations(op -> op
                    .index(idx -> idx
                            .index(INDEX_NAME)
                            .document(doc)
                    )
            );
            if (++count % BATCH_SIZE == 0) {
                failed += processBulk(esClient, br);
                br = new BulkRequest.Builder();
            }
        }

        // Flush the last, partially filled batch; skipped when the builder is empty.
        if (count % BATCH_SIZE != 0) {
            failed += processBulk(esClient, br);
        }
        return count - failed;
    }

    /**
     * Converts the current row of {@code rs} into the document to index.
     *
     * <p>SQL {@code NULL} values for {@code mov_valore} and {@code mov_data} are left
     * out of the document instead of being stored as {@code 0.0} or causing a
     * {@link NullPointerException}.
     *
     * @param rs result set positioned on the row to convert
     * @return map with the keys {@code tipo}, {@code causale} and, when present in
     *         the row, {@code valore} and {@code data}
     * @throws SQLException if a column cannot be read
     */
    private static Map<String, Object> toDocument(ResultSet rs) throws SQLException {
        Map<String, Object> doc = new HashMap<>();
        doc.put("tipo", rs.getString("mov_tipo"));

        // getDouble returns 0.0 for SQL NULL; wasNull tells the two cases apart.
        double valore = rs.getDouble("mov_valore");
        if (!rs.wasNull()) {
            doc.put("valore", valore);
        }

        Timestamp data = rs.getTimestamp("mov_data");
        if (data != null) {
            doc.put("data", data.toInstant().toString());
        }

        doc.put("causale", rs.getString("causale_nome"));
        return doc;
    }

    /**
     * Sends the accumulated bulk request and reports every rejected item on
     * standard error.
     *
     * @param client client used to send the request
     * @param br     builder holding at least one operation
     * @return number of items for which Elasticsearch reported an error
     * @throws IOException if the request cannot be sent
     */
    private static int processBulk(ElasticsearchClient client, BulkRequest.Builder br) throws IOException {
        BulkResponse result = client.bulk(br.build());
        if (!result.errors()) {
            return 0;
        }

        int failed = 0;
        System.err.println("Alcuni errori si sono verificati durante il bulk:");
        for (BulkResponseItem item : result.items()) {
            if (item.error() != null) {
                failed++;
                System.err.println(item.error().reason());
            }
        }
        return failed;
    }

    /**
     * Searches the index for documents whose {@code causale} matches the given
     * text and whose {@code valore} is strictly greater than {@code valoreMinimo},
     * printing the hit count and every returned hit.
     *
     * <p>Best effort: failures are reported on standard error and not rethrown.
     *
     * @param esClient     client used to run the search
     * @param causale      text matched against the {@code causale} field
     * @param valoreMinimo exclusive lower bound for the {@code valore} field
     */
    @SuppressWarnings("rawtypes") // Map.class can only describe the raw Map type.
    private static void cercaMovimenti(ElasticsearchClient esClient, String causale, double valoreMinimo) {
        try {
            SearchResponse<Map> response = esClient.search(s -> s
                            .index(INDEX_NAME)
                            .query(q -> q
                                    .bool(b -> b
                                            .must(m -> m
                                                    .match(t -> t
                                                            .field("causale")
                                                            .query(causale)
                                                    )
                                            )
                                            .filter(f -> f
                                                    .range(r -> r
                                                            .field("valore")
                                                            .gt(JsonData.of(valoreMinimo))
                                                    )
                                            )
                                    )
                            ),
                    Map.class
            );

            // total() may be absent from the response; fall back to the hits actually returned.
            long trovati = response.hits().total() != null
                    ? response.hits().total().value()
                    : response.hits().hits().size();
            System.out.println("Trovati " + trovati + " risultati:");
            for (Hit<Map> hit : response.hits().hits()) {
                System.out.println("Documento ID: " + hit.id() + " -> " + hit.source());
            }
        } catch (IOException | RuntimeException e) {
            reportError("Errore durante la ricerca", e);
        }
    }

    /**
     * Returns the value of the environment variable {@code name}, or
     * {@code fallback} when it is unset or empty.
     */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /**
     * Prints an error on standard error, including the exception type so that
     * exceptions without a message are still identifiable.
     */
    private static void reportError(String context, Exception e) {
        System.err.println(context + ": " + e);
    }
}
Enjoy!
java maven mariadb elasticsearch
Commentami!