调用BulkloadAPI
加载配置文件
驱动提供两种不同的方法加载bulkload的配置文件参数。
连接串参数
在连接串中通过bulkloadCtlFile参数配置bulkload的配置文件的路径。
例如:
String url = "jdbc:postgres://[ip]:[port]/[dbname]?user=xxx&password=xxxx&bulkloadCtlFile=E:\\bulkload.ctl"
setBulkloadInfo()
通过调用setBulkloadInfo方法设置bulkload参数。
例如:
String url = "jdbc:postgres://[ip]:[port]/[dbname]?user=xxx&password=xxxx"
Connection conn = DriverManager.getConnection(url);
BulkloadManager bulkloadAPI = ((PGconnection) conn).getBulkloadAPI();
BulkloadProperties bulkloadInfo = new BulkloadProperties();
bulkloadInfo.put("INPUT","stdin");
bulkloadInfo.put("PARSE_ADFILE","/home/vastbase/tbl_bulkload.txt");
bulkloadInfo.put("LIMIT","INFINITE");
bulkloadInfo.put("PARSE_ERRORS","0");
bulkloadInfo.put("TYPE","CSV");
bulkloadInfo.put("SKIP","0");
bulkloadInfo.put("OUTPUT","tbl_buklload");
BulkloadAPI.setBulkloadInfo(bulkloadInfo);
获取bulkload管理器
BulkloadManager bulkloadAPI = ((PGconnection) conn).getBulkloadAPI();
使用bulkload管理器
通过调用bulkloadIn()方法将数据导入数据库
bulkloadIn()
当bulkloadInfo中的INPUT参数为path/function时,可直接调用bulkloadIn()方法调用数据库的bulkload接口
例如:
public void testBulkloadWithServerLocalFileInput() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","/home/vastbase/tbl_bulkload_output.txt");
try {
ResultSet result = bulkloadAPI.bulkloadIn();
// ResultSet result = bulkloadAPI.bulkloadIn((InputStream)null);
While(result.next()) {
for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
}
}
}
}
bulkloadIn(InputStream from)
当bulkloadInfo中的INPUT参数为path/function时,用此方法会直接调研数据库的bulkload接口,忽略入参from的输入, 当INPUT参数为stdin时,JDBC驱动会读取form的输入,然后发送至数据库。
例如:
Public void testBulkloadWithJDBCRemoteFileInput() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","stdin");
InputStream is = null;
try {
Is = readfile()
ResultSet result = bulkloadAPI.bulkloadIn(is);
// ResultSet result = bulkloadAPI.bulkloadIn(is, 655360);
While(result.next()) {
for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
}
}
} finally {
if (is != null) {
is.close();
}
}
}
bulkloadIn(String folderPath, Pattern fileNamePattern)
当bulkloadInfo中的INPUT参数为path/function时,用此方法会直接调研数据库的bulkload接口,忽略入参folerPath和fileNamePattern,当INPUT参数为stdin时,JDBC驱动会根据folderPath和fileNamePattern参数读取指定文件内容,发送至数据库。
例如:
Public void testBulkloadWithFolderPathAndPattern() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","stdin");
File file = new File("src/test/resources");
String folderPath = file.getAbsolutePath();
Pattern pattern = Pattern.compile("tbl_bulkload_small*", Pattern.CASE_INSENSITIVE);
try {
Is = readfile()
ResultSet result = bulkloadAPI.bulkloadIn(folderPath, pattern);
While(result.next()) {
for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
}
}
}
}
bulkloadIn(File[] files)
当bulkloadInfo中的INPUT参数为path/function时,用此方法会直接调研数据库的bulkload接口,忽略入参files,当INPUT参数为stdin时,JDBC驱动会根据files读取指定文件内容,发送至数据库。
例如:
Public void testBulkloadWithFolderPathAndPattern() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","stdin");
File[] file = new File[] {
new File("src/test/resources/tbl_bulkload_small_1.txt"),
new File("src/test/resources/tbl_bulkload_small_3.txt"),
}
try {
Is = readfile()
ResultSet result = bulkloadAPI.bulkloadIn(files);
While(result.next()) {
for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
}
}
}
}
使用批量接口
是用bulkloadAPI的接口,可以将内存数据批量加载至数据库中
例如:
Connection conn = DriverManager.getConnection(ulr);
BulkloadManager bulkloadAPI = ((PGConnection) conn).getBulkloadAPI();
int count = 0;
Record record;
While((record = S1Loader.parseFile(file))!= null) {
count++;
bulkloadAPI.setString(1,(String)record.get(0));
bulkloadAPI.setNumber(2,(Number)record.get(1));
bulkloadAPI.addBatch();
if(count % 10000 == 0) {
ResultSet rs = bulkloadAPI.executeBatch();
Count = 0;
}
}
ResultSet res = bulkloadAPI.executeBatch();