py并发_生产者消费者笔记

生产者与消费者

在多线程并发编程中有一个经典的案例程序–生产者和消费者问题,操作流程为:生产者进行指定数据的创建,每当生产者线程将数据创造完成后,消费者线程可以直接获取生产的数据进行处理。

测试

暴露要考虑的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# coding :UTF-8
import threading,time
class Message: # 数据的描述类型
def __init__(self):
self.__title = None
self.__content = None
def set_info(self,title,content):
self.__title = title # 设置属性内容
time.sleep(1) # 操作的生产延迟
self.__content = content # 设置属性内容
print("[%s]title = %s、content = %s" % (threading.current_thread().name,self.__title,self.__content))
def __str__(self): # 获取数据将由消费者负责
time.sleep(0.8) # 消费者的延迟时间短
return "【%s】title = %s、content = %s" % (threading.current_thread().name,self.__title,self.__content)
def producer_handle(message): # 生产者处理函数
for num in range(50): # 生产50组数据
if num % 2 == 0: # 交替生产
message.set_info("佳佳","下次见面时给我微笑吧")
else:
message.set_info("你再坚持一下","他很快就痊愈了")
def consumer_handle(message): # 消费者处理函数
for num in range(50):
print(message) # 获取50次的数据
def main(): # 主函数
message = Message() # 公共保存的数据对象
producer_thread = threading.Thread(target=producer_handle,
name="生产者线程",
args=(message,))
consumer_thread = threading.Thread(target=consumer_handle,
name="消费者进程",
args=(message,))
producer_thread.start() # 启动线程
consumer_thread.start() # 启动线程
if __name__ == "__main__":
main()

【消费者进程】title = 佳佳、content = None
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 你再坚持一下、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 佳佳、content = 他很快就痊愈了
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 你再坚持一下、content = 下次见面时给我微笑吧
【消费者进程】title = 你再坚持一下、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 佳佳、content = 他很快就痊愈了
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 你再坚持一下、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 佳佳、content = 他很快就痊愈了

……

首先如果只看前半部分的确是生产者和消费者交替运行着,但是如果再继续观察里面的数据就会发现有如下的问题:

  • 很多数据设置的不完整
  • 数据会产生错误操作(上图出现了数据的错位)
  • 数据会进行重复的处理(一个数据会被消费多次)

Condition同步处理

在生产者和消费者之间进行数据生产和消费的过程之中,如果没有引入任何的同步处理机制,就会发现存在有数据的重复消费以及数据生产不完整的问题,所以为了解决这样的处理,最好加入等待与唤醒机制。

  • 如果现在生产者发现数据没有被消费者消费,则生产者需要等待,而后当消费者把数据取走之后再进行生产;
  • 如果现在消费者发现没有数据被生产处理,则消费者也必须要进行等待,等到生产者生产了数据之后再进行消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# coding :UTF-8
import threading,time
class Message: # 数据的描述类型
def __init__(self,condition): # 构造方法
self.__title = None
self.__content = None
self.__condition = condition # 获取同步锁
# flag = True 表示可以生产,但是不能消费
# flag = False 表示可以消费,但是不能生产
self.__flag = True
def set_info(self,title,content):
self.__condition.acquire() # 获取同步锁
if self.__flag == False: # 不能够继续生产了,必须进行消费处理
self.__condition.wait() # 当前的线程进入到阻塞状态
# 如果现在可以生产则一定会执行如下的代码操作,进行数据的设置,同时修改flag标志(此时flag=True)
self.__title = title # 设置属性内容
time.sleep(1) # 操作的生产延迟
self.__content = content # 设置属性内容
print("[%s]title = %s、content = %s" % (threading.current_thread().name,self.__title,self.__content))
self.__flag = False
self.__condition.notify() # 唤醒其它等待的线程
self.__condition.release() # 释放锁
def __str__(self): # 获取数据将由消费者负责
self.__condition.acquire() # 获取锁
if self.__flag == True: # 不是消费状态
self.__condition.wait() # 等待生产者生产数据
# 如果执行了如下的代码,则意味着不进行等待(flag = False)
try:
time.sleep(0.8) # 消费者的延迟时间短
return "【%s】title = %s、content = %s" % (
threading.current_thread().name,self.__title,self.__content)
finally:
self.__flag = True # 可以继续生产,无法消费了
self.__condition.notify() #唤醒其它等待的线程,只有两个线程,一个是唤醒状态,另一个一定是阻塞
self.__condition.release() # 释放锁
def producer_handle(message): # 生产者处理函数
for num in range(10): # 生产50组数据
if num % 2 == 0: # 交替生产
message.set_info("佳佳","下次见面时给我微笑吧")
else:
message.set_info("你再坚持一下","他很快就痊愈了")
def consumer_handle(message): # 消费者处理函数
for num in range(10):
print(message) # 获取50次的数据
def main(): # 主函数
condition = threading.Condition()
message = Message(condition) # 公共保存的数据对象
producer_thread = threading.Thread(target=producer_handle,
name="生产者线程",
args=(message,))
consumer_thread = threading.Thread(target=consumer_handle,
name="消费者进程",
args=(message,))
producer_thread.start() # 启动线程
consumer_thread.start() # 启动线程
if __name__ == "__main__":
main()

[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 佳佳、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 你再坚持一下、content = 他很快就痊愈了
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 佳佳、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 你再坚持一下、content = 他很快就痊愈了
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 佳佳、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 你再坚持一下、content = 他很快就痊愈了
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 佳佳、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 你再坚持一下、content = 他很快就痊愈了
[生产者线程]title = 佳佳、content = 下次见面时给我微笑吧
【消费者进程】title = 佳佳、content = 下次见面时给我微笑吧
[生产者线程]title = 你再坚持一下、content = 他很快就痊愈了
【消费者进程】title = 你再坚持一下、content = 他很快就痊愈了

进程已结束,退出代码0

此时的程序就成功地解决了生产者和消费者之间数据同步的操作处理,利用Condition实现等待与唤醒操作

注:不过这种方法的不适合有数据容器来缓存数据而且消费者生产者的效率相差较大且生产消费事件发生不确定无规律的情况,需要设计一种更加高明的方案,这里仅供示例,学习Condition。

笔记来源


py并发_生产者消费者笔记
https://blog.wangxk.cc/2020/02/03/py并发-生产者消费者笔记/
作者
Mike
发布于
2020年2月3日
许可协议