如何允许一个类的变量被多个线程并发修改。

我有一个类(MyClass),它包含一个需要运行的动作队列(self.msg_queue),我有多个输入源,可以向队列中添加任务。

现在我有三个函数,我想同时运行。

  • MyClass.get_input_from_user()
    • 在tkinter中创建一个窗口,让用户填写信息,当用户按下提交键时,它会将该信息推送到队列中。
  • MyClass.get_input_from_server()
    • 检查服务器是否有消息,读取消息,然后把它放到队列中。 本方法使用MyClass父类的函数。
  • MyClass.execute_next_item_on_the_queue()
    • 从队列中弹出一条消息,然后对其进行操作。 这取决于消息是什么,但每个消息都对应MyClass或其父类中的一些方法,这些方法会根据一个大的决策树来运行。

流程描述:在类加入网络后,我让它产生三个线程(上面的每个函数一个)。 每个线程函数从队列中添加项目,语法为 “self.msg_queue.put(message)”,从队列中删除项目,语法为 “self.msg_queue.get_nowait()”。

问题描述:我遇到的问题是,似乎每个线程都在修改自己的队列对象(它们并没有共享队列,msg_queue,它们,函数,都是这个类的成员)。

我对多处理不够熟悉,不知道重要的错误信息是什么;但是,它在说明它不能拾取一个弱ref对象(它没有给出哪个对象是弱ref对象的说明),并且在queue.put()调用内有一行 “self._sem.acquire(block, timeout)产生一个'[WinError 5] Access is denied'”错误。是否可以认为这个故障在队列的引用没有正确复制过来?

我使用的是Python 3.7.2和多处理包的Process和Queue。

[我已经看到了多个关于让线程在类之间穿梭信息的QA–创建一个主线束,生成一个队列,然后把这个队列作为参数传递给每个线程。 如果函数不需要使用MyClass中的其他函数,我可以看到通过让这些函数接收队列并使用本地变量而不是类变量来调整这个策略。]

[我很有信心,这个错误不是把我的队列传给tkinter对象的结果,因为我的单元测试是关于我的GUI如何修改其调用者的队列,工作正常]

下面是一个队列出错的最小可重现的例子。

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self):
        while True:
            self.my_q.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self):
        while True:
            self.counter = 0
            self.my_q.put(self.counter)
            time.sleep(1)

    def output_function(self):
        while True:
            try:
                var = self.my_q.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A)
        process_B = Process(target=self.input_function_B)
        process_C = Process(target=self.output_function)

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()

解决方案:

事实上–这些不是 “线程”–这些是 “进程”–而如果你使用的是多线程,而不是多进程,那么你的 self.my_q 实例将是 同一对象,放置在计算机的同一内存空间,多处理做了一个 叉子 的进程,而原进程(”运行 “调用中正在执行的进程)中的任何数据在使用时都会被复制–所以,每个子进程都会看到自己的 “队列 “实例,与其他进程无关。

让各种进程共享一个多进程.Queue对象的正确方法是把它作为参数传递给目标方法。更简单的方法是这样重组你的代码,使其工作。

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self, queue):
        while True:
            queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self, queue):
        while True:
            self.counter = 0
            queue.put(self.counter)
            time.sleep(1)

    def output_function(self, queue):
        while True:
            try:
                var = queue.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A, args=(queue,))
        process_B = Process(target=self.input_function_B, args=(queue,))
        process_C = Process(target=self.output_function, args=(queue,))

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()

正如你所看到的,由于你的类实际上并没有通过实例的属性来共享任何数据,这种 “类 “的设计对于你的应用程序来说并没有什么意义–但对于在同一个代码块中对不同的工作者进行分组。

可以有一个神奇的多进程类,它有一些内部方法来实际启动worker-methods并共享Queue实例–所以如果你在一个项目中有很多这样的类,就会少很多模板。

类似的东西。

from multiprocessing import Queue
from multiprocessing import Process
import time


class MPWorkerBase:
    def __init__(self, *args, **kw):
        self.queue = None
        self.is_parent_process = False
        self.is_child_process = False
        self.processes = []
        # ensure this can be used as a colaborative mixin
        super().__init__(*args, **kw)

    def run(self):
        if self.is_parent_process or self.is_child_process:
            # workers already initialized
            return

        self.queue = Queue()
        processes = []

        cls = self.__class__
        for name in dir(cls):
            method = getattr(cls, name)
            if callable(method) and getattr(method, "_MP_worker", False):
                process = Process(target=self._start_worker, args=(self.queue, name))
                self.processes.append(process)
                process.start()
        # Setting these attributes here ensure the child processes have the initial values for them.
        self.is_parent_process = True
        self.processes = processes

    def _start_worker(self, queue, method_name):

        # this method is called in a new spawned process - attribute
        # changes here no longer reflect attributes on the
        # object in the initial process

        # overwrite queue in this process with the queue object sent over the wire:
        self.queue = queue
        self.is_child_process = True
        # call the worker method
        getattr(self, method_name)()

    def __del__(self):
        for process in self.processes:
            process.join()


def worker(func):
    """decorator to mark a method as a worker that should
    run in its own subprocess
    """

    func._MP_worker = True
    return func


class MyTest(MPWorkerBase):
    def __init__(self):
        super().__init__()
        self.counter = 0

    @worker
    def input_function_A(self):
        while True:
            self.queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    @worker
    def input_function_B(self):
        while True:
            self.counter = 0
            self.queue.put(self.counter)
            time.sleep(1)

    @worker
    def output_function(self):
        while True:
            try:
                var = self.queue.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)


if __name__ == '__main__':
    test = MyTest()
    test.run()

给TA打赏
共{{data.count}}人
人已打赏
未分类

如何引起另一个组件的更新?

2022-9-7 23:06:18

未分类

如何在线性模型创建的两张图中独立标注两个x轴?

2022-9-7 23:06:20

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索