Da MariaDB a Parquet con Java

Mattepuffo's logo
Da MariaDB a Parquet con Java

Da MariaDB a Parquet con Java

In questo articolo vediamo come creare un file Parquet con Java e le librerie Apache Parquet (parquet-hadoop).

I dati li prenderemo da un database MariaDB.

Devo dire che rispetto a Python in Java è stato decisamente più complicato.

Prima di tutto le dipendenze; se usate Maven, sono queste:

    <!-- Libraries needed by the Main class shown in this article. -->
    <dependencies>
        <!-- Parquet writer API: the org.apache.parquet.* classes imported by the example. -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- Hadoop core: supplies org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.Path. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.6</version>
        </dependency>
        <!-- NOTE(review): not imported directly by the example; presumably required at runtime by parquet-hadoop - confirm. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.6</version>
        </dependency>
        <!-- JDBC driver: the example loads com.mysql.cj.jdbc.Driver and connects through a jdbc:mysql:// URL. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>

Qui sotto un po' di codice:

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupWriteSupport;

import java.io.IOException;
import java.sql.*;

/**
 * Exports the result of the {@code movimenti}/{@code causali} join to a
 * Snappy-compressed Parquet file ({@code movimenti.parquet}) in the current
 * working directory.
 *
 * <p>Every row of the query becomes one {@code Movimento} record. All four
 * Parquet columns are declared {@code required}, so a SQL NULL in a string or
 * timestamp column aborts the export with an explicit error.
 */
public class Main {
    // NOTE(review): connection settings are hard-coded for this example; move
    // them to external configuration before using the code outside a local test.
    private static final String DB_URL = "jdbc:mysql://localhost:3306/Sql592686_5";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "9211";

    /** Name of the Parquet file that gets created. */
    private static final String OUTPUT_FILE = "movimenti.parquet";

    /** A progress line is printed every time this many records have been written. */
    private static final int PROGRESS_INTERVAL = 1000;

    /** Parquet schema of the exported records; mov_data holds epoch milliseconds. */
    private static final String SCHEMA =
            "message Movimento { " +
                    "  required binary mov_tipo (UTF8); " +
                    "  required double mov_valore; " +
                    "  required int64 mov_data; " +
                    "  required binary causale_nome (UTF8); " +
                    "}";

    /** Query whose rows are exported; column labels match the schema field names. */
    private static final String QUERY =
            "SELECT mov_tipo, " +
                    "       mov_valore, " +
                    "       mov_data, " +
                    "       causale_nome " +
                    "FROM movimenti " +
                    "INNER JOIN causali ON causale_id = mov_causale_fk";

    /**
     * Program entry point: runs the export and reports the outcome.
     *
     * <p>Failures are reported on standard error instead of being propagated.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        try {
            // Explicit driver loading kept from the original example.
            Class.forName("com.mysql.cj.jdbc.Driver");

            long recordCount = exportMovimenti();

            // Printed only after the writer has been closed successfully.
            System.out.println("Totale record esportati: " + recordCount);
        } catch (Exception e) {
            // Print the exception itself: getMessage() alone may be null and
            // hides the exception type.
            System.err.println("Esportazione fallita: " + e);
        }
    }

    /**
     * Reads all rows returned by {@link #QUERY} and writes them to the Parquet file.
     *
     * @return the number of records written
     * @throws SQLException if connecting to the database or reading a row fails
     * @throws IOException  if the Parquet file cannot be created, written or closed
     */
    private static long exportMovimenti() throws SQLException, IOException {
        MessageType schema = MessageTypeParser.parseMessageType(SCHEMA);
        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);

        // Resources are closed in reverse order even when one close() fails.
        // The writer is opened last so that a failed connection or query does
        // not leave an empty output file behind.
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(QUERY);
             ParquetWriter<Group> writer = openWriter(conf)) {

            long recordCount = 0;

            while (rs.next()) {
                writer.write(toGroup(rs, groupFactory, recordCount + 1));
                recordCount++;

                if (recordCount % PROGRESS_INTERVAL == 0) {
                    System.out.println("Scritti " + recordCount + " record...");
                }
            }

            return recordCount;
        }
    }

    /**
     * Opens the Snappy-compressed Parquet writer for {@link #OUTPUT_FILE}.
     *
     * @param conf Hadoop configuration that already carries the write schema
     * @return a writer the caller must close
     * @throws IOException if the output file cannot be created
     */
    private static ParquetWriter<Group> openWriter(Configuration conf) throws IOException {
        return new ParquetWriter<>(
                new Path(OUTPUT_FILE),
                new GroupWriteSupport(),
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,   // dictionary page size
                true,                              // enable dictionary encoding
                false,                             // no schema validation on write
                ParquetWriter.DEFAULT_WRITER_VERSION,
                conf
        );
    }

    /**
     * Converts the current row of the result set into a Parquet record.
     *
     * @param rs           result set positioned on the row to convert
     * @param groupFactory factory bound to the export schema
     * @param rowNumber    1-based row position, used only in error messages
     * @return the record holding the four exported columns
     * @throws SQLException          if a column cannot be read
     * @throws IllegalStateException if a string or timestamp column is NULL
     */
    private static Group toGroup(ResultSet rs, SimpleGroupFactory groupFactory, long rowNumber)
            throws SQLException {
        String movTipo = requireValue(rs.getString("mov_tipo"), "mov_tipo", rowNumber);
        // getDouble() yields 0 for SQL NULL; that value is written as-is.
        double movValore = rs.getDouble("mov_valore");
        Timestamp movData = requireValue(rs.getTimestamp("mov_data"), "mov_data", rowNumber);
        String causaleNome = requireValue(rs.getString("causale_nome"), "causale_nome", rowNumber);

        return groupFactory.newGroup()
                .append("mov_tipo", movTipo)
                .append("mov_valore", movValore)
                .append("mov_data", movData.getTime())   // epoch milliseconds
                .append("causale_nome", causaleNome);
    }

    /**
     * Returns {@code value} unless it is {@code null}, in which case the export
     * is aborted with a message that identifies the offending column and row.
     */
    private static <T> T requireValue(T value, String column, long rowNumber) {
        if (value == null) {
            throw new IllegalStateException(
                    "Valore NULL nella colonna '" + column + "' alla riga " + rowNumber
                            + ": lo schema Parquet la dichiara 'required'");
        }
        return value;
    }
}

Come versione del JDK ho usato la 21, ed è importante, perché le versioni più nuove mi hanno dato un sacco di problemi.

Enjoy!


Condividi

Commentami!