Skip to content

第三章第7小节管道实现进程通信部分 #90

@Bestporter

Description

@Bestporter
Contributor

函数create_items、multiply_items部分无注释,看了好久才明白。
第一个费解的地方pipe_1 = multiprocessing.Pipe(True),不知道这个Pipe具体干了什么的,查看源码后发现就是弄了两个队列,然后建立连接,至于至于send,就是使用的Queue的put函数,recv就是使用的Queue的get函数,用两个队列实现双工。

# 源代码位于multiprocessing/dummy/connection.py
def Pipe(duplex=True):
    a, b = Queue(), Queue()
    return Connection(a, b), Connection(b, a)

class Connection(object):

    def __init__(self, _in, _out):
        self._out = _out
        self._in = _in
        self.send = self.send_bytes = _out.put
        self.recv = self.recv_bytes = _in.get

    def poll(self, timeout=0.0):
        if self._in.qsize() > 0:
            return True
        if timeout <= 0.0:
            return False
        with self._in.not_empty:
            self._in.not_empty.wait(timeout)
        return self._in.qsize() > 0

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

自己写了一下文中的实现,p1_a与p1_b是一组能够通信的,p2_a与p2_b是一组互相能通信的

import multiprocessing

def create_items(pipe):
    p1_a, p1_b = pipe
    for item in range(10):
        p1_a.send(item)
    p1_a.close()

def multiply_items(pipe_1, pipe_2):
    p1_a, p1_b = pipe_1
    p1_a.close()
    p2_a, p2_b = pipe_2
    try:
        while True:
            item = p1_b.recv()
            p2_a.send(item * item)
    except EOFError:
        p2_a.close()

if __name__== '__main__':
    # 第一个进程管道发出数字
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()
    # 第二个进程管道接收数字并计算
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
    pipe_1[0].close()
    pipe_2[0].close()
    try:
        while True:
            print(pipe_2[1].recv())
    except EOFError:
        print("End")

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

      Development

      No branches or pull requests

        Participants

        @laixintao@Bestporter

        Issue actions

          第三章第7小节管道实现进程通信部分 · Issue #90 · laixintao/python-parallel-programming-cookbook-cn