一. 前言
在同一个HA HDFS集群中, 将会同时运行两个Namenode实例, 其中一个为Active Namenode,用于实时处理所有客户端请求; 另一个为Standby Namenode, StandbyNamenode的命名空间与ActiveNamenode是完全保持一致的。 所以当ActiveNamenode出现故障时, Standby Namenode可以立即切换成Active状态。
二.checkpoint操作
为了让Standby Namenode的命名空间与Active Namenode保持同步, 它们都将和JournalNodes守护进程通信。
当Active Namenode执行任何修改命名空间的操作时, 它至少需要将产生的editlog文件持久化到N-(N-1)/2个JournalNode节点上才能保证命名空间修改的安全性。 换句话说, 如果在HA策略下启动了N个JournalNode进程, 那么整个JournalNode集群最多允许(N-1)/2个进程死掉, 这样才能保证editlog成功完整地被写入。 比如集群中有3个JournalNode时, 最多允许1个JournalNode挂掉; 集群中有5个JournalNode时, 最多允许2个JournalNode挂掉。 Standby Namenode则负责观察editlog文件的变化, 它能够从JournalNodes中读取editlog文件, 然后合并更新到它自己的命名空间中。 一旦ActiveNamenode出现故障, Standby Namenode就会保证从JournalNodes中读出全部的editlog文件, 然后切换成Active状态。
Standby Namenode读取全部的editlog文件可确保在发生故障转移之前, 和Active Namenode拥有完全同步的命名空间状态。
Standby Namenode始终保持着一个最新版本的命名空间, 它会不断地将读入的editlog文件与当前的命名空间并 .
StandbyNamenode只需判断当前是否满足触发检查点操作的两个条件, 如果满足触发条件, 则将Standby Namenode的命名空间写入一个新的fsimage文件中, 并通过HTTP将这个fsimage文件传回Active Namenode。
■ Standby Namenode检查是否满足触发检查点操作的两个条件。
■ Standby Namenode将当前的命名空间保存到fsimage.ckpt_txid文件中, 这里的txid是当前最新的editlog文件中记录的事务id。 之后Standby Namenode写入fsimage文件的MD5校验和, 最后重命名这个fsimage.ckpt_txid文件为fsimage_txid。 当执行这个操作时, 其他的Standby Namenode操作将会被阻塞, 例如Active Namenode发生错误时, 需要进行主备切换或者访问Standby Namenode的Web接口等操作。注意, Active Namenode的操作并不会被影响, 例如listing、 reading、 writing文件等。
■ Standby Namenode向Active Namenode的ImageServlet发送HTTP GET请求/getimage?putimage=1。 这个请求的URL中包含了新的fsimage文件的事务id,以及Standby Namenode用于下载的端口和IP地址。
■ Active Namenode会根据Standby Namenode提供的信息向Standby Namenode的ImageServlet发起HTTP GET请求下载fsimage文件。 Namenode首先将下载文件命名为fsimage.ckpt_*, 然后创建MD5校验和, 最后将fsimage.ckpt_*重命名为fsimage_*。
三. FSNamesystem#startStandbyServices
Namenode在启动的时候会加载FSNamesystem, 在FSNamesystem中会通过startStandbyServices启动一个StandbyCheckpointer类.用于处理checkpoint操作.
/*** Start services required in standby or observer state* * @throws IOException*/void startStandbyServices(final Configuration conf, boolean isObserver)throws IOException {LOG.info("Starting services required for " +(isObserver ? "observer" : "standby") + " state");if (!getFSImage().editLog.isOpenForRead()) {// During startup, we're already open for read.getFSImage().editLog.initSharedJournalsForRead();}blockManager.setPostponeBlocksFromFuture(true);// Disable quota checks while in standby.dir.disableQuotaChecks();editLogTailer = new EditLogTailer(this, conf);editLogTailer.start();if (!isObserver && standbyShouldCheckpoint) {standbyCheckpointer = new StandbyCheckpointer(conf, this);standbyCheckpointer.start();}}
四. CheckpointerThread
StandbyCheckpointer在调用start方法的时候,会启动CheckpointerThread线程,而执行run方法的时候, 会调用doWork方法.
private void doWork() {//间隔时长 1小时final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();System.out.println("StandbyCheckpointer#doWork=>checkPeriod : "+ checkPeriod);// Reset checkpoint time so that we don't always checkpoint// on startup.lastCheckpointTime = monotonicNow();lastUploadTime = monotonicNow();while (shouldRun) {boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();if (!needRollbackCheckpoint) {try {Thread.sleep(checkPeriod);} catch (InterruptedException ie) {}if (!shouldRun) {break;}}try {// We may have lost our ticket since last checkpoint, log in again, just in caseif (UserGroupInformation.isSecurityEnabled()) {UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();}final long now = monotonicNow();//获得最后一次往JournalNode写入的txid和最近一次做检查点的txid的差值final long uncheckpointed = countUncheckpointedTxns();//计算当前时间和上一次检查点操作时间的间隔final long secsSinceLast = (now - lastCheckpointTime) / 1000;// if we need a rollback checkpoint, always attempt to checkpointboolean needCheckpoint = needRollbackCheckpoint;if (needCheckpoint) {LOG.info("Triggering a rollback fsimage for rolling upgrade.");} else if (uncheckpointed >= checkpointConf.getTxnCount()) {///第一种符合合并的情况:// 当最后一次往JournalNode写入的txid和最近一次做检查点的txid的差值// 大于或者等于dfs.namenode.checkpoint.txns配置的数量(默认为100万)时做一次合并LOG.info("Triggering checkpoint because there have been {} txns " +"since the last checkpoint, " +"which exceeds the configured threshold {}",uncheckpointed, checkpointConf.getTxnCount());needCheckpoint = true;} else if (secsSinceLast >= checkpointConf.getPeriod()) {LOG.info("Triggering checkpoint because it has been {} seconds " +"since the last checkpoint, which exceeds the configured " +"interval {}", secsSinceLast, checkpointConf.getPeriod());//第二种符合合并的情况:// 当时间间隔大于或者等于dfs.namenode.checkpoint.period [一小时]// 配置的时间时做合并needCheckpoint = true;}//满足检查点执行条件, 则调用doCheckpoint()方法执行检查点操作if (needCheckpoint) {synchronized (cancelLock) {if (now < preventCheckpointsUntil) {LOG.info("But skipping this checkpoint since we are about to failover!");canceledCount++;continue;}assert canceler == null;canceler = new Canceler();}// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a// rollback request, are the checkpointer, are outside the quiet period.final long secsSinceLastUpload = (now - lastUploadTime) / 1000;boolean sendRequest = isPrimaryCheckPointer || secsSinceLastUpload >= checkpointConf.getQuietPeriod();doCheckpoint(sendRequest);// reset needRollbackCheckpoint to false only when we finish a ckpt// for rollback imageif (needRollbackCheckpoint&& namesystem.getFSImage().hasRollbackFSImage()) {namesystem.setCreatedRollbackImages(true);namesystem.setNeedRollbackFsImage(false);}lastCheckpointTime = now;LOG.info("Checkpoint finished successfully.");}} catch (SaveNamespaceCancelledException ce) {LOG.info("Checkpoint was cancelled: {}", ce.getMessage());canceledCount++;} catch (InterruptedException ie) {LOG.info("Interrupted during checkpointing", ie);// Probably requested shutdown.continue;} catch (Throwable t) {LOG.error("Exception in doCheckpoint", t);} finally {synchronized (cancelLock) {canceler = null;}}}}
六. doCheckpoint(sendRequest);
整个检查点执行操作的逻辑都是在doCheckpoint()方法中实现的。doCheckpoint()方法首先获取当前保存的fsimage的prevCheckpointTxId, 然后获取最近更新的editlog的thisCheckpointTxId, 只有新的thisCheckpointTxId大于prevCheckpointTxId, 也
就是当前命名空间有更新, 但是并没有保存到新的fsimage文件中时, 才有必要进行一次检查点操作。 判断完成后, doCheckpoint()会调用saveNamespace()方法将最新的命名空间保存到fsimage文件中。 之后构造一个线程, 将新产生的fsimage文件通过HTTP方式上传到
AvtiveNamenode中
/*** 整个检查点执行操作的逻辑都是在doCheckpoint()方法中实现的。** doCheckpoint()方法首先获取当前保存的fsimage的prevCheckpointTxId,* 然后获取最近更新的editlog的thisCheckpointTxId,* 只有新的thisCheckpointTxId大于prevCheckpointTxId,* 也就是当前命名空间有更新, 但是并没有保存到新的fsimage文件中时,* 才有必要进行一次 检查点操作。** 判断完成后,* doCheckpoint()会调用saveNamespace()方法将最新的命名空间保存到fsimage文件中。** 之后构造一个线程, 将新产生的fsimage文件通过HTTP方式上传到AvtiveNamenode中。** @param sendCheckpoint* @throws InterruptedException* @throws IOException*/private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {assert canceler != null;final long txid;final NameNodeFile imageType;// Acquire cpLock to make sure no one is modifying the name system.// It does not need the full namesystem write lock, since the only thing// that modifies namesystem on standby node is edit log replaying.namesystem.cpLockInterruptibly();try {assert namesystem.getEditLog().isOpenForRead() :"Standby Checkpointer should only attempt a checkpoint when " +"NN is in standby mode, but the edit logs are in an unexpected state";//获取当前Standby Namenode上保存的最新的fsimage对象FSImage img = namesystem.getFSImage();//获取fsimage中保存的txid, 也就是上一次进行检查点操作时保存的txidlong prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();//获取当前命名空间的最新的txid, 也就是收到的editlog的最新的txidlong thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();assert thisCheckpointTxId >= prevCheckpointTxId;//如果相等则没有必要执行检查点操作, 当前fsimage已经是最新的了if (thisCheckpointTxId == prevCheckpointTxId) {LOG.info("A checkpoint was triggered but the Standby Node has not " +"received any transactions since the last checkpoint at txid {}. " +"Skipping...", thisCheckpointTxId);return;}if (namesystem.isRollingUpgrade()&& !namesystem.getFSImage().hasRollbackFSImage()) {//如果当前Namenode正在执行升级操作, 则创建fsimage_rollback文件// if we will do rolling upgrade but have not created the rollback image// yet, name this checkpoint as fsimage_rollbackimageType = NameNodeFile.IMAGE_ROLLBACK;} else {//在正常情况下创建fsimage文件imageType = NameNodeFile.IMAGE;}//调用saveNamespace()将当前命名空间保存到新的文件中img.saveNamespace(namesystem, imageType, canceler);txid = img.getStorage().getMostRecentCheckpointTxId();assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +thisCheckpointTxId + " but instead saved at txid=" + txid;// Save the legacy OIV image, if the output dir is defined.String outputDir = checkpointConf.getLegacyOivImageDir();if (outputDir != null && !outputDir.isEmpty()) {try {img.saveLegacyOIVImage(namesystem, outputDir, canceler);} catch (IOException ioe) {LOG.warn("Exception encountered while saving legacy OIV image; "+ "continuing with other checkpointing steps", ioe);}}} finally {namesystem.cpUnlock();}//early exit if we shouldn't actually send the checkpoint to the ANNif(!sendCheckpoint){return;}//构造一个线程, 通过HTTP将fsimage上传到Active Namenode中// Upload the saved checkpoint back to the active// Do this in a separate thread to avoid blocking transition to active, but don't allow more// than the expected number of tasks to run or queue up// See HDFS-4816ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),uploadThreadFactory);// for right now, just match the upload to the nn address by convention. There is no need to// directly tie them together by adding a pair class.List<Future<TransferFsImage.TransferResult>> uploads =new ArrayList<Future<TransferFsImage.TransferResult>>();for (final URL activeNNAddress : activeNNAddresses) {Future<TransferFsImage.TransferResult> upload =executor.submit(new Callable<TransferFsImage.TransferResult>() {@Overridepublic TransferFsImage.TransferResult call()throws IOException, InterruptedException {CheckpointFaultInjector.getInstance().duringUploadInProgess();return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);}});uploads.add(upload);}InterruptedException ie = null;IOException ioe= null;int i = 0;boolean success = false;for (; i < uploads.size(); i++) {Future<TransferFsImage.TransferResult> upload = uploads.get(i);try {// TODO should there be some smarts here about retries nodes that are not the active NN?if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {success = true;//avoid getting the rest of the results - we don't care since we had a successful uploadbreak;}} catch (ExecutionException e) {ioe = new IOException("Exception during image upload", e);break;} catch (InterruptedException e) {ie = e;break;}}if (ie == null && ioe == null) {//Update only when response from remote about success orlastUploadTime = monotonicNow();// we are primary if we successfully updated the ANNthis.isPrimaryCheckPointer = success;}// cleaner than copying code for multiple catch statements and better than catching all// exceptions, so we just handle the ones we expect.if (ie != null || ioe != null) {// cancel the rest of the tasks, and close the poolfor (; i < uploads.size(); i++) {Future<TransferFsImage.TransferResult> upload = uploads.get(i);// The background thread may be blocked waiting in the throttler, so// interrupt it.upload.cancel(true);}// shutdown so we interrupt anything running and don't start anything newexecutor.shutdownNow();// this is a good bit longer than the thread timeout, just to make sure all the threads// that are not doing any work also stopexecutor.awaitTermination(500, TimeUnit.MILLISECONDS);// re-throw the exception we got, since one of these two must be non-nullif (ie != null) {throw ie;} else if (ioe != null) {throw ioe;}}}