JDBC访问AWS Athena

JDBC访问AWS Athena

摘要

JDBC访问AWS Athena

AWS Athena

AWS Athena 是什么？
它让你可以直接用 SQL 查询存放在 Amazon S3 上的数据，而 S3 可存储的数据量几乎没有上限。

code

package net.javablog;

import org.nutz.dao.entity.Record;
import org.nutz.json.Json;
import org.nutz.lang.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Properties;


/**
 * Minimal demo of querying AWS Athena through its JDBC driver and printing each row as JSON.
 *
 * <p>Driver download:
 * http://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html#jdbc-credentials-provider
 */
public class AthenaJDBCDemo {

    // Placeholder access key / secret key; substitute real values locally and never commit them.
    static final String ak = "ak";
    static final String sk = "sk";
    static final String athenaUrl = "jdbc:awsathena://athena.ap-southeast-1.amazonaws.com:443";
    static final Logger log = LoggerFactory.getLogger(AthenaJDBCDemo.class);

    /** Fully-qualified class name of the Athena JDBC driver. */
    private static final String DRIVER_CLASS = "com.amazonaws.athena.jdbc.AthenaDriver";

    /**
     * Runs a sample query and prints every result row as JSON.
     *
     * <p>The table must be qualified with its database name; here both the database and the
     * table are named {@code test}.
     */
    public static void main(String[] args) {
        String sql = "select * from test.test  limit 10";

        query(sql, new Callback<ResultSet>() {
            @Override
            public void invoke(ResultSet rs) {
                Record rec = rs2rec(rs);
                System.out.println(Json.toJson(rec));
            }
        });
    }

    /**
     * Executes {@code sql} against Athena and invokes {@code callback} once per result row.
     *
     * <p>The connection, statement and result set are always closed, including when the query
     * or the callback throws. Failures are logged rather than propagated, so the method always
     * ends by printing the completion message.
     *
     * @param sql      the query to run
     * @param callback invoked with the result set positioned on each row in turn
     */
    private static void query(String sql, Callback<ResultSet> callback) {
        try {
            // Load the driver explicitly in case it does not self-register with DriverManager.
            Class.forName(DRIVER_CLASS);

            System.out.println("Connecting to Athena...");
            try (Connection conn = DriverManager.getConnection(athenaUrl, connectionProperties());
                 Statement statement = conn.createStatement();
                 ResultSet rs = statement.executeQuery(sql)) {
                int index = 0;
                while (rs.next()) {
                    log.info("row {}", index++);
                    callback.invoke(rs);
                }
            }
        } catch (Exception ex) {
            // Demo boundary: report anything (driver, SQL or callback failure) and carry on.
            log.error("Athena query failed for SQL: {}", sql, ex);
        }
        System.out.println("Finished connectivity test.");
    }

    /**
     * Builds the driver connection properties: query-result staging location, driver log file,
     * and the credentials provider together with its constructor arguments.
     */
    private static Properties connectionProperties() {
        Properties info = new Properties();
        info.setProperty("s3_staging_dir", "s3://aws-athena-query-results-616550600412-ap-southeast-1/daily_me/");
        info.setProperty("log_path", "C:\\athenajdbc.log");
        info.setProperty("aws_credentials_provider_class", "net.javablog.CustomSessionCredentialsProvider");
        // Comma-separated constructor arguments for the provider class above: (ak, sk).
        info.setProperty("aws_credentials_provider_arguments", ak + "," + sk);
        return info;
    }

    /**
     * Copies the current row of {@code rs} into a Nutz {@link Record} keyed by column label.
     *
     * <p>Best effort: if reading metadata or a column fails, the error is logged and the columns
     * read so far are returned.
     *
     * @param rs a result set positioned on a row
     * @return the row's values keyed by column label
     */
    public static Record rs2rec(ResultSet rs) {
        Record rec = new Record();
        try {
            ResultSetMetaData rsmd = rs.getMetaData();
            int columnCount = rsmd.getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                // Read by index: a lookup by name returns the first match when names repeat,
                // and the label (not the underlying name) is what honours "AS" aliases.
                rec.set(rsmd.getColumnLabel(i), rs.getObject(i));
            }
        } catch (SQLException e) {
            log.error("Failed to convert result row to Record", e);
        }
        return rec;
    }
}
package net.javablog;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

/**
 * Credentials provider that returns a fixed access-key / secret-key pair.
 *
 * <p>The demo registers this class with the Athena JDBC driver via the
 * {@code aws_credentials_provider_class} property, with the keys passed as
 * {@code aws_credentials_provider_arguments}. Despite its name, it returns long-lived
 * {@link BasicAWSCredentials}, not temporary session credentials.
 *
 * <p>Created by zsl on 2017/8/2.
 */
public class CustomSessionCredentialsProvider implements
        AWSCredentialsProvider {

    private final AWSCredentials credentials;

    /**
     * @param ak AWS access key id
     * @param sk AWS secret access key
     */
    public CustomSessionCredentialsProvider(String ak, String sk) {
        this.credentials = new BasicAWSCredentials(ak, sk);
    }

    /** Returns the credentials supplied at construction time. */
    @Override
    public AWSCredentials getCredentials() {
        return credentials;
    }

    /** No-op: the credentials are fixed at construction and never change. */
    @Override
    public void refresh() {
        // Intentionally empty.
    }
}

驱动下载地址

http://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html#jdbc-credentials-provider

完整demo地址

https://github.com/daodaovps/testAWSAthena