动手学树莓派第5章:不能再两耳不闻窗外事了!你们需要交流

进程间通信

在一个标准的操作系统中(例如在树莓派上使用的linux系统),提供如下几种进程间通信方式:
(1)信号。发送的信息量极少。就一个字。
(2)管道(匿名管道)。可以发送一串信息了,但只能用于父子进程间通信。
(3)命名管道。功能与管道一致,但可用于任何进程间通信。
(4)消息队列。用于任何进程间通信,发送的消息由“自定义消息类型+消息数据”;优势:可以从消息队列中取出指定“自定义类型”的消息。
(5)共享内存。用于任何进程间通信,多个进程都可以读取该内存区域;比消息队列的优势就是:快;缺点:需要在访问共享内存是通过锁的机制来保护,防止多个进程同时写内存,导致该内存区域数据异常。例如,进程1写该内存写了一半,内核调度进程2写该内存,进程2写完整段内存后,内核调度进程1继续写未完成的另一半,此时共享内存中的数据就是一半进程1写的,一半进程2写的,这不就错乱了。对于这段内存,我们称之为“临界资源”,下一章讲解临界资源保护的问题。
(6)套接字(socket)。socket优点:就在于可以用于本计算机内任何进程通信,也可用于同在一个网络内的任意计算机上的进程间通信,可移植性好;缺点:发送的数据要至少要走一遍网络协议栈,所以会慢点。

之前做过的一个项目就是就使用了套接字作为进程间通信手段,

进程1:nodejs服务进程,用于接收远端浏览器发送的请求,并发送个请求处理进程;

进程2:请求处理进程,通过套接字接收到请求后,控制本地的继电器进行动作。

这样做的优势在于:nodejs服务器开发人员与请求处理进程开发人员分开进行,在各自的PC机上进行调试和自测试和两个程序的联调;联调完毕后,将套接字绑定的IP更改为127网段的即可;是不是极大地提高的开发效率。

Multiprocessing模块提供的进程间通信机制

multiprocessing支持进程之间的两种通信通道:队列和管道。

(1)mutliprocess的queues模块基本是在python的queue模块进行封装,可以配置成先入先出(FIFO)、后进先出(LIFO)、有优先级的队列(Priority Queue);同时,mutliprocessing的queues是线程和进程安全的(不用加锁保护了)。
(2)mutliprocess的Pipes模块,一个进程在管道一端发送数据、另一个进程在管道另一个端收数据,更能和队列类似,但没有后进先出、优先级等功能,同时最麻烦的是它不是线程和进程安全的,两个进程同时写或者同时读,需要加锁保护。(这里的锁就是下一章讲到的“进程同步机制”)

后续的演示,使用mutliprocess的queues模块来进行。

#更多关于multiprocessing.queues的介绍运行下面代码

help("multiprocessing.queues")

这么好用,那就试试呗

任务要求:
1.1s采集一次工业现场温度(这里我们用NXEZ扩展版的ds18b20代替)。
2.将温度实时显示工业现场(这里我们用NXEZ扩展版的数码管)。
3.将温度实时传递给远端控制器(这我们就不实现了,假设下)。
4.将温度在本地进行实时存储(这我们就不实现了,假设下)。

针对上述任务,通常做法创建1个进程,1s采集一次温度数据、本地存储和远程发送。但这里有个问题,本体存储和远程发送需要的时间是不确定的,特别是本地存储,将数据提交给操作系统过后,操作系统不会立即写会硬盘、要攒够一个扇区才启动真正的写(这叫写回方式——操作系统先缓存到内存中,然后,达到写硬盘的条件后,再启动硬盘写;相对的是写通方式——每次应用层提交的写操作,都直接写硬盘。写通的缺点就是特别慢)。在真正写硬盘时,应用层的写操作被阻塞,等待着写完成;所以这里的耗时是不确定的。

改进方式:启动4个进程,
(1)进程1完成1s的数据采集,并将数据发往相应的队列中;
(2)进程2完成将从队列中取出的数据在工业现场显示(本地数码管显示);
(3)进程3完成将从队列中取出的数据发送到远端(这里就不模拟了);
(4)进程4完成从队列中取出的数据发送进行本地存储(这里就不模拟了)。

这样改进是不是解决的执行时间不确定的问题,同时体会到操作系统的多任务让程序设计如此简洁、美丽吧。

from multiprocessing import Process, Queue
import time
import os
from sakshat import SAKSHAT
from sakspins import SAKSPins as PINS

#Declare the SAKS Board
SAKS = SAKSHAT()

#ds18b20采集进程
def get_ds18b20(task_name, mult_queue1, mult_queue2, mult_queue3):
    print(task_name + "任务启动")
    try:
        while True:
            #从 ds18b20 读取温度(摄氏度为单位)
            temp = SAKS.ds18b20.temperature
            
            #返回值为 -128.0 表示读取失败
            if temp == -128.0 :
                #本次采集不进行处理
                continue
                
            #将采集到的温度发送至本地显示进程
            mult_queue1.put(temp)      
            
            #将采集到的温度发送至远程发送进程
            mult_queue2.put(temp)   
            
            #将采集到的温度发送至本地存储进程
            mult_queue3.put(temp)  

    except KeyboardInterrupt:
        print(task_name + "任务被终止")
        
#数码管显示温度进程
def digital_display(task_name, mult_queue):
    print(task_name + "任务启动")
    try:
        while True:
            #在本地进行温度值显示
            SAKS.digital_display.show(("%5.1f" % mult_queue.get()).replace(' ','#'))
    except KeyboardInterrupt:
        print(task_name + "任务被终止")
        
#发送给远端进程
def send_remote(task_name, mult_queue):
    print(task_name + "任务启动")
    try:
        while True:
            print ("1.将温度值" + ("%5.1f" % mult_queue.get()).replace(' ','#') + "发送至远端")
    except KeyboardInterrupt:
        print(task_name + "任务被终止")
        
#消息本地存储进程
def tmp_store(task_name, mult_queue):
    print(task_name + "任务启动")
    try:
        while True:
            print ("2.将温度值" + ("%5.1f" % mult_queue.get()).replace(' ','#') + "进程本地存储")
    except KeyboardInterrupt:
        print(task_name + "任务被终止")
        
if __name__ == "__main__":
    try:
        #定义本地显示、远程发送和本地存储的队列
        disp_q = Queue()
        remote_q = Queue()
        store_q = Queue()
        
        #创建温度采集、本地显示温度、远程发送温度、本地温度存储进程
        get_ds18b20_prog = Process(target=get_ds18b20, args=("温度采集进程", disp_q, remote_q, store_q))
        dig_disp_prog    = Process(target=digital_display, args=("本地温度显示进程", disp_q))
        send_remote_prog = Process(target=send_remote, args=("远程发送进程", remote_q))
        tmp_store_prog   = Process(target=tmp_store, args=("温度本地存储进程", store_q))
        
        # 启动任务
        get_ds18b20_prog.start()
        dig_disp_prog.start()
        send_remote_prog.start()
        tmp_store_prog.start()

        #等待启动的进程执行结束
        get_ds18b20_prog.join()
        dig_disp_prog.join()
        send_remote_prog.join()
        tmp_store_prog.join()

    except KeyboardInterrupt:
        print("任务被终止了")

课程 bilibili 视频地址:https://www.bilibili.com/video/av71878718/?p=14

返回课程目录

课程 gitee 地址:https://gitee.com/shirf_taste_raspi/shirf_serial_share