当前位置: 代码迷 >> python >> 这种文件锁定方法可以接受吗?
  详细解决方案

这种文件锁定方法可以接受吗?

热度:110   发布时间:2023-06-13 13:56:06.0

我们有 10 个 Linux 机器,每周必须运行 100 个不同的任务。 当我们在家时,这些计算机主要在晚上处理这些任务。 我的一位同事正在开展一个项目,通过使用 Python 自动启动任务来优化运行时间。 他的程序将读取任务列表,抓取一个打开的任务,在文件中将该任务标记为进行中,然后在任务完成后在文件中将该任务标记为完成。 任务文件将在我们的网络挂载上。

我们意识到不建议让一个程序的多个实例访问同一个文件,但我们实际上看不到任何其他选项。 在他寻找防止两台计算机同时写入文件的方法时,我想出了自己的方法,这似乎比我们在网上找到的方法更容易实现。 我的方法是检查文件是否存在,如果不存在则等待几秒钟,如果存在则暂时移动文件。 我写了一个脚本来测试这个方法:

#!/usr/bin/env python

import time, os, shutil
from shutil import move
from os import path


fh = "testfile"
fhtemp = "testfiletemp"


while os.path.exists(fh) == False:
    time.sleep(3)

move(fh, fhtemp)
f = open(fhtemp, 'w')
line = raw_input("type something: ")
print "writing to file"
f.write(line)
raw_input("hit enter to close file.")
f.close()
move(fhtemp, fh)

在我们的测试中,此方法有效,但我想知道我们是否可能会遇到一些使用此方法未发现的问题。 我意识到两台计算机同时运行exists()可能会导致灾难。 两台计算机不太可能同时到达那个点,因为任务在 20 分钟到 8 小时之间。

您基本上已经开发了二进制信号量(或互斥量)的文件系统版本。 这是一个经过充分研究的用于锁定的结构,因此只要您获得正确的实现细节,它应该可以工作。 诀窍是让“测试和设置”操作,或者在你的情况下“检查存在和移动”,成为真正的原子。 为此,我会使用这样的东西:

lock_acquired = False
while not lock_acquired:
    try:
        move(fh, fhtemp)
    except:
        sleep(3)
    else:
        lock_acquired = True
# do your writing
move(fhtemp, fh)
lock_acquired = False

您拥有的程序大部分时间都可以工作,但如前所述,如果另一个进程在检查文件是否存在和调用move之间移动文件,您可能会遇到问题。 我想你可以解决这个问题,但我个人建议坚持使用经过良好测试的互斥锁算法。 (我已经从 Andrew Tanenbaum 的现代操作系统中翻译/移植了上述代码示例,但我可能在转换中引入了错误 - 只是公平警告)

顺便说一下,Linux 上open函数的手册页提供了这个文件锁定解决方案:

使用锁文件执行原子文件锁定的解决方案是在同一文件系统上创建一个唯一的文件(例如,合并主机名和 pid),使用 link(2) 链接到锁文件。 如果 link() 返回 0,则锁定成功。 否则,在唯一文件上使用 stat(2) 来检查它的链接数是否增加到 2,在这种情况下锁定也成功。

要在 Python 中实现它,您可以执行以下操作:

# each instance of the process should have a different filename here
process_lockfile = '/path/to/hostname.pid.lock'
# all processes should have the same filename here
global_lockfile = '/path/to/lockfile'
# create the file if necessary (only once, at the beginning of each process)
with open(process_lockfile, 'w') as f:
    f.write('\n') # or maybe write the hostname and pid

# now, each time you have to lock the file:
lock_acquired = False
while not lock_acquired:
    try:
        link(process_lockfile, global_lockfile)
    except:
        lock_acquired = (stat(process_lockfile).st_nlinks == 2)
    else:
        lock_acquired = True
# do your writing
unlink(global_lockfile)
lock_acquired = False

在我看来,如果您更改数据结构,您正在付出太多努力来完成一些可能很简单的事情。 现在您有一个包含任务列表的文件。

将任务队列改为一个目录如何,其中每个待处理的任务都是一个文件? 然后这个过程就像从目录“Pending”中选择一个任务一样简单,移动到目录(比如)“Running”,完成后,将任务文件移动到目录“Completed”。 由于文件移动是原子操作,因此不会有竞争条件(如果移动失败,意味着另一个工人只是先抢走了它,所以接下一个任务)。

此外,检查进度就像在其中一个目录上发出ls一样简单:-)

文件移动/重命名在大多数操作系统上通常是一个原子操作,因此它可能是一个可行的解决方案。

但是,您需要在moveopen调用中添加异常检查,以防其他进程在您的存在检查和move之间移动文件(或者move未能完成)。

编辑

总结将起作用的正确流程:

  1. 问题从 A move到 A。[myID]
  2. 尝试open A.[myID]
  3. 如果#1 或#2 失败,我们没有得到锁; 稍等片刻,然后回到#1。 否则,我们得到了锁,继续。
  4. 进行修改。
  5. 问题从 A.[myID] move到 A。(应该永远不会失败。)这会释放锁。

[myID]一个不错的选择是进程的 PID(如果在多个系统上运行,可能还包括主机)。

如果您不跟踪您的move调用以查看它们是否成功,您将永远不会知道您是否会成为时间窗口的受害者。 请记住,如果任何事情可能出错,它都会在最糟糕的时候出错。

与其使用文件的内容作为标志,不如使用文件名本身? 对于每个任务,将文件“task_waiting_to_run”重命名为“task_running”到“task_complete”。 如果从“task_waiting_to_run”重命名为“task_running”失败,这意味着另一个盒子首先到达那里。

编辑:识别重命名文件的过程也是常见的做法。 这样,如果该进程在将其恢复为原始名称之前终止,则可以跟踪文件的所有权并确定是否进行干预。

我已经插入(几乎没有测试过) ossocket调用来添加这个功能。 使用风险自负。


如果两个进程竞争重命名文件,那么让它们首先检查文件的存在并不能防止竞争条件; 它只会在它发生时延迟时间。

如果文件不存在, 的文档(遗憾的是)没有明确说明抛出 IOError ,但这似乎是一个合理的期望——我发现它确实发生在实践中:

import shutil
import os
import socket

oldname = "foobar.txt"
newname = (oldname + "." + socket.gethostbyaddr(socket.gethostname())[0]
           + "." + str(os.getpid()))
i_win = True
try:
    shutil.move(oldname, newname)
except IOError, e:
    print "File does not exist"
    i_win = False
except Exception, e:
    print e
    i_win = False

if i_win:
    print "I got it!"

这意味着只有一个进程可以认为它已成功重命名文件。

依赖网络文件系统进行锁定是一个困扰系统多年的问题(并且仍然经常无法按照您的预期工作)

为什么不使用设计为明确的多用户和事务性的东西,比如数据库系统? (我个人喜欢 Postgres...)

这可能有点矫枉过正,但对于这样的事情,工作原理通常很容易理解。 它还可以更轻松地扩展以在以后添加新功能。

这是一个带有超时的示例,作为上下文管理器实现,因此您可以像这样使用它:

with NetworkFileLock(r"\\machine\path\lockfile", 60):

...

@contextmanager
def NetworkFileLock(sharedFilePath, timeoutSeconds):

    # Try to acquire the lock here, by moving the file to a unique path for this process/thread
    uniqueFilePath = "{}-{}-{}-{}".format(sharedFilePath, socket.gethostname(), os.getpid(), threading.get_ident())

    startTime = time.time()
    while True:
        try:
            shutil.move(sharedFilePath, uniqueFilePath)
            # Check temp file now exists
            with open(uniqueFilePath, "r"):
                pass
            break
        except:
            if (time.time() - startTime) > timeoutSeconds:
                raise TimeoutError("Timed out after {} seconds waiting for network lock on file {}".format(time.time() - startTime, networkFilePath))
            time.sleep(3)

    try:
        # Yield to the body of the "with" statement
        yield
    except:
        # Move the file back to release the lock
        shutil.move(uniqueFilePath, sharedFilePath)
        raise
    else:
        # Move the file back to release the lock
        shutil.move(uniqueFilePath, sharedFilePath)

我更喜欢使用 ,这是一个几乎不需要任何额外代码的跨平台 Python 库。 以下是如何使用它的示例:

from filelock import FileLock

line = raw_input("type something: ")
lockfile = "testfile.txt"
lock = FileLock(lockfile + ".lock")
with lock:
    file = open(path, "w")
    file.write(line)
    file.close()

with lock:块中的任何代码都是线程安全的,这意味着它将在另一个进程访问该文件之前完成。

  相关解决方案