java脚本开发
业务场景
使用java脚本将ElasticSearch数据库中数据导入至平台仓内。(当前平台仅提供ES导出能力,此时可以通过脚本满足)
前提条件
1)java类型: 本实践文件中java代码为启动类, 因此必须包含main函数入口
2)jar类型: 主jar中需要包含MANIFEST.MF文件(如图4), 且需要在该文件中指定Main-Class, 即主程序入口
使用限制
1)支持jdk21及以下版本;
2)尽管 jdk 保证对已发布 API 的兼容性,但也有些类和方法会在新版本中被标记为弃用,并最终被移除。如果你使用了某些已经从新版本的 JDK 中删除的类或方法,你可能会遇到 ClassNotFoundException 或 NoSuchMethodException 等异常。
操作流程
第一步:上传资源文件
1)在“资源管理 > 资源文件”中上传相关的依赖资源包文件。
- 若java节点配置中希望直接上传jar包执行,则必须先将主程序jar上传至资源文件;
第二步:创建数据开发作业
1)在数据开发 > 批量数据 > 数据加工中新建数据加工作业,并在画布中拖入【java脚本】节点。
2)【java脚本】提供两种运行类型,结合是否需要依赖,相关关键配置说明如下。详见下方脚本示例。更多细节配置可查看python脚本节点介绍。
- 资源: 所有选择的关联资源文件,在程序启动前会下载在到路径(/app/user)下,编写脚本时可使用该路径;
- 启动参数: 包含java启动命令及main方法入参。如需要执行jvm堆内存大小并传入参数: -Xmx1024m testParam1 testParam2, testParam1 testParam2即为main方法入参;不要在启动参数中指定classpath(-cp -classpath)
- 前置处理:如上图中可通过shell命令快速解压,则后续无需在脚本中定义解压方法和解压资源
运行类型一:JAVA(不依赖资源文件)
直接在脚本配置中写入代码即可。执行过程中,会根据类名在执行路径中生成对应java文件,如下图:
运行类型二:JAVA(依赖资源文件)
在脚本配置中写入代码,依赖的三方jar需要通过资源上传后引用。
运行类型三:JAR(不依赖资源文件)
当前的主程序包的jar中不包含依赖的jar包。需要在MANIFEST.MF中添加classpath路径(../lib/);以maven为例, 使用插件maven-jar-plugin可以实现
运行类型四:JAR(依赖资源文件)
代码及依赖的jar都打包到一个jar中,以maven为例,使用插件maven-assembly-plugin, spring-boot-maven-plugin等均可以实现
3)点击节点配置界面的“立即运行”,执行完成后可以点击查看日志。
第三步:运维监控
1)调度管理:在运维监控 > 调度管理找到对应作业,点击【查看作业任务】可查看运行详情,主要信息包含作业任务调度明细、各调度的导入数据量、日志下载、节点任务血缘等。
2)告警通知:若需监控作业状态,特别是运行失败,可在个人中心 > 消息通知中配置告警,支持邮件、钉钉群通知。对象类型选择“数据加工”、对象名称填写需监控的作业名称,通知类型选择“作业运行”,作业运行失败时,可发送消息通知。
脚本示例
示例场景:使用java脚本将ElasticSearch数据库中数据导入至平台仓内,以下示例脚本代码不包含所需依赖jar包内容。
# 定义解压方法
package com.hexadb.luban.example;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
String esHost = "172.16.0.159";
int port = 30092;
String esUsername = "elastic";
String esPassword = "y3H*******pxk_XI";
ESReader esReader = new ESReader(esHost, port, esUsername, esPassword);
DBWriter dbWriter = new DBWriter("jdbc:hexadb://172.16.0.105:26001/es_imp_example?currentSchema=stg", "slx_test_4_1", "1q****SX");
try {
// 确保表存在,如果不存在则创建
dbWriter.createTableIfNotExists();
// 从 Elasticsearch 获取数据(返回 Record 对象列表)
List<Record> records = esReader.fetchDataFromES("test_index");
// 将数据写入数据库
dbWriter.writeDataToDB(records);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
esReader.close();
dbWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static class DBWriter {
private HikariDataSource dataSource;
public DBWriter(String dbUrl, String dbUser, String dbPassword) {
// 配置 HikariCP 数据库连接池
HikariConfig config = new HikariConfig();
config.setJdbcUrl(dbUrl);
config.setUsername(dbUser);
config.setPassword(dbPassword);
this.dataSource = new HikariDataSource(config);
}
// 确保表存在,如果不存在则创建
public void createTableIfNotExists() {
String createTableQuery = "CREATE TABLE IF NOT EXISTS tmp (" +
"id SERIAL PRIMARY KEY, " +
"name VARCHAR(255), " + // name 字段
"age INT, " + // age 字段
"sex VARCHAR(10), " + // sex 字段
"content text" + // json_data 字段
");";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(createTableQuery)) {
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
// 将数据写入数据库
public void writeDataToDB(List<Record> records) {
String insertQuery = "INSERT INTO tmp (name, age, sex, content) VALUES (?, ?, ?, ?)";
try (Connection conn = dataSource.getConnection()) {
for (Record record : records) {
try (PreparedStatement ps = conn.prepareStatement(insertQuery)) {
ps.setString(1, record.getName());
ps.setInt(2, record.getAge());
ps.setString(3, record.getSex());
ps.setString(4, record.getContent());
ps.executeUpdate();
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public void close() {
if (dataSource != null) {
dataSource.close();
}
}
}
public static class ESReader {
private final RestHighLevelClient client;
public ESReader(String esHost, int port, String esUsername, String esPassword) {
// 配置 Elasticsearch 客户端连接
HttpHost httpHost = new HttpHost(esHost, port, "http");
// 创建 RestHighLevelClient
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esUsername, esPassword));
client = new RestHighLevelClient(RestClient.builder(httpHost).setHttpClientConfigCallback(
httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
}
public List<Record> fetchDataFromES(String index) throws IOException {
// 创建搜索请求
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery()); // 查询所有数据
sourceBuilder.from(0); // 从第一个数据开始
sourceBuilder.size(100); // 获取前 100 条数据
searchRequest.source(sourceBuilder);
// 执行查询
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
List<Record> records = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
// 从 Elasticsearch 返回的每条记录中获取字段数据
String name = (String) hit.getSourceAsMap().get("name");
Integer age = (Integer) hit.getSourceAsMap().get("age");
String sex = (String) hit.getSourceAsMap().get("id");
String jsonData = hit.getSourceAsString();
// 创建 Record 对象并添加到列表中
records.add(new Record(name, age != null ? age : 0, sex, jsonData));
}
return records;
}
public void close() throws IOException {
// 关闭客户端连接
client.close();
}
}
public static class Record {
private String name;
private int age;
private String sex;
private String content;
// 构造函数
public Record(String name, int age, String sex, String content) {
this.name = name;
this.age = age;
this.sex = sex;
this.content = content;
}
// Getter 和 Setter 方法
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
}