Da MariaDB a Parquet con Java
In questo articolo vediamo come creare un file Parquet con Java e Apache Parquet (la libreria parquet-hadoop).
I dati li prenderemo da un database MariaDB.
Devo dire che rispetto a Python in Java è stato decisamente più complicato.
Prima di tutto le dipendenze: se usate Maven:
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
</dependencies>
Qui sotto un po' di codice:
package org.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import java.io.IOException;
import java.sql.*;
public class Main {

    // NOTE(review): credentials are hard-coded for the article's demo; in real
    // code read them from the environment or a config file, never from source.
    private static final String DB_URL = "jdbc:mysql://localhost:3306/Sql592686_5";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "9211";

    /**
     * Exports the {@code movimenti} table (joined with {@code causali}) from
     * MariaDB into a Snappy-compressed Parquet file named
     * {@code movimenti.parquet} in the working directory.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Parquet schema: mov_data is stored as epoch milliseconds in an int64.
        // NOTE(review): consider annotating it as "int64 mov_data (TIMESTAMP_MILLIS)"
        // so Parquet readers interpret it as a timestamp rather than a plain long.
        String schemaString =
                "message Movimento { " +
                "  required binary mov_tipo (UTF8); " +
                "  required double mov_valore; " +
                "  required int64 mov_data; " +
                "  required binary causale_nome (UTF8); " +
                "}";
        MessageType schema = MessageTypeParser.parseMessageType(schemaString);

        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);

        String query = "SELECT mov_tipo, " +
                "       mov_valore, " +
                "       mov_data, " +
                "       causale_nome " +
                "FROM movimenti " +
                "INNER JOIN causali ON causale_id = mov_causale_fk";

        // Class.forName("com.mysql.cj.jdbc.Driver") is no longer needed: since
        // JDBC 4 the driver auto-registers through the ServiceLoader mechanism.
        //
        // try-with-resources closes everything (in reverse declaration order)
        // even when an exception is thrown, replacing the fragile finally-block
        // cleanup. The connection is opened BEFORE the writer so that a DB
        // failure no longer leaves a stray empty movimenti.parquet on disk.
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(query);
             ParquetWriter<Group> writer = new ParquetWriter<>(
                     new Path("movimenti.parquet"),
                     new GroupWriteSupport(),
                     CompressionCodecName.SNAPPY,
                     ParquetWriter.DEFAULT_BLOCK_SIZE,
                     ParquetWriter.DEFAULT_PAGE_SIZE,
                     ParquetWriter.DEFAULT_PAGE_SIZE,
                     true,   // enable dictionary encoding
                     false,  // disable write-time validation
                     ParquetWriter.DEFAULT_WRITER_VERSION,
                     conf)) {

            SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
            int recordCount = 0;

            while (rs.next()) {
                Timestamp movData = rs.getTimestamp("mov_data");
                // The schema declares mov_data as "required": fail fast with a
                // clear message instead of an anonymous NullPointerException.
                if (movData == null) {
                    throw new SQLException(
                            "NULL mov_data in row " + (recordCount + 1)
                            + " but the Parquet schema requires it");
                }

                Group record = groupFactory.newGroup()
                        .append("mov_tipo", rs.getString("mov_tipo"))
                        .append("mov_valore", rs.getDouble("mov_valore"))
                        .append("mov_data", movData.getTime())
                        .append("causale_nome", rs.getString("causale_nome"));
                writer.write(record);

                recordCount++;
                if (recordCount % 1000 == 0) {
                    System.out.println("Scritti " + recordCount + " record...");
                }
            }
            System.out.println("Totale record esportati: " + recordCount);
        } catch (SQLException | IOException e) {
            // The original printed only e.getMessage() (which can be null),
            // discarding the stack trace; keep the full diagnostic instead.
            e.printStackTrace();
        }
    }
}
Come versione del JDK ho usato la 21, ed è importante, perché con le versioni più nuove mi ha dato un sacco di problemi.
Enjoy!
java maven apache parquet mariadb hadoop
Commentami!