-
Notifications
You must be signed in to change notification settings - Fork 94
Open
Labels
Description
函数create_items、multiply_items部分无注释,看了好久才明白。
第一个费解的地方pipe_1 = multiprocessing.Pipe(True)
,不知道这个Pipe具体干了什么的,查看源码后发现就是弄了两个队列,然后建立连接,至于至于send,就是使用的Queue的put函数,recv就是使用的Queue的get函数,用两个队列实现双工。
# 源代码位于multiprocessing/dummy/connection.py
def Pipe(duplex=True):
a, b = Queue(), Queue()
return Connection(a, b), Connection(b, a)
class Connection(object):
def __init__(self, _in, _out):
self._out = _out
self._in = _in
self.send = self.send_bytes = _out.put
self.recv = self.recv_bytes = _in.get
def poll(self, timeout=0.0):
if self._in.qsize() > 0:
return True
if timeout <= 0.0:
return False
with self._in.not_empty:
self._in.not_empty.wait(timeout)
return self._in.qsize() > 0
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
自己写了一下文中的实现,p1_a与p1_b是一组能够通信的,p2_a与p2_b是一组互相能通信的
import multiprocessing
def create_items(pipe):
p1_a, p1_b = pipe
for item in range(10):
p1_a.send(item)
p1_a.close()
def multiply_items(pipe_1, pipe_2):
p1_a, p1_b = pipe_1
p1_a.close()
p2_a, p2_b = pipe_2
try:
while True:
item = p1_b.recv()
p2_a.send(item * item)
except EOFError:
p2_a.close()
if __name__== '__main__':
# 第一个进程管道发出数字
pipe_1 = multiprocessing.Pipe(True)
process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
process_pipe_1.start()
# 第二个进程管道接收数字并计算
pipe_2 = multiprocessing.Pipe(True)
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print("End")
laixintao
Metadata
Metadata
Assignees
Labels
Projects
Milestone
Relationships
Development
Select code repository
Activity