问题描述
在 Windows 上,我想用 Python 通过网络复制一堆文件。 有时,网络没有响应,复制停止。 我想检查一下,如果发生这种情况,并在发生这种情况时跳过有问题的文件。 通过提出这个相关问题,我发现了函数,它允许使用回调函数,可以中止文件复制。
Python 中的实现如下所示:
import win32file
def Win32_CopyFileEx( ExistingFileName, NewFileName, Canc = False):
win32file.CopyFileEx(
ExistingFileName, # PyUNICODE | File to be copied
NewFileName, # PyUNICODE | Place to which it will be copied
Win32_CopyFileEx_ProgressRoutine, # CopyProgressRoutine | A python function that receives progress updates, can be None
Data = None, # object | An arbitrary object to be passed to the callback function
Cancel = Canc, # boolean | Pass True to cancel a restartable copy that was previously interrupted
CopyFlags = win32file.COPY_FILE_RESTARTABLE, # int | Combination of COPY_FILE_* flags
Transaction = None # PyHANDLE | Handle to a transaction as returned by win32transaction::CreateTransaction
)
从 CopyFileEx 函数的文档中,我可以看到取消正在运行的副本的两种可能性。
pbCancel [in, optional]如果在复制操作期间该标志设置为TRUE ,则取消操作。 否则,复制操作将继续完成。
我想不出如何做到这一点。
我尝试再次使用相同的文件句柄调用相同的函数,但取消标志设置为TRUE
,但这导致错误,因为有问题的文件正在被另一个进程使用。
另一种可能性似乎是回调函数:
lpProgressRoutine [in, optional]每次复制文件的另一部分时调用的 LPPROGRESS_ROUTINE 类型的回调函数的地址。 此参数可以为 NULL。 有关进度回调函数的更多信息,请参阅 CopyProgressRoutine 函数。
的指出,这个回调要么在复制开始时调用,要么在文件的垃圾文件完成复制时调用。
如果返回1
或2
(取消、停止),回调函数可以取消复制过程。
然而,这个回调函数似乎没有被调用,当垃圾的副本停止时。
所以我的问题是:当它停滞时,我如何在每个文件的基础上取消这个副本?
1楼
不允许将Cancel
作为布尔值或整数值以外的任何值传递。
在 API 中,它是一个LPBOOL
指针,它允许调用者在另一个线程中同时设置它的值。
您必须使用 ctypes、Cython 或 C 扩展来获得这种级别的控制。
下面我写了一个使用 ctypes 的例子。
如果由于线程在同步 I/O 上被阻塞而取消复制不起作用,您可以尝试在进度例程中传递的文件句柄上调用 ,或取消线程的所有同步 I/O . 这些 I/O 取消功能是在 Windows Vista 中添加的。 它们在 Windows XP 中不可用,以防您仍然支持它。
import ctypes
from ctypes import wintypes
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
COPY_FILE_FAIL_IF_EXISTS = 0x0001
COPY_FILE_RESTARTABLE = 0x0002
COPY_FILE_OPEN_SOURCE_FOR_WRITE = 0x0004
COPY_FILE_ALLOW_DECRYPTED_DESTINATION = 0x0008
COPY_FILE_COPY_SYMLINK = 0x0800
COPY_FILE_NO_BUFFERING = 0x1000
CALLBACK_CHUNK_FINISHED = 0
CALLBACK_STREAM_SWITCH = 1
PROGRESS_CONTINUE = 0
PROGRESS_CANCEL = 1
PROGRESS_STOP = 2
PROGRESS_QUIET = 3
ERROR_REQUEST_ABORTED = 0x04D3
if not hasattr(wintypes, 'LPBOOL'):
wintypes.LPBOOL = ctypes.POINTER(wintypes.BOOL)
def _check_bool(result, func, args):
if not result:
raise ctypes.WinError(ctypes.get_last_error())
return args
LPPROGRESS_ROUTINE = ctypes.WINFUNCTYPE(
wintypes.DWORD, # _Retval_
wintypes.LARGE_INTEGER, # _In_ TotalFileSize
wintypes.LARGE_INTEGER, # _In_ TotalBytesTransferred
wintypes.LARGE_INTEGER, # _In_ StreamSize
wintypes.LARGE_INTEGER, # _In_ StreamBytesTransferred
wintypes.DWORD, # _In_ dwStreamNumber
wintypes.DWORD, # _In_ dwCallbackReason
wintypes.HANDLE, # _In_ hSourceFile
wintypes.HANDLE, # _In_ hDestinationFile
wintypes.LPVOID) # _In_opt_ lpData
kernel32.CopyFileExW.errcheck = _check_bool
kernel32.CopyFileExW.argtypes = (
wintypes.LPCWSTR, # _In_ lpExistingFileName
wintypes.LPCWSTR, # _In_ lpNewFileName
LPPROGRESS_ROUTINE, # _In_opt_ lpProgressRoutine
wintypes.LPVOID, # _In_opt_ lpData
wintypes.LPBOOL, # _In_opt_ pbCancel
wintypes.DWORD) # _In_ dwCopyFlags
@LPPROGRESS_ROUTINE
def debug_progress(tsize, ttrnsfr, stsize, sttrnsfr, stnum, reason,
hsrc, hdst, data):
print('ttrnsfr: %d, stnum: %d, stsize: %d, sttrnsfr: %d, reason: %d' %
(ttrnsfr, stnum, stsize, sttrnsfr, reason))
return PROGRESS_CONTINUE
def copy_file(src, dst, cancel=None, flags=0,
cbprogress=None, data=None):
if isinstance(cancel, int):
cancel = ctypes.byref(wintypes.BOOL(cancel))
elif cancel is not None:
cancel = ctypes.byref(cancel)
if cbprogress is None:
cbprogress = LPPROGRESS_ROUTINE()
kernel32.CopyFileExW(src, dst, cbprogress, data, cancel, flags)
例子
if __name__ == '__main__':
import os
import tempfile
import threading
src_fd, src = tempfile.mkstemp()
os.write(src_fd, os.urandom(16 * 2 ** 20))
os.close(src_fd)
dst = tempfile.mktemp()
cancel = wintypes.BOOL(False)
t = threading.Timer(0.001, type(cancel).value.__set__, (cancel, True))
t.start()
try:
copy_file(src, dst, cancel, cbprogress=debug_progress)
except OSError as e:
print(e)
assert e.winerror == ERROR_REQUEST_ABORTED
finally:
if os.path.exists(src):
os.remove(src)
if os.path.exists(dst):
os.remove(dst)