跳到主要内容

java脚本开发

业务场景

使用java脚本将ElasticSearch数据库中数据导入至平台仓内。(当前平台仅提供ES导出能力,此时可以通过脚本满足)

前提条件

1)java类型: 本实践文件中java代码为启动类, 因此必须包含main函数入口
2)jar类型: 主jar中需要包含MANIFEST.MF文件(如图4), 且需要在该文件中指定Main-Class, 即主程序入口

使用限制

1)支持jdk21及以下版本;
2)尽管 jdk 保证对已发布 API 的兼容性,但也有些类和方法会在新版本中被标记为弃用,并最终被移除。如果你使用了某些已经从新版本的 JDK 中删除的类或方法,你可能会遇到 ClassNotFoundException 或 NoSuchMethodException 等异常。

操作流程

第一步:上传资源文件

1)在“资源管理 > 资源文件”中上传相关的依赖资源包文件。 最佳实践

注意
  • 若java节点配置中希望直接上传jar包执行,则必须先将主程序jar上传至资源文件;

第二步:创建数据开发作业

1)在数据开发 > 批量数据 > 数据加工中新建数据加工作业,并在画布中拖入【java脚本】节点。 最佳实践

2)【java脚本】提供两种运行类型,结合是否需要依赖,相关关键配置说明如下。详见下方脚本示例。更多细节配置可查看python脚本节点介绍。

注意
  • 资源: 所有选择的关联资源文件,在程序启动前会下载在到路径(/app/user)下,编写脚本时可使用该路径;
  • 启动参数: 包含java启动命令及main方法入参。如需要执行jvm堆内存大小并传入参数: -Xmx1024m testParam1 testParam2, testParam1 testParam2即为main方法入参;不要在启动参数中指定classpath(-cp -classpath)
  • 前置处理:如上图中可通过shell命令快速解压,则后续无需在脚本中定义解压方法和解压资源

最佳实践

运行类型一:JAVA(不依赖资源文件)
直接在脚本配置中写入代码即可。执行过程中,会根据类名在执行路径中生成对应java文件,如下图: 最佳实践

运行类型二:JAVA(依赖资源文件)
在脚本配置中写入代码,依赖的三方jar需要通过资源上传后引用。

运行类型三:JAR(不依赖资源文件)
当前的主程序包的jar中不包含依赖的jar包。需要在MANIFEST.MF中添加classpath路径(../lib/);以maven为例, 使用插件maven-jar-plugin可以实现 最佳实践 最佳实践

运行类型四:JAR(依赖资源文件)
代码及依赖的jar都打包到一个jar中,以maven为例,使用插件maven-assembly-plugin, spring-boot-maven-plugin等均可以实现 最佳实践 最佳实践

3)点击节点配置界面的“立即运行”,执行完成后可以点击查看日志。 最佳实践

第三步:运维监控
1)调度管理:在运维监控 > 调度管理找到对应作业,点击【查看作业任务】可查看运行详情,主要信息包含作业任务调度明细、各调度的导入数据量、日志下载、节点任务血缘等。 最佳实践 最佳实践 2)告警通知:若需监控作业状态,特别是运行失败,可在个人中心 > 消息通知中配置告警,支持邮件、钉钉群通知。对象类型选择“数据加工”、对象名称填写需监控的作业名称,通知类型选择“作业运行”,作业运行失败时,可发送消息通知。 最佳实践

脚本示例

示例场景:使用java脚本将ElasticSearch数据库中数据导入至平台仓内,以下示例脚本代码不包含所需依赖jar包内容。

# 定义解压方法
package com.hexadb.luban.example;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class Main {
public static void main(String[] args) {
String esHost = "172.16.0.159";
int port = 30092;
String esUsername = "elastic";
String esPassword = "y3H*******pxk_XI";

ESReader esReader = new ESReader(esHost, port, esUsername, esPassword);
DBWriter dbWriter = new DBWriter("jdbc:hexadb://172.16.0.105:26001/es_imp_example?currentSchema=stg", "slx_test_4_1", "1q****SX");

try {
// 确保表存在,如果不存在则创建
dbWriter.createTableIfNotExists();

// 从 Elasticsearch 获取数据(返回 Record 对象列表)
List<Record> records = esReader.fetchDataFromES("test_index");

// 将数据写入数据库
dbWriter.writeDataToDB(records);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
esReader.close();
dbWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static class DBWriter {
private HikariDataSource dataSource;

public DBWriter(String dbUrl, String dbUser, String dbPassword) {
// 配置 HikariCP 数据库连接池
HikariConfig config = new HikariConfig();
config.setJdbcUrl(dbUrl);
config.setUsername(dbUser);
config.setPassword(dbPassword);
this.dataSource = new HikariDataSource(config);
}

// 确保表存在,如果不存在则创建
public void createTableIfNotExists() {
String createTableQuery = "CREATE TABLE IF NOT EXISTS tmp (" +
"id SERIAL PRIMARY KEY, " +
"name VARCHAR(255), " + // name 字段
"age INT, " + // age 字段
"sex VARCHAR(10), " + // sex 字段
"content text" + // json_data 字段
");";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(createTableQuery)) {
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}

// 将数据写入数据库
public void writeDataToDB(List<Record> records) {
String insertQuery = "INSERT INTO tmp (name, age, sex, content) VALUES (?, ?, ?, ?)";

try (Connection conn = dataSource.getConnection()) {
for (Record record : records) {
try (PreparedStatement ps = conn.prepareStatement(insertQuery)) {
ps.setString(1, record.getName());
ps.setInt(2, record.getAge());
ps.setString(3, record.getSex());
ps.setString(4, record.getContent());
ps.executeUpdate();
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}

public void close() {
if (dataSource != null) {
dataSource.close();
}
}
}

public static class ESReader {

private final RestHighLevelClient client;

public ESReader(String esHost, int port, String esUsername, String esPassword) {
// 配置 Elasticsearch 客户端连接
HttpHost httpHost = new HttpHost(esHost, port, "http");
// 创建 RestHighLevelClient
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esUsername, esPassword));
client = new RestHighLevelClient(RestClient.builder(httpHost).setHttpClientConfigCallback(
httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
}

public List<Record> fetchDataFromES(String index) throws IOException {
// 创建搜索请求
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery()); // 查询所有数据
sourceBuilder.from(0); // 从第一个数据开始
sourceBuilder.size(100); // 获取前 100 条数据
searchRequest.source(sourceBuilder);

// 执行查询
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

List<Record> records = new ArrayList<>();

for (SearchHit hit : searchResponse.getHits()) {
// 从 Elasticsearch 返回的每条记录中获取字段数据
String name = (String) hit.getSourceAsMap().get("name");
Integer age = (Integer) hit.getSourceAsMap().get("age");
String sex = (String) hit.getSourceAsMap().get("id");
String jsonData = hit.getSourceAsString();

// 创建 Record 对象并添加到列表中
records.add(new Record(name, age != null ? age : 0, sex, jsonData));
}

return records;
}

public void close() throws IOException {
// 关闭客户端连接
client.close();
}
}


public static class Record {
private String name;
private int age;
private String sex;
private String content;

// 构造函数
public Record(String name, int age, String sex, String content) {
this.name = name;
this.age = age;
this.sex = sex;
this.content = content;
}

// Getter 和 Setter 方法
public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getSex() {
return sex;
}

public void setSex(String sex) {
this.sex = sex;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}
}

}