当前位置: 代码迷 >> 综合 >> Azkaban Api
  详细解决方案

Azkaban Api

热度:22   发布时间:2024-02-06 14:02:15.0

  项目原因   

       最近项目在做数据推送的时候,设计方式是采用分布式任务的方式推送到指定数据库中,所以采用azkaban,采用azkaban的原因有以下几点:

  • 查看日志

  日志查看方便。在之前做了一个系统,数据的引接和指标的计算,出现一个问题是,所有逻辑都在一个jar工程里面,遇到问题,需要查看日志的时候,所有日志都混合在一起,不是按照任务来区分,需要使用vim根据任务id去寻找需要的日志,比较麻烦。而使用azkaban,日志是根据任务分割开的,方便查看,并且,可以看到当前任务所执行时间。

查看日志方便

  • jar热更新

采用的是job的方式,不用每次修改调试都发一次版本,采用热更新。修改程序过后,重新打包job上传就可以执行,非常方便。

可以查看编写的job,地址是:https://github.com/poemp/azkaban-data-push-job,欢迎交流。

上传job

  • 编写的job不局限于Java

由于azkaban的job type多样性,所有编写job的语言不局限于Java,可以是Scala,Go, Python,Shell编写。

  • 和业务系统分离

不和业务系统在一起,方便部署管理。任务和业务系统在一起,特别麻烦的是定位问题,需要一大堆依赖,处理各种情况。

  • 调试方便

任务可以在azkaban里面方便的调试,而不需要真正的调用一次,只需要把使用的参数准备好,就可以执行一次job,非常方便

 

工程项目说明

azkaban工程项目地址:https://github.com/poemp/springboot-azkaban, 可以方便的继承在自己的工程项目里面,直接调用就可以。工程项目中使用到的是Http请求,组装参数,发送给azkaban做处理就好了,没有其他的技术点。

api说明

核心的代码说明

登录

主要是去拿通信的token

/*** 登录*/public void login() {LinkedMultiValueMap<String, String> params = new LinkedMultiValueMap<>();params.add("action", "login");params.add("username", config.getUsername());params.add("password", config.getPassword());HttpEntity<LinkedMultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, getAzkabanHeaders());String respResult = restTemplate.postForObject(config.getUrl(), httpEntity, String.class);log.info("Result: " + respResult);try {JsonNode respRoot = objectMapper.readTree(respResult);if (respRoot.hasNonNull("status") && "success".equals(respRoot.get("status").asText())) {SESSION_ID = respRoot.get("session.id").asText();log.info("Azkaban login success:{}", respRoot);} else {log.warn("Azkaban login failure:{}", respRoot);}} catch (IOException e) {log.error(String.format("Azkaban login failure: %s !", e.getMessage()), e);throw new AzkabanException(e.getMessage());}}

创建项目

  /*** 创建项目** @param projectName 项目名称* @param description 项目描述*/public void createProject(String projectName, String description) {LinkedMultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();parameters.add("session.id", SESSION_ID);parameters.add("action", "create");parameters.add("name", projectName);parameters.add("description", description);HttpEntity<LinkedMultiValueMap<String, String>> httpEntity = new HttpEntity<>(parameters, getAzkabanHeaders());String respResult = restTemplate.postForObject(config.getUrl() + "/manager", httpEntity, String.class);log.info("Result: " + respResult);try {JsonNode respRoot = objectMapper.readTree(respResult);if (respRoot.hasNonNull("status") && "success".equals(respRoot.get("status").asText())) {log.info("Azcaban create a Project: {}", projectName);} else {String errorMessage = respRoot.hasNonNull("message") ? respRoot.get("message").asText() : "No message.";log.error("Azcaban create Project %s failure: %s", projectName, errorMessage);throw new AzkabanException(errorMessage);}} catch (IOException e) {log.error(String.format("Azcaban create Project %s failure: %s", projectName, e.getMessage()), e);throw new AzkabanException(e.getMessage());}}

启动flow

/*** 执行** @param projectName 项目名称* @param flowId      flow* @param optionalParams 参数* @return*/public String executeFLow(String projectName, String flowId, Map<String, Object> optionalParams) {HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.add(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");Map<String, Object> map = new HashMap<>();if (optionalParams != null) {map.putAll(optionalParams);}map.put("session.id", SESSION_ID);map.put("ajax", "getRunning");map.put("project", projectName);map.put("flow", flowId);//flowOverride[type]=appleString paramStr = map.keySet().stream().map(key -> "flowOverride["+ key + "]=" + map.get(key)).collect(Collectors.joining("&"));ResponseEntity<String> exchange = restTemplate.exchange(config.getUrl() + "/executor?" + paramStr, HttpMethod.GET,new HttpEntity<String>(httpHeaders), String.class);log.info("Azkban execute a Flow:{}", exchange);return exchange.toString();}

查询状态

  /*** 执行信息** @param execId 执行ID* @return 结果*/public String executionInfo(String execId) {LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<>();linkedMultiValueMap.add("session.id", SESSION_ID);linkedMultiValueMap.add("ajax", "fetchexecflow");linkedMultiValueMap.add("execid", execId);String res = restTemplate.postForObject(config.getUrl() + "/executor", linkedMultiValueMap, String.class);log.info("azkaban execution info:{}", res);return res;}

其中调用执行情况的代码片段是:

String s = azkabanAdapter.executionInfo(dataPushTaskHistory.getExecId());JSONObject result = JSONObject.parseObject(s);if ("SUCCEEDED".equals(result.getString("status"))) {dataPushTaskHistory.setStatus(3);} else if ("FAILED".equals(result.getString("status"))) {dataPushTaskHistory.setStatus(2);}

 

这样就完成了调用。