VastbaseG100

基于openGauss内核开发的企业级关系型数据库。

Menu

调用BulkloadAPI

加载配置文件

驱动提供两种不同的方法加载bulkload的配置文件参数。

前置步骤

1、在JDBC连接的数据库创建 vb_bulkload 扩展。 2、为连接用户授予 vbbl 模式的权限。 3、已创建 bulkload.ctl

连接串参数

在连接串中通过bulkloadCtlFile参数配置bulkload的配置文件的路径。例如:

String url =
"jdbc:postgres://[ip]:[port]/[dbname]?user=xxx&password=xxxx&bulkloadCtlFile=E:\\bulkload.ctl"

setBulkloadInfo()

通过调用setBulkloadInfo方法设置bulkload参数。例如:

String url = "jdbc:postgres://[ip]:[port]/[dbname]?user=xxx&password=xxxx";
Connection conn = DriverManager.getConnection(url);
BulkloadManager bulkloadAPI = ((PgConnection) conn).getBulkloadAPI();
BulkloadProperties bulkloadInfo = new BulkloadProperties();
bulkloadInfo.put("IMP_FUNC","vb")
bulkloadInfo.put("INPUT","stdin");
bulkloadInfo.put("PARSE_ADFILE","/home/vastbase/tbl_bulkload.txt");
bulkloadInfo.put("LIMIT","INFINITE");
bulkloadInfo.put("PARSE_ERRORS","0");
bulkloadInfo.put("TYPE","CSV");
bulkloadInfo.put("SKIP","0");
bulkloadInfo.put("OUTPUT","public.tbl_buklload");
bulkloadAPI.setBulkloadInfo(bulkloadInfo);

获取bulkload管理器

BulkloadManager bulkloadAPI = ((PgConnection) conn).getBulkloadAPI();

使用bulkload管理器

通过调用bulkloadIn()方法将数据导入数据库。

bulkloadIn()

当bulkloadInfo中的INPUT参数为path/function时,可直接调用bulkloadIn()方法调用数据库的bulkload接口。例如:

public void testBulkloadWithServerLocalFileInput() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","/home/omm/tbl_bulkload_output.txt");
try {
     ResultSet result = bulkloadAPI.bulkloadIn();
     // ResultSet result = bulkloadAPI.bulkloadIn((InputStream)null);
     while(result.next()) {
     	  for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
           System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
      	  }
		}
	} 
}

bulkloadIn(InputStream from)

当bulkloadInfo中的INPUT参数为path/function时,用此方法会直接调研数据库的bulkload接口,忽略入参from的输入,当INPUT参数为stdin时,JDBC驱动会读取form的输入,然后发送至数据库。例如:

Public void testBulkloadWithJDBCRemoteFileInput() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","stdin");
InputStream is = null;
try {
     Is = readfile();
     ResultSet result = bulkloadAPI.bulkloadIn(Is);
     // ResultSet result = bulkloadAPI.bulkloadIn(Is, 655360);
     while(result.next()) {
     	  for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
           System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
        }
}
} finally {
   if (is != null) {
       is.close();
}
}
}

bulkloadIn(String folderPath, Pattern fileNamePattern)

当bulkloadInfo中的INPUT参数为path/function时,用此方法会直接调研数据库的bulkload接口,忽略入参folerPath和fileNamePattern,当INPUT参数为stdin时,JDBC驱动会根据folderPath和fileNamePattern参数读取指定文件内容,发送至数据库。例如:

Public void testBulkloadWithFolderPathAndPattern() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","stdin");
File file = new File("src/test/resources");
String folderPath = file.getAbsolutePath();
Pattern pattern = Pattern.compile("tbl_bulkload_small*", Pattern.CASE_INSENSITIVE);
try {
     ResultSet result = bulkloadAPI.bulkloadIn(folderPath, pattern);
     while(result.next()) {
     	  for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
           System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
        }
}
}
}

bulkloadIn(File[] files)

当bulkloadInfo中的INPUT参数为path/function时,用此方法会直接调研数据库的bulkload接口,忽略入参files,当INPUT参数为stdin时,JDBC驱动会根据files读取指定文件内容,发送至数据库。例如:

Public void testBulkloadWithFolderPathAndPattern() throws SQLException, IOExecption {
bulkloadAPI.getBulkloadInfo().put("INPUT","stdin");
File[] file = new File[] {
   new File("src/test/resources/tbl_bulkload_small_1.txt"),
   new File("src/test/resources/tbl_bulkload_small_3.txt"),
};
try {
     ResultSet result = bulkloadAPI.bulkloadIn(file);
     while(result.next()) {
     	  for(int i = 1; i< result.getMetaData().getColumnCount() + 1; i ++) {
           System.out.println(result.getMetaData().getColumnLabel(i) + result.getObject(result.getMetaData().getColumnLabel(i)));
        }
}
}
}

使用批量接口

使用bulkloadAPI的接口,可以将内存数据批量加载至数据库中。例如:

Connection conn = DriverManager.getConnection(url);
BulkloadManager bulkloadAPI = ((PgConnection) conn).getBulkloadAPI();
int count = 0;
Record record;
while((record = S1Loader.parseFile(file))!= null) {
   count++;
     bulkloadAPI.setString(1,(String)record.get(0));
     bulkloadAPI.setNumber(2,(Number)record.get(1));
bulkloadAPI.addBatch();
if(count % 10000 == 0) {
   ResultSet rs = bulkloadAPI.executeBatch();
   Count = 0;
}
}
ResultSet res = bulkloadAPI.executeBatch();