- 미세먼지 센서
blog.naver.com/elepartsblog/221347040698
라즈베리파이에서 PMS7003먼지센서 사용하기
이번 포스팅에서는 PLANTOWER 사의 PMS 7003 먼지센서를 라즈베리파이에 연결하여 동작시켜 먼...
blog.naver.com
-PMS7003
"""
* PMS7003 데이터 수신 프로그램
* 수정 : 2018. 11. 19
* 제작 : eleparts 부설연구소
* SW ver. 1.0.2
> 관련자료
파이썬 라이브러리
https://docs.python.org/3/library/struct.html
점프 투 파이썬
https://wikidocs.net/book/1
PMS7003 datasheet
http://eleparts.co.kr/data/_gextends/good-pdf/201803/good-pdf-4208690-1.pdf
"""
import serial
import struct
import time
class PMS7003(object):
    """Validate and decode 32-byte PMS7003 dust-sensor frames.

    Every method works on the first ``PMS_7003_PROTOCOL_SIZE`` bytes of the
    buffer it is given; trailing bytes are ignored. The class keeps no state,
    so one instance can be reused for any number of frames.
    """

    # PMS7003 protocol data (HEADER 2byte + 30byte)
    PMS_7003_PROTOCOL_SIZE = 32
    # Number of leading bytes covered by the checksum; the checksum itself
    # occupies the remaining two bytes of the frame.
    _CHECKSUM_OFFSET = 30

    # PMS7003 data list: indices into the tuple returned by unpack_data()
    HEADER_HIGH = 0  # 0x42
    HEADER_LOW = 1  # 0x4d
    FRAME_LENGTH = 2  # 2x13+2(data+check bytes)
    DUST_PM1_0_CF1 = 3  # PM1.0 concentration unit μg/m3 (CF=1, standard particle)
    DUST_PM2_5_CF1 = 4  # PM2.5 concentration unit μg/m3 (CF=1, standard particle)
    DUST_PM10_0_CF1 = 5  # PM10 concentration unit μg/m3 (CF=1, standard particle)
    DUST_PM1_0_ATM = 6  # PM1.0 concentration unit μg/m3 (under atmospheric environment)
    DUST_PM2_5_ATM = 7  # PM2.5 concentration unit μg/m3 (under atmospheric environment)
    DUST_PM10_0_ATM = 8  # PM10 concentration unit μg/m3 (under atmospheric environment)
    DUST_AIR_0_3 = 9  # number of particles with diameter beyond 0.3 um in 0.1 L of air
    DUST_AIR_0_5 = 10  # number of particles with diameter beyond 0.5 um in 0.1 L of air
    DUST_AIR_1_0 = 11  # number of particles with diameter beyond 1.0 um in 0.1 L of air
    DUST_AIR_2_5 = 12  # number of particles with diameter beyond 2.5 um in 0.1 L of air
    DUST_AIR_5_0 = 13  # number of particles with diameter beyond 5.0 um in 0.1 L of air
    DUST_AIR_10_0 = 14  # number of particles with diameter beyond 10 um in 0.1 L of air
    RESERVEDF = 15  # Data13 Reserved high 8 bits
    RESERVEDB = 16  # Data13 Reserved low 8 bits
    CHECKSUM = 17  # Checksum code

    # header check
    def header_chk(self, buffer):
        """Return True when the buffer starts with the bytes 0x42 0x4d.

        A buffer too short to hold both header bytes yields False instead of
        raising IndexError.
        """
        if len(buffer) <= self.HEADER_LOW:
            return False
        return (buffer[self.HEADER_HIGH] == 0x42
                and buffer[self.HEADER_LOW] == 0x4D)

    # chksum value calculation
    def chksum_cal(self, buffer):
        """Return the sum of the first 30 bytes of the frame.

        Raises:
            struct.error: if the buffer holds fewer than 32 bytes.
        """
        frame = buffer[0:self.PMS_7003_PROTOCOL_SIZE]
        # data unpack (Byte -> Tuple (30 x unsigned char <B> + unsigned short <H>))
        fields = struct.unpack('!30BH', frame)
        return sum(fields[:self._CHECKSUM_OFFSET])

    # checksum check
    def chksum_chk(self, buffer):
        """Return True when the computed checksum equals the one in the frame.

        The transmitted checksum is read from bytes 30-31 as an unsigned
        16-bit value in network byte order.

        Raises:
            struct.error: if the buffer holds fewer than 32 bytes.
        """
        computed = self.chksum_cal(buffer)
        trailer = buffer[self._CHECKSUM_OFFSET:self.PMS_7003_PROTOCOL_SIZE]
        (received,) = struct.unpack('!H', trailer)
        return computed == received

    # protocol size(small) check
    def protocol_size_chk(self, buffer):
        """Return True when the buffer holds at least one full frame."""
        return self.PMS_7003_PROTOCOL_SIZE <= len(buffer)

    # protocol check
    def protocol_chk(self, buffer):
        """Run the size, header and checksum checks in that order.

        Prints which check failed and returns False on the first failure;
        returns True only when all three pass.
        """
        if not self.protocol_size_chk(buffer):
            print("Protocol err")
            return False
        if not self.header_chk(buffer):
            print("Header err")
            return False
        if not self.chksum_chk(buffer):
            print("Chksum err")
            return False
        return True

    # unpack data
    # <Tuple (13 x unsigned short <H> + 2 x unsigned char <B> + unsigned short <H>)>
    def unpack_data(self, buffer):
        """Decode the first 32 bytes into an 18-item tuple.

        The tuple is indexed by the class constants (HEADER_HIGH ... CHECKSUM).
        No validation is done here; call protocol_chk() first.

        Raises:
            struct.error: if the buffer holds fewer than 32 bytes.
        """
        frame = buffer[0:self.PMS_7003_PROTOCOL_SIZE]
        # 2 header bytes, 13 unsigned shorts, 2 reserved bytes, checksum short
        return struct.unpack('!2B13H2BH', frame)

    def print_serial(self, buffer):
        """Print every decoded field of the frame plus the checksum comparison.

        Raises:
            struct.error: if the buffer holds fewer than 32 bytes.
        """
        chksum = self.chksum_cal(buffer)
        data = self.unpack_data(buffer)
        print ("============================================================================")
        print ("Header : %c %c \t\t | Frame length : %s" % (data[self.HEADER_HIGH], data[self.HEADER_LOW], data[self.FRAME_LENGTH]))
        print ("PM 1.0 (CF=1) : %s\t | PM 1.0 : %s" % (data[self.DUST_PM1_0_CF1], data[self.DUST_PM1_0_ATM]))
        print ("PM 2.5 (CF=1) : %s\t | PM 2.5 : %s" % (data[self.DUST_PM2_5_CF1], data[self.DUST_PM2_5_ATM]))
        print ("PM 10.0 (CF=1) : %s\t | PM 10.0 : %s" % (data[self.DUST_PM10_0_CF1], data[self.DUST_PM10_0_ATM]))
        print ("0.3um in 0.1L of air : %s" % (data[self.DUST_AIR_0_3]))
        print ("0.5um in 0.1L of air : %s" % (data[self.DUST_AIR_0_5]))
        print ("1.0um in 0.1L of air : %s" % (data[self.DUST_AIR_1_0]))
        print ("2.5um in 0.1L of air : %s" % (data[self.DUST_AIR_2_5]))
        print ("5.0um in 0.1L of air : %s" % (data[self.DUST_AIR_5_0]))
        print ("10.0um in 0.1L of air : %s" % (data[self.DUST_AIR_10_0]))
        print ("Reserved F : %s | Reserved B : %s" % (data[self.RESERVEDF],data[self.RESERVEDB]))
        print ("CHKSUM : %s | read CHKSUM : %s | CHKSUM result : %s" % (chksum, data[self.CHECKSUM], chksum == data[self.CHECKSUM]))
        print ("============================================================================")
# UART / USB Serial : 'dmesg | grep ttyUSB'
USB0 = '/dev/ttyUSB0'
UART = '/dev/ttyAMA0'

# USE PORT
SERIAL_PORT = UART

# Baud Rate
Speed = 9600

# example: read frames forever and print each valid one
if __name__ == '__main__':
    # serial setting
    ser = serial.Serial(SERIAL_PORT, Speed, timeout=1)
    dust = PMS7003()
    try:
        while True:
            # Drop anything already queued so the next read starts on fresh data.
            ser.flushInput()
            buffer = ser.read(1024)
            if dust.protocol_chk(buffer):
                print("DATA read success")
                # print data
                dust.print_serial(buffer)
            else:
                print("DATA read fail...")
    finally:
        # The loop only ends through an exception (e.g. Ctrl-C); release the
        # port on the way out instead of relying on a statement after the loop.
        ser.close()
- dust 파일
"""
*******************************************
* PMS7003 데이터 수신 프로그램 import 예제
* 수정 : 2018. 08. 27
* 제작 : eleparts 부설연구소
* SW ver. 1.0.1
*******************************************
# unpack_data(buffer)
# data list
HEADER_HIGH = 0x42
HEADER_LOW = 0x4d
FRAME_LENGTH = 2x13+2(data+check bytes)
DUST_PM1_0_CF1 = PM1.0 concentration unit μg/m3 (CF=1, standard particle)
DUST_PM2_5_CF1 = PM2.5 concentration unit μg/m3 (CF=1, standard particle)
DUST_PM10_0_CF1 = PM10 concentration unit μg/m3 (CF=1, standard particle)
DUST_PM1_0_ATM = PM1.0 concentration unit μg/m3 (under atmospheric environment)
DUST_PM2_5_ATM = PM2.5 concentration unit μg/m3 (under atmospheric environment)
DUST_PM10_0_ATM = PM10 concentration unit μg/m3 (under atmospheric environment)
DUST_AIR_0_3 = indicates the number of particles with diameter beyond 0.3 um in 0.1 L of air.
DUST_AIR_0_5 = indicates the number of particles with diameter beyond 0.5 um in 0.1 L of air.
DUST_AIR_1_0 = indicates the number of particles with diameter beyond 1.0 um in 0.1 L of air.
DUST_AIR_2_5 = indicates the number of particles with diameter beyond 2.5 um in 0.1 L of air.
DUST_AIR_5_0 = indicates the number of particles with diameter beyond 5.0 um in 0.1 L of air.
DUST_AIR_10_0 = indicates the number of particles with diameter beyond 10 um in 0.1 L of air.
RESERVEDF = Data13 Reserved high 8 bits
RESERVEDB = Data13 Reserved low 8 bits
CHECKSUM = Checksum code
# CF=1 should be used in the factory environment
"""
import serial
from PMS7003 import PMS7003
dust = PMS7003()

# Baud Rate
Speed = 9600

# UART / USB Serial
USB0 = '/dev/ttyUSB0'
UART = '/dev/ttyAMA0'

# USE PORT
SERIAL_PORT = UART

# serial setting
ser = serial.Serial(SERIAL_PORT, Speed, timeout=1)
try:
    # Single read: up to 1024 bytes or whatever arrives before the 1 s timeout.
    buffer = ser.read(1024)
    if dust.protocol_chk(buffer):
        data = dust.unpack_data(buffer)
        print ("PMS 7003 dust data")
        print ("PM 1.0 : %s" % (data[dust.DUST_PM1_0_ATM]))
        print ("PM 2.5 : %s" % (data[dust.DUST_PM2_5_ATM]))
        print ("PM 10.0 : %s" % (data[dust.DUST_PM10_0_ATM]))
    else:
        print ("data read Err")
finally:
    # Release the port even when the read or the decoding raises.
    ser.close()
- 파이썬 코드
import Adafruit_DHT
from time import *
import sys
from PyQt5.QtWidgets import *
from PyQt5 import uic
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from threading import Timer
import datetime
from PyQt5.QtCore import QTimer, QTime
import serial
from PMS7003 import PMS7003
from mq import *
# Module-level configuration for the GUI. Everything below runs at import time,
# including loading the .ui file and opening the serial port.

# First element of uic.loadUiType()'s result; used as a base class of MyWindow.
form_class = uic.loadUiType("test_window.ui")[0]
# Pin number and sensor type passed to Adafruit_DHT.read_retry() in displayTime().
pin = 4
sensor = Adafruit_DHT.DHT11
# Frame parser for the PMS7003 dust sensor.
dust = PMS7003()
# Baud rate and device path for the serial port the dust sensor is read from.
Speed = 9600
UART = '/dev/ttyAMA0'
SERIAL_PORT = UART
# Opened once here and shared by every displayTime() call (1 second read timeout).
ser = serial.Serial(SERIAL_PORT, Speed, timeout = 1)
# Disabled module-level MQ gas-sensor read, kept from the original for reference.
#mq = MQ();
#perc = mq.MQPercentage()
#sys.stdout.write("LPG: %g ppm, CO: %g ppm, Smoke: %g ppm" % (perc["GAS_LPG"], perc["CO"], perc["SMOKE"]))
class MyWindow(QMainWindow, form_class):
    """Main window that refreshes the sensor readings once per second.

    Each timer tick reads humidity/temperature through Adafruit_DHT, one
    PMS7003 frame from the shared serial port and the MQ gas percentages,
    then updates the labels defined in test_window.ui.

    The humidity/temperature limits start at the defaults below and are
    replaced when the user presses the "on" or "off" set button.
    """

    # Limits used until one of the set buttons has been pressed.
    DEFAULT_SET_HUM = 80
    DEFAULT_SET_TEM = 25
    # LPG readings below this value are shown as normal.
    GAS_LPG_LIMIT = 0.01

    def __init__(self):
        super().__init__()
        self.setupUi(self)
        # Which limits displayTime() checks: None (defaults), "on" or "off".
        self._set_mode = None
        # Active (temperature, humidity) limits, same order as the set lists.
        self._set_limits = (self.DEFAULT_SET_TEM, self.DEFAULT_SET_HUM)
        # MQ reader, created on the first tick and then reused.
        self._mq = None
        self.on_set.clicked.connect(self.on_set_clicked)
        self.off_set.clicked.connect(self.off_set_clicked)
        self.timer = QTimer(self)
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.displayTime)
        self.timer.start()

    def displayTime(self):
        """Timer slot: read every sensor once and refresh the window."""
        h, t = Adafruit_DHT.read_retry(sensor, pin)
        d = self._read_dust()
        perc = self._read_gas()
        # NOTE(review): the original also computed round(perc["GAS_LPG"] / 10000, 7)
        # into a variable that was never displayed or compared; it is left out.

        now = datetime.datetime.now()
        nowDate = now.strftime('%Y-%m-%d %H:%M:%S')
        self.time.setText(str(nowDate))
        self.h_label.setText(str(h))
        self.t_label.setText(str(t))
        self.g_label.setText(str(perc["GAS_LPG"]))

        self._report_climate(h, t)

        # Without a valid frame the previous dust value stays on screen.
        if d is not None:
            self.d_label.setText(str(d))
            self.d_image.setText(self._dust_grade(d))

        # The gas status has its own label; the dust grade is left untouched.
        if perc["GAS_LPG"] < self.GAS_LPG_LIMIT:
            self.g_image.setText("정상")
        else:
            self.g_image.setText("경고")

    def _read_dust(self):
        """Return the PM2.5 (atmospheric) value, or None if no valid frame arrived."""
        buffer = ser.read(1024)
        # Validate first: unpack_data() raises on a short or empty read.
        if not dust.protocol_chk(buffer):
            return None
        data = dust.unpack_data(buffer)
        return data[dust.DUST_PM2_5_ATM]

    def _read_gas(self):
        """Return the mapping produced by MQ.MQPercentage()."""
        # NOTE(review): MQ() is built once and reused instead of once per tick;
        # presumably construction is expensive - confirm against mq.py.
        if self._mq is None:
            self._mq = MQ()
        return self._mq.MQPercentage()

    def _report_climate(self, h, t):
        """Print whether humidity/temperature are within the active limits."""
        # read_retry() gives None values when the sensor could not be read;
        # comparing None with a number would raise TypeError.
        if h is None or t is None:
            return
        set_tem, set_hum = self._set_limits
        if self._set_mode == "off":
            # NOTE(review): the original printed this "too low" message on a
            # "greater than" comparison; the comparison is flipped here so that
            # it matches the message - confirm the intended meaning.
            out_of_range = (h < set_hum) or (t < set_tem)
            message = "온도가 낮거나 습도가 낮습니다."
        else:
            out_of_range = (set_hum < h) or (set_tem < t)
            message = "온도가 높거나 습도가 높습니다."
        if out_of_range:
            print(message)
        else:
            print("온도와 습도가 적당합니다.")

    @staticmethod
    def _dust_grade(d):
        """Map a PM2.5 value to the grade text shown in d_image."""
        if d < 15:
            return "좋음"
        if d < 50:
            return "보통"
        if d < 100:
            return "나쁨"
        return "매우나쁨"

    def on_set_clicked(self):
        """Apply the "on" spin-box values as limits and return [tem, hum]."""
        tem = self.t_on_set_spinbox_on.value()
        hum = self.h_on_set_spinbox_on.value()
        set_list1 = [tem, hum]
        self._set_mode = "on"
        self._set_limits = (tem, hum)
        return set_list1

    def off_set_clicked(self):
        """Apply the "off" spin-box values as limits and return [tem, hum]."""
        tem1 = self.t_off_set_spinbox_off.value()
        hum1 = self.h_off_set_spinbox_off.value()
        set_list2 = [tem1, hum1]
        self._set_mode = "off"
        self._set_limits = (tem1, hum1)
        return set_list2
if __name__ == "__main__":
    app = QApplication(sys.argv)
    mywindow = MyWindow()
    mywindow.show()
    try:
        # Blocks until the window is closed.
        exit_code = app.exec_()
    finally:
        # The serial port is opened at module level; release it on the way out.
        ser.close()
    # Hand the event loop's status back to the shell.
    sys.exit(exit_code)
test_window.ui
0.02MB
project_test.py
0.01MB