进程间通信
在一个标准的操作系统中(例如在树莓派上使用的linux系统),提供如下几种进程间通信方式:
(1)信号。发送的信息量极少。就一个字。
(2)管道(匿名管道)。可以发送一串信息了,但只能用于父子进程间通信。
(3)命名管道。功能与管道一致,但可用于任何进程间通信。
(4)消息队列。用于任何进程间通信,发送的消息由“自定义消息类型+消息数据”;优势:可以从消息队列中取出指定“自定义类型”的消息。
(5)共享内存。用于任何进程间通信,多个进程都可以读取该内存区域;比消息队列的优势就是:快;缺点:需要在访问共享内存是通过锁的机制来保护,防止多个进程同时写内存,导致该内存区域数据异常。例如,进程1写该内存写了一半,内核调度进程2写该内存,进程2写完整段内存后,内核调度进程1继续写未完成的另一半,此时共享内存中的数据就是一半进程1写的,一半进程2写的,这不就错乱了。对于这段内存,我们称之为“临界资源”,下一章讲解临界资源保护的问题。
(6)套接字(socket)。socket优点:就在于可以用于本计算机内任何进程通信,也可用于同在一个网络内的任意计算机上的进程间通信,可移植性好;缺点:发送的数据要至少要走一遍网络协议栈,所以会慢点。
之前做过的一个项目就是就使用了套接字作为进程间通信手段,
进程1:nodejs服务进程,用于接收远端浏览器发送的请求,并发送个请求处理进程;
进程2:请求处理进程,通过套接字接收到请求后,控制本地的继电器进行动作。
这样做的优势在于:nodejs服务器开发人员与请求处理进程开发人员分开进行,在各自的PC机上进行调试和自测试和两个程序的联调;联调完毕后,将套接字绑定的IP更改为127网段的即可;是不是极大地提高的开发效率。
Multiprocessing模块提供的进程间通信机制
multiprocessing支持进程之间的两种通信通道:队列和管道。
(1)mutliprocess的queues模块基本是在python的queue模块进行封装,可以配置成先入先出(FIFO)、后进先出(LIFO)、有优先级的队列(Priority Queue);同时,mutliprocessing的queues是线程和进程安全的(不用加锁保护了)。
(2)mutliprocess的Pipes模块,一个进程在管道一端发送数据、另一个进程在管道另一个端收数据,更能和队列类似,但没有后进先出、优先级等功能,同时最麻烦的是它不是线程和进程安全的,两个进程同时写或者同时读,需要加锁保护。(这里的锁就是下一章讲到的“进程同步机制”)
后续的演示,使用mutliprocess的queues模块来进行。
#更多关于multiprocessing.queues的介绍运行下面代码
help("multiprocessing.queues")
这么好用,那就试试呗
任务要求:
1.1s采集一次工业现场温度(这里我们用NXEZ扩展版的ds18b20代替)。
2.将温度实时显示工业现场(这里我们用NXEZ扩展版的数码管)。
3.将温度实时传递给远端控制器(这我们就不实现了,假设下)。
4.将温度在本地进行实时存储(这我们就不实现了,假设下)。
针对上述任务,通常做法创建1个进程,1s采集一次温度数据、本地存储和远程发送。但这里有个问题,本体存储和远程发送需要的时间是不确定的,特别是本地存储,将数据提交给操作系统过后,操作系统不会立即写会硬盘、要攒够一个扇区才启动真正的写(这叫写回方式——操作系统先缓存到内存中,然后,达到写硬盘的条件后,再启动硬盘写;相对的是写通方式——每次应用层提交的写操作,都直接写硬盘。写通的缺点就是特别慢)。在真正写硬盘时,应用层的写操作被阻塞,等待着写完成;所以这里的耗时是不确定的。
改进方式:启动4个进程,
(1)进程1完成1s的数据采集,并将数据发往相应的队列中;
(2)进程2完成将从队列中取出的数据在工业现场显示(本地数码管显示);
(3)进程3完成将从队列中取出的数据发送到远端(这里就不模拟了);
(4)进程4完成从队列中取出的数据发送进行本地存储(这里就不模拟了)。
这样改进是不是解决的执行时间不确定的问题,同时体会到操作系统的多任务让程序设计如此简洁、美丽吧。
from multiprocessing import Process, Queue import time import os from sakshat import SAKSHAT from sakspins import SAKSPins as PINS #Declare the SAKS Board SAKS = SAKSHAT() #ds18b20采集进程 def get_ds18b20(task_name, mult_queue1, mult_queue2, mult_queue3): print(task_name + "任务启动") try: while True: #从 ds18b20 读取温度(摄氏度为单位) temp = SAKS.ds18b20.temperature #返回值为 -128.0 表示读取失败 if temp == -128.0 : #本次采集不进行处理 continue #将采集到的温度发送至本地显示进程 mult_queue1.put(temp) #将采集到的温度发送至远程发送进程 mult_queue2.put(temp) #将采集到的温度发送至本地存储进程 mult_queue3.put(temp) except KeyboardInterrupt: print(task_name + "任务被终止") #数码管显示温度进程 def digital_display(task_name, mult_queue): print(task_name + "任务启动") try: while True: #在本地进行温度值显示 SAKS.digital_display.show(("%5.1f" % mult_queue.get()).replace(' ','#')) except KeyboardInterrupt: print(task_name + "任务被终止") #发送给远端进程 def send_remote(task_name, mult_queue): print(task_name + "任务启动") try: while True: print ("1.将温度值" + ("%5.1f" % mult_queue.get()).replace(' ','#') + "发送至远端") except KeyboardInterrupt: print(task_name + "任务被终止") #消息本地存储进程 def tmp_store(task_name, mult_queue): print(task_name + "任务启动") try: while True: print ("2.将温度值" + ("%5.1f" % mult_queue.get()).replace(' ','#') + "进程本地存储") except KeyboardInterrupt: print(task_name + "任务被终止") if __name__ == "__main__": try: #定义本地显示、远程发送和本地存储的队列 disp_q = Queue() remote_q = Queue() store_q = Queue() #创建温度采集、本地显示温度、远程发送温度、本地温度存储进程 get_ds18b20_prog = Process(target=get_ds18b20, args=("温度采集进程", disp_q, remote_q, store_q)) dig_disp_prog = Process(target=digital_display, args=("本地温度显示进程", disp_q)) send_remote_prog = Process(target=send_remote, args=("远程发送进程", remote_q)) tmp_store_prog = Process(target=tmp_store, args=("温度本地存储进程", store_q)) # 启动任务 get_ds18b20_prog.start() dig_disp_prog.start() send_remote_prog.start() tmp_store_prog.start() #等待启动的进程执行结束 get_ds18b20_prog.join() dig_disp_prog.join() send_remote_prog.join() tmp_store_prog.join() except KeyboardInterrupt: print("任务被终止了")
课程 bilibili 视频地址:https://www.bilibili.com/video/av71878718/?p=14
课程 gitee 地址:https://gitee.com/shirf_taste_raspi/shirf_serial_share