当前位置: 代码迷 >> 综合 >> tf.train.Coordinator协调器、tf.train.start_queue_runners启动器
  详细解决方案

tf.train.Coordinator协调器、tf.train.start_queue_runners启动器

热度:24   发布时间:2023-11-23 09:43:27.0

TensorFlow的Session对象是支持多线程的,可以在同一个会话(Session)中创建多个线程,并行执行。在Session中的所有线程都必须能被同步终止,异常必须能被正确捕获并报告,会话终止的时候, 队列必须能被正确地关闭。

TensorFlow提供了两个类来实现对Session中多线程的管理:tf.Coordinator和 tf.QueueRunner,这两个类往往一起使用。

Coordinator类用来管理在Session中的多个线程,可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常,该线程捕获到这个异常之后就会终止所有线程。使用 tf.train.Coordinator()来创建一个线程管理器(协调器)对象。

QueueRunner类用来启动tensor的入队线程,可以用来启动多个工作线程同时将多个tensor(训练数据)推送入文件名称队列中,具体执行函数是 tf.train.start_queue_runners , 只有调用 tf.train.start_queue_runners 之后,才会真正把tensor推入内存序列中,供计算单元调用,否则会由于内存序列为空,数据流图会处于一直等待状态。
实例:

# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np# 样本个数
sample_num=5
# 设置迭代次数
epoch_num = 2
# 设置一个批次中包含样本个数
batch_size = 3
# 计算每一轮epoch中含有的batch个数
batch_total = int(sample_num/batch_size)+1# 生成4个数据和标签
def generate_data(sample_num=sample_num):labels = np.asarray(range(0, sample_num))images = np.random.random([sample_num, 224, 224, 3])print('image size {},label size :{}'.format(images.shape, labels.shape))return images,labelsdef get_batch_data(batch_size=batch_size):images, label = generate_data()# 数据类型转换为tf.float32images = tf.cast(images, tf.float32)label = tf.cast(label, tf.int32)#从tensor列表中按顺序或随机抽取一个tensor准备放入文件名称队列input_queue = tf.train.slice_input_producer([images, label], num_epochs=epoch_num, shuffle=False)#从文件名称队列中读取文件准备放入文件队列image_batch, label_batch = tf.train.batch(input_queue, batch_size=batch_size, num_threads=2, capacity=64, allow_smaller_final_batch=False)return image_batch, label_batchimage_batch, label_batch = get_batch_data(batch_size=batch_size)with tf.Session() as sess:# 先执行初始化工作sess.run(tf.global_variables_initializer())sess.run(tf.local_variables_initializer())# 开启一个协调器coord = tf.train.Coordinator()# 使用start_queue_runners 启动队列填充threads = tf.train.start_queue_runners(sess, coord)try:while not coord.should_stop():print '************'# 获取每一个batch中batch_size个样本和标签image_batch_v, label_batch_v = sess.run([image_batch, label_batch])print(image_batch_v.shape, label_batch_v)except tf.errors.OutOfRangeError:  #如果读取到文件队列末尾会抛出此异常print("done! now lets kill all the threads……")finally:# 协调器coord发出所有线程终止信号coord.request_stop()print('all threads are asked to stop!')coord.join(threads) #把开启的线程加入主线程,等待threads结束print('all threads are stopped!')

输出:

************
((3, 224, 224, 3), array([0, 1, 2], dtype=int32))
************
((3, 224, 224, 3), array([3, 4, 0], dtype=int32))
************
((3, 224, 224, 3), array([1, 2, 3], dtype=int32))
************
done! now lets kill all the threads……
all threads are asked to stop!
all threads are stopped!

以上程序在 tf.train.slice_input_producer 函数中设置了 num_epochs 的数量, 所以在文件队列末尾有结束标志,读到这个结束标志的时候抛出 OutofRangeError 异常,就可以结束各个线程了。 (num_epochs如何设置,如何体现结束标志)

如果不设置 num_epochs 的数量,则文件队列是无限循环的,没有结束标志,程序会一直执行下去。

原文链接:https://blog.csdn.net/dcrmg/article/details/79780331

  相关解决方案