package com.alibaba.datax.plugin.reader.jsonreader;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class JsonReader extends Reader {
public static class Job extends Reader.Job {
private Configuration configuration;
@Override
public void init() {
this.configuration = getPluginJobConf();
}
@Override
public void prepare() {
}
@Override
public void post() {
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int adviceNumber) {
// 分割任务,返回多个配置片段
List<Configuration> splitConfigurations = new ArrayList<>();
for (int i = 0; i < adviceNumber; i++) {
splitConfigurations.add(this.configuration.clone());
}
return splitConfigurations;
}
}
public static class Task extends Reader.Task {
private Configuration configuration;
private ObjectMapper objectMapper;
@Override
public void init() {
this.configuration = getPluginJobConf();
this.objectMapper = new ObjectMapper();
}
@Override
public void prepare() {
}
@Override
public void startRead(RecordSender recordSender) {
String url = configuration.getString("url");
String method = configuration.getString("method", "GET");
Map<String, Object> headers = configuration.getMap("headers", new HashMap<>());
Map<String, Object> params = configuration.getMap("params", new HashMap<>());
int startLevel = configuration.getInt("startLevel", 1);
String listKey = configuration.getString("listKey", "");
List<String> columns = configuration.getList("column", String.class);
// // 获取字段映射配置 暂时不用
// List<Map> rawColumns = configuration.getList("column", Map.class);
//
// // 手动转换为 List<Map<String, String>> 暂时不用
// List<Map<String, String>> columns = new ArrayList<>();
// for (Map rawColumn : rawColumns) {
// columns.add((Map<String, String>) rawColumn);
// }
try {
String fullUrl = buildFullUrl(url, params);
if (fullUrl == null) {
throw new DataXException("url拼接params参数后为空,请检查url和params是否为空");
}
String jsonResponse = executeRequest(fullUrl, method, headers);
List<Map<String, Object>> parsedData = parseJson(jsonResponse, startLevel, listKey);
if (parsedData == null) {
throw new DataXException("经解析过的json数据为空,或者层级、层级名称填写错误");
}
for (Map<String, Object> data : parsedData) {
Record record = recordSender.createRecord();
for (String column : columns) {
// String jsonKey = column.get("jsonKey"); //这里暂时改一下,配置文件改成数组
// String columnName = column.get("columnName"); //这是是本想匹配写入字段的,按照顺序就行了
Object value = data.get(column);
if (value != null) {
record.addColumn(new StringColumn(value.toString()));
} else {
record.addColumn(new StringColumn(null)); // 插入null值
}
}
recordSender.sendToWriter(record);
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("JsonReader失败原因:", e));
}
}
@Override
public void post() {
}
@Override
public void destroy() {
}
private String buildFullUrl(String baseUrl, Map<String, Object> params) {
StringBuilder fullUrl = new StringBuilder(baseUrl);
if (!params.isEmpty()) {
fullUrl.append("?");
params.forEach((key, value) -> fullUrl.append(key).append("=").append(value).append("&"));
fullUrl.deleteCharAt(fullUrl.length() - 1);
}
return fullUrl.toString();
}
private String executeRequest(String url, String method, Map<String, Object> headers) throws Exception {
Request request = method.equalsIgnoreCase("POST") ? Request.Post(url) : Request.Get(url);
headers.forEach((key, value) -> request.addHeader(key, value.toString()));
Response response = request.execute();
// 强制使用UTF-8解析响应内容
String content = response.returnContent().asString(java.nio.charset.StandardCharsets.UTF_8);
return content;
}
private List<Map<String, Object>> parseJson(String jsonStr, int startLevel, String listKey) {
Object root = parseJsonStr(jsonStr);
List<Map<String, Object>> resultList = new ArrayList<>();
processObject(root, resultList, startLevel, 1, listKey);
return resultList;
}
private Object parseJsonStr(String jsonString) {
try {
return objectMapper.readValue(jsonString.replaceAll("\\r\\n|\\n", ""), new TypeReference<Map<String, Object>>() {});
} catch (IOException e) {
try {
return objectMapper.readValue(jsonString.replaceAll("\\r\\n|\\n", ""), new TypeReference<List<Object>>() {});
} catch (IOException ex) {
return null;
}
}
}
private void processObject(Object obj, List<Map<String, Object>> resultList, int startLevel, int currentLevel, String listKey) {
if (obj instanceof List) {
processList((List<?>) obj, resultList, startLevel, currentLevel, listKey);
} else if (obj instanceof Map) {
processMap((Map<?, ?>) obj, resultList, startLevel, currentLevel, listKey);
}
}
private void processList(List<?> list, List<Map<String, Object>> resultList, int startLevel, int currentLevel, String listKey) {
for (Object element : list) {
if (element instanceof Map) {
processMap((Map<?, ?>) element, resultList, startLevel, currentLevel, listKey);
}
}
}
private void processMap(Map<?, ?> map, List<Map<String, Object>> resultList, int startLevel, int currentLevel, String listKey) {
if (currentLevel >= startLevel) {
Map<String, Object> tempMap = new HashMap<>();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Object key = ent
没有合适的资源?快使用搜索试试~ 我知道了~
对datax进行了插件扩展-以支持对http接口数据的采集-jsonreader-v1.1.1.zip

共7个文件
xml:2个
json:2个
iml:1个

1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉

温馨提示
对datax进行了插件扩展_以支持对http接口数据的采集_jsonreader_v1.1.1.zip 这个文件是我们在项目中,对大数据采集功能的需求进行的开发和补充. 通过这个插件,可以实现,直接对接http接口,从接口中获取数据,然后将数据 存储到对应的数据库中. 实现从api接口中采集数据,实现api接口和数据库之间的数据全量,或者增量采集. 另外增量采集做了优化,增量采集支持,如果数据存在就自动更新数据,先删除此条数据,然后重新插入,如果数据不存在就直接插入. 从而可以随时多次进行增量采集.
资源推荐
资源详情
资源评论






























收起资源包目录





















共 7 条
- 1
资源评论

- 普通网友2025-02-18资源不错,内容挺好的,有一定的使用价值,值得借鉴,感谢分享。

添柴程序猿
- 粉丝: 4043
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


最新资源
- 论网络环境下小学美术教育中的德育渗透.docx
- 单片机花样彩灯课程设计报告66165.doc
- 电子商务大赛技能规则.doc
- 移动互联网+SPOC的计算机混合教学实验研究.docx
- 高铁项目管理施工成本管理中的问题及对策研究.doc
- 单片机的游泳馆计价器系统设计.doc
- 基于相关学习神经网络的图像识别方法.docx
- 时分复用通信系统的设计与实现.docx
- 我国计算机编制铁路列车运行图技术迈出历史性一步.docx
- 单片机交通灯设计和实现.doc
- 互联网+背景下旅游酒店产品和服务升级策略探析.docx
- 《网络安全技术》课程标准.doc
- 基于智慧教育的学习大数据分析技术.docx
- 区块链拓展实体经济应用场景.docx
- 施工项目管理经理忠诚度影响因素初步研究.doc
- 《算法与数据结构》第5章-图与网151.ppt
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈



安全验证
文档复制为VIP权益,开通VIP直接复制
