Problems You May Encounter When Developing Hadoop 2.7.1 with Eclipse

I won't go into installing the Eclipse plugin and similar setup issues; there are plenty of guides online and they generally get you through it. Here are the problems you are most likely to run into once you start running jobs.


1. org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator. If you hit this error while testing code from Eclipse, adding the following line to your main method is enough:

System.setProperty("HADOOP_USER_NAME", "master");
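For context, here is a minimal, self-contained driver showing where that call has to go: it must run before the first FileSystem or Job call, otherwise the local Windows user (e.g. Administrator) is still sent to the cluster. The class name and the /tmp path are only illustrative; master:9000 is the NameNode address used throughout this article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PermissionFixExample {
    public static void main(String[] args) throws Exception {
        // Set the remote Hadoop user before anything touches HDFS.
        System.setProperty("HADOOP_USER_NAME", "master");

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");

        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.exists(new Path("/tmp")));
    }
}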


2. java.lang.ClassNotFoundException: Class com.simple.hadoop.analysis.Temperature$TemperatureMapper not found. If you hit this error while debugging in Eclipse, the key lines in the code below are EJob.createTempJar("target"); and ((JobConf) job.getConfiguration()).setJar(jarFile.toString());. To run from Eclipse you first need to package the program into a jar, point EJob.createTempJar at the directory that holds it (my target directory), and then add the ((JobConf) job.getConfiguration()).setJar(jarFile.toString()); line.

/**
 * Environment setup
 */
File jarFile = EJob.createTempJar("target");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);

// Input path
String dst = "hdfs://master:9000/tmp/input.txt";
// Output path; it must not exist yet, not even as an empty directory.
String dstOut = "hdfs://master:9000/tmp/out/" + System.currentTimeMillis() + "/";
Configuration hadoopConfig = new Configuration();

// Set the Hadoop user manually if needed
// System.setProperty("HADOOP_USER_NAME", "master");
hadoopConfig.set("fs.default.name", "hdfs://master:9000");
hadoopConfig.set("hadoop.job.user", "hadoop");
hadoopConfig.set("mapreduce.framework.name", "yarn");
hadoopConfig.set("mapreduce.jobtracker.address", "master:9001");
hadoopConfig.set("yarn.resourcemanager.hostname", "master");
hadoopConfig.set("yarn.resourcemanager.admin.address", "master:8033");
hadoopConfig.set("yarn.resourcemanager.address", "master:8032");
hadoopConfig.set("yarn.resourcemanager.resource-tracker.address", "master:8031");
hadoopConfig.set("yarn.resourcemanager.scheduler.address", "master:8030");
hadoopConfig.set("fs.hdfs.impl",
    org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
hadoopConfig.set("fs.file.impl",
    org.apache.hadoop.fs.LocalFileSystem.class.getName());

Job job = Job.getInstance(hadoopConfig);

// If you package the program as a jar and run it on the cluster, you need this line instead:
// job.setJarByClass(Temperature.class);

// To run your own example from Eclipse, build the jar first, let EJob.createTempJar("target")
// pick it up from the target directory, and then enable the following line.
// When running on the server, comment it out again:
// ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

// Input and output paths used by the job
FileInputFormat.addInputPath(job, new Path(dst));
FileOutputFormat.setOutputPath(job, new Path(dstOut));

// Use the custom Mapper and Reducer for the two processing stages
job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(TemperatureReducer.class);

// Set the key and value types of the final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// Run the job and wait for it to finish
job.waitForCompletion(true);
System.out.println("Finished");
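The EJob helper used above is not shown in the article; it comes from a commonly circulated utility that packs the compiled classes under a directory into a temporary jar whose path can then be handed to setJar. The sketch below is an assumption of what such a helper can look like, not the author's original; only the two method signatures are taken from the calls above.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {

    /** Packs everything under rootDir (e.g. "target") into a temporary jar and returns it. */
    public static File createTempJar(String rootDir) throws Exception {
        File root = new File(rootDir);
        if (!root.exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        File jarFile = File.createTempFile("EJob-", ".jar");
        jarFile.deleteOnExit();
        JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
        addFiles(root, "", out);
        out.close();
        return jarFile;
    }

    /** Class loader to install on the submitting thread; the context loader is enough here. */
    public static ClassLoader getClassLoader() {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        return loader != null ? loader : EJob.class.getClassLoader();
    }

    // Recursively copies files under dir into the jar, preserving relative paths.
    private static void addFiles(File dir, String relativePath, JarOutputStream out) throws Exception {
        for (File f : dir.listFiles()) {
            if (f.isDirectory()) {
                addFiles(f, relativePath + f.getName() + "/", out);
            } else {
                out.putNextEntry(new JarEntry(relativePath + f.getName()));
                FileInputStream in = new FileInputStream(f);
                byte[] buffer = new byte[8192];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                in.close();
                out.closeEntry();
            }
        }
    }
}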


3. exitCode=1: /bin/bash: line 0: fg: no job control. To get past this error you have to override one of Hadoop's own classes: org.apache.hadoop.mapred.YARNRunner. When the job is submitted from Eclipse on Windows, YARNRunner builds the container environment with Windows-style values such as %JAVA_HOME%, while Linux expects $JAVA_HOME, so the class has to be rewritten. The overridden class is only needed when testing locally; it is not needed on the server. My Hadoop version is 2.7.1 and the rewritten source is below. You can copy it straight into your own project; just don't change the package or class name:

package org.apache.hadoop.mapred;

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
import org.apache.hadoop.yarn.util.ConverterUtils;

import com.google.common.annotations.VisibleForTesting;

/**
 * This class enables the current JobClient (0.22 hadoop) to run on YARN.
 */
@SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol {

  private static final Log LOG = LogFactory.getLog(YARNRunner.class);

  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
  private ResourceMgrDelegate resMgrDelegate;
  private ClientCache clientCache;
  private Configuration conf;
  private final FileContext defaultFileContext;

  /**
   * Yarn runner incapsulates the client interface of
   * yarn
   * @param conf the configuration object for the client
   */
  public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }

  @Private
  /**
   * Used for testing mostly.
   * @param resMgrDelegate the resource manager delegate to set to.
   */
  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
    this.resMgrDelegate = resMgrDelegate;
  }

  @Override
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
      throws IOException, InterruptedException {
    throw new UnsupportedOperationException("Use Token.renew instead");
  }

  @Override
  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
      InterruptedException {
    return resMgrDelegate.getActiveTrackers();
  }

  @Override
  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
    return resMgrDelegate.getAllJobs();
  }

  @Override
  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
      InterruptedException {
    return resMgrDelegate.getBlacklistedTrackers();
  }

  @Override
  public ClusterMetrics getClusterMetrics() throws IOException,
      InterruptedException {
    return resMgrDelegate.getClusterMetrics();
  }

  @VisibleForTesting
  void addHistoryToken(Credentials ts) throws IOException, InterruptedException {
    /* check if we have a hsproxy, if not, no need */
    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
    if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
      /*
       * note that get delegation token was called. Again this is hack for oozie
       * to make sure we add history server delegation tokens to the credentials
       */
      RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector();
      Text service = resMgrDelegate.getRMDelegationTokenService();
      if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
        Text hsService = SecurityUtil.buildTokenService(hsProxy
            .getConnectAddress());
        if (ts.getToken(hsService) == null) {
          ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
        }
      }
    }
  }

  @VisibleForTesting
  Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
      throws IOException, InterruptedException {
    GetDelegationTokenRequest request = recordFactory
      .newRecordInstance(GetDelegationTokenRequest.class);
    request.setRenewer(Master.getMasterPrincipal(conf));
    org.apache.hadoop.yarn.api.records.Token mrDelegationToken;
    mrDelegationToken = hsProxy.getDelegationToken(request)
        .getDelegationToken();
    return ConverterUtils.convertFromYarn(mrDelegationToken,
        hsProxy.getConnectAddress());
  }

  @Override
  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
      throws IOException, InterruptedException {
    // The token is only used for serialization. So the type information
    // mismatch should be fine.
    return resMgrDelegate.getDelegationToken(renewer);
  }

  @Override
  public String getFilesystemName() throws IOException, InterruptedException {
    return resMgrDelegate.getFilesystemName();
  }

  @Override
  public JobID getNewJobID() throws IOException, InterruptedException {
    return resMgrDelegate.getNewJobID();
  }

  @Override
  public QueueInfo getQueue(String queueName) throws IOException,
      InterruptedException {
    return resMgrDelegate.getQueue(queueName);
  }

  @Override
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
      InterruptedException {
    return resMgrDelegate.getQueueAclsForCurrentUser();
  }

  @Override
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return resMgrDelegate.getQueues();
  }

  @Override
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return resMgrDelegate.getRootQueues();
  }

  @Override
  public QueueInfo[] getChildQueues(String parent) throws IOException,
      InterruptedException {
    return resMgrDelegate.getChildQueues(parent);
  }

  @Override
  public String getStagingAreaDir() throws IOException, InterruptedException {
    return resMgrDelegate.getStagingAreaDir();
  }

  @Override
  public String getSystemDir() throws IOException, InterruptedException {
    return resMgrDelegate.getSystemDir();
  }

  @Override
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return resMgrDelegate.getTaskTrackerExpiryInterval();
  }

  @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
      throws IOException, InterruptedException {

    addHistoryToken(ts);

    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
      throws IOException {
    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
    FileStatus rsrcStat = fs.getFileStatus(p);
    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
    rsrc.setSize(rsrcStat.getLen());
    rsrc.setTimestamp(rsrcStat.getModificationTime());
    rsrc.setType(type);
    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    return rsrc;
  }

  public ApplicationSubmissionContext createApplicationSubmissionContext(
      Configuration jobConf,
      String jobSubmitDir, Credentials ts) throws IOException {
    ApplicationId applicationId = resMgrDelegate.getApplicationId();

    // Setup resource requirements
    Resource capability = recordFactory.newRecordInstance(Resource.class);
    capability.setMemory(
        conf.getInt(
            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
            )
        );
    capability.setVirtualCores(
        conf.getInt(
            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
            )
        );
    LOG.debug("AppMaster capability = " + capability);

    // Setup LocalResources
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();

    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

    URL yarnUrlForJobSubmitDir = ConverterUtils
        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
            .resolvePath(
                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
    LOG.debug("Creating setup context, jobSubmitDir url is "
        + yarnUrlForJobSubmitDir);

    localResources.put(MRJobConfig.JOB_CONF_FILE,
        createApplicationResource(defaultFileContext,
            jobConfPath, LocalResourceType.FILE));
    if (jobConf.get(MRJobConfig.JAR) != null) {
      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
      LocalResource rc = createApplicationResource(
          FileContext.getFileContext(jobJarPath.toUri(), jobConf),
          jobJarPath,
          LocalResourceType.PATTERN);
      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
          JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
      rc.setPattern(pattern);
      localResources.put(MRJobConfig.JOB_JAR, rc);
    } else {
      // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
      // mapreduce jar itself which is already on the classpath.
      LOG.info("Job jar is not present. "
          + "Not adding any jar to the list of resources.");
    }

    // TODO gross hack
    for (String s : new String[] {
        MRJobConfig.JOB_SPLIT,
        MRJobConfig.JOB_SPLIT_METAINFO }) {
      localResources.put(
          MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
          createApplicationResource(defaultFileContext,
              new Path(jobSubmitDir, s), LocalResourceType.FILE));
    }

    // Setup security tokens
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    // Modified: hard-code the Unix-style $JAVA_HOME so the generated launch
    // command is valid on the Linux node managers even when submitting from Windows.
    vargs.add("$JAVA_HOME/bin/java");

    MRApps.addLog4jSystemProperties(null, vargs, conf);

    // Check for Java Lib Path usage in MAP and REDUCE configs
    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
        MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map",
        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce",
        MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce",
        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

    // Add AM admin command opts before user command opts
    // so that it can be overridden by user
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    vargs.add(mrAppMasterAdminOptions);

    // Add AM user command opts
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    vargs.add(mrAppMasterUserOptions);

    if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
        MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
      final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
          MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
      if (profileParams != null) {
        vargs.add(String.format(profileParams,
            ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
                + TaskLog.LogName.PROFILE));
      }
    }

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDERR);

    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    LOG.debug("Command to launch container for ApplicationMaster is : "
        + mergedCommand);

    // Setup the CLASSPATH in environment
    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
    Map<String, String> environment = new HashMap<String, String>();
    MRApps.setClasspath(environment, conf);

    // Shell
    environment.put(Environment.SHELL.name(),
        conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL,
            MRJobConfig.DEFAULT_SHELL));

    // Add the container working directory at the front of LD_LIBRARY_PATH
    MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
        MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

    // Setup the environment variables for Admin first
    MRApps.setEnvFromInputString(environment,
        conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), conf);
    // Setup the environment variables (LD_LIBRARY_PATH, etc)
    MRApps.setEnvFromInputString(environment,
        conf.get(MRJobConfig.MR_AM_ENV), conf);

    // Rewrite the %VAR% style keywords into $VAR form
    replaceEnvironment(environment);

    // Parse distributed cache
    MRApps.setupDistributedCache(jobConf, localResources);

    Map<ApplicationAccessType, String> acls
        = new HashMap<ApplicationAccessType, String>(2);
    acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
        MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
    acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
        MRJobConfig.JOB_ACL_MODIFY_JOB,
        MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(localResources, environment,
          vargsFinal, null, securityTokens, acls);

    Collection<String> tagsFromConf =
        jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext =
        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    appContext.setApplicationId(applicationId);                // ApplicationId
    appContext.setQueue(                                       // Queue name
        jobConf.get(JobContext.QUEUE_NAME,
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    // add reservationID if present
    ReservationId reservationID = null;
    try {
      reservationID =
          ReservationId.parseReservationId(jobConf
              .get(JobContext.RESERVATION_ID));
    } catch (NumberFormatException e) {
      // throw exception as reservationid as is invalid
      String errMsg =
          "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
              + " specified for the app: " + applicationId;
      LOG.warn(errMsg);
      throw new IOException(errMsg);
    }
    if (reservationID != null) {
      appContext.setReservationID(reservationID);
      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
          + " to queue:" + appContext.getQueue() + " with reservationId:"
          + appContext.getReservationID());
    }
    appContext.setApplicationName(                             // Job name
        jobConf.get(JobContext.JOB_NAME,
        YarnConfiguration.DEFAULT_APPLICATION_NAME));
    appContext.setCancelTokensWhenComplete(
        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
    appContext.setAMContainerSpec(amContainer);         // AM Container
    appContext.setMaxAppAttempts(
        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
    appContext.setResource(capability);
    appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
    if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
      appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
    }

    return appContext;
  }

  // Added for submitting from Eclipse on Windows: convert the Windows-style
  // %VAR% references and ';' separators in CLASSPATH into the $VAR / ':' form
  // that the Linux containers expect.
  private void replaceEnvironment(Map<String, String> environment) {
    String tmpClassPath = environment.get("CLASSPATH");
    tmpClassPath = tmpClassPath.replaceAll(";", ":");
    tmpClassPath = tmpClassPath.replaceAll("%PWD%", "\\$PWD");
    tmpClassPath = tmpClassPath.replaceAll("%HADOOP_MAPRED_HOME%", "\\$HADOOP_MAPRED_HOME");
    tmpClassPath = tmpClassPath.replaceAll("%HADOOP_CONF_DIR%", "\\$HADOOP_CONF_DIR");
    tmpClassPath = tmpClassPath.replaceAll("%HADOOP_COMMON_HOME%", "\\$HADOOP_COMMON_HOME");
    tmpClassPath = tmpClassPath.replaceAll("%HADOOP_HDFS_HOME%", "\\$HADOOP_HDFS_HOME");
    tmpClassPath = tmpClassPath.replaceAll("%HADOOP_YARN_HOME%", "\\$HADOOP_YARN_HOME");
    tmpClassPath = tmpClassPath.replaceAll("\\\\", "/");
    environment.put("CLASSPATH", tmpClassPath);
    environment.put("LD_LIBRARY_PATH", "$PWD");
  }

  @Override
  public void setJobPriority(JobID arg0, String arg1) throws IOException,
      InterruptedException {
    resMgrDelegate.setJobPriority(arg0, arg1);
  }

  @Override
  public long getProtocolVersion(String arg0, long arg1) throws IOException {
    return resMgrDelegate.getProtocolVersion(arg0, arg1);
  }

  @Override
  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
      throws IOException, InterruptedException {
    throw new UnsupportedOperationException("Use Token.renew instead");
  }

  @Override
  public Counters getJobCounters(JobID arg0) throws IOException,
      InterruptedException {
    return clientCache.getClient(arg0).getJobCounters(arg0);
  }

  @Override
  public String getJobHistoryDir() throws IOException, InterruptedException {
    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
  }

  @Override
  public JobStatus getJobStatus(JobID jobID) throws IOException,
      InterruptedException {
    JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
    return status;
  }

  @Override
  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
      int arg2) throws IOException, InterruptedException {
    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
  }

  @Override
  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
      InterruptedException {
    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
  }

  @Override
  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
      throws IOException, InterruptedException {
    return clientCache.getClient(jobID)
        .getTaskReports(jobID, taskType);
  }

  private void killUnFinishedApplication(ApplicationId appId)
      throws IOException {
    ApplicationReport application = null;
    try {
      application = resMgrDelegate.getApplicationReport(appId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
    if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
        || application.getYarnApplicationState() == YarnApplicationState.FAILED
        || application.getYarnApplicationState() == YarnApplicationState.KILLED) {
      return;
    }
    killApplication(appId);
  }

  private void killApplication(ApplicationId appId) throws IOException {
    try {
      resMgrDelegate.killApplication(appId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

  private boolean isJobInTerminalState(JobStatus status) {
    return status.getState() == JobStatus.State.KILLED
        || status.getState() == JobStatus.State.FAILED
        || status.getState() == JobStatus.State.SUCCEEDED;
  }

  @Override
  public void killJob(JobID arg0) throws IOException, InterruptedException {
    /* check if the status is not running, if not send kill to RM */
    JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
    ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();

    // get status from RM and return
    if (status == null) {
      killUnFinishedApplication(appId);
      return;
    }

    if (status.getState() != JobStatus.State.RUNNING) {
      killApplication(appId);
      return;
    }

    try {
      /* send a kill to the AM */
      clientCache.getClient(arg0).killJob(arg0);
      long currentTimeMillis = System.currentTimeMillis();
      long timeKillIssued = currentTimeMillis;
      long killTimeOut =
          conf.getLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
                       MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS);
      while ((currentTimeMillis < timeKillIssued + killTimeOut)
          && !isJobInTerminalState(status)) {
        try {
          Thread.sleep(1000L);
        } catch (InterruptedException ie) {
          /** interrupted, just break */
          break;
        }
        currentTimeMillis = System.currentTimeMillis();
        status = clientCache.getClient(arg0).getJobStatus(arg0);
        if (status == null) {
          killUnFinishedApplication(appId);
          return;
        }
      }
    } catch(IOException io) {
      LOG.debug("Error when checking for application status", io);
    }
    if (status != null && !isJobInTerminalState(status)) {
      killApplication(appId);
    }
  }

  @Override
  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
      InterruptedException {
    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
  }

  @Override
  public AccessControlList getQueueAdmins(String arg0) throws IOException {
    return new AccessControlList("*");
  }

  @Override
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return JobTrackerStatus.RUNNING;
  }

  @Override
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
        clientMethodsHash);
  }

  @Override
  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException {
    return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
  }

  private static void warnForJavaLibPath(String opts, String component,
      String javaConf, String envConf) {
    if (opts != null && opts.contains("-Djava.library.path")) {
      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
               "programs to no longer function if hadoop native libraries " +
               "are used. These values should be set as part of the " +
               "LD_LIBRARY_PATH in the " + component + " JVM env using " +
               envConf + " config settings.");
    }
  }
}
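As a side note that is not from the original article (so verify it against your own cluster first): Hadoop 2.x also exposes a client-side switch, mapreduce.app-submission.cross-platform, which makes the submitter generate platform-independent launch commands. Depending on the setup it can avoid patching YARNRunner altogether; it would be set on the job configuration from problem 2 like this:

// Possible alternative: ask the client to emit cross-platform ($VAR style)
// launch commands instead of Windows %VAR% ones. Check that your Hadoop 2.7.1
// build honors this property before relying on it.
hadoopConfig.setBoolean("mapreduce.app-submission.cross-platform", true);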


4. If a ClassNotFoundException for one of Hadoop's own classes is thrown at runtime, it simply means the corresponding Hadoop jar is missing from the project's build path; add the missing jar and the error goes away.
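If it is unclear which jar a class should come from, a quick way to check what is actually on the classpath is to print where a loaded class was resolved from. This is a generic diagnostic, not something from the original article:

// Prints the jar (or directory) that a given Hadoop class was loaded from.
System.out.println(org.apache.hadoop.mapreduce.Job.class
    .getProtectionDomain().getCodeSource().getLocation());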
