2020年7月31日金曜日

一回りしてpythonに戻って

一通り手直しを終え、ドライバが無くてもそれなりに動く形のものに落ち着きました。

各変数の位置がいい加減だったり、配列は相変わらずどんぶりだったりと突っ込み所も多いですが、参考になれば幸いです。

#!/usr/bin/python3
#coding: utf-8
import array
import argparse
import ctypes
import enum
import fcntl
import time
import RPi.GPIO as GPIO

# Default GPIO number: used as the default of TDht(gpio_no=...) and of the --gpio option.
DEF_PIN = 5

class dht_models(enum.Enum):
    """Sensor models that can be selected.

    Each member's value is the model name as a string, so a member can be
    looked up from text with ``dht_models("DHT22")``.
    """

    DHT11 = "DHT11"
    DHT11V1_3 = "DHT11V1.3"
    DHT22 = "DHT22"

class TMinMax:
    """Remember the smallest and largest value assigned through ``value``.

    ``value`` is a write-only property: every assignment updates ``min`` and
    ``max`` and sets ``isValue`` to True. ``min`` and ``max`` are None until
    the first assignment (or after ``clear()``).
    """

    def __init__(self):
        self.clear()

    def clear(self):
        """Forget everything recorded so far."""
        self.__min = self.__max = None
        self.isValue = False

    @property
    def min(self):
        """Smallest value recorded, or None when nothing was recorded."""
        return self.__min

    @property
    def max(self):
        """Largest value recorded, or None when nothing was recorded."""
        return self.__max

    def __record(self, sample):
        # Flag first, then widen whichever bound the sample falls outside of.
        self.isValue = True
        lowest = self.__min
        if lowest is None or lowest > sample:
            self.__min = sample
        highest = self.__max
        if highest is None or highest < sample:
            self.__max = sample

    # Write-only: there is deliberately no getter.
    value = property(fset=__record)

class TDhtResult:
    """Result of one sensor read: error code, temperature, humidity, model."""

    # Error codes stored in ``error_code``.
    ERR_NO_ERROR = 0
    ERR_MISSING_DATA = 1
    ERR_CRC = 2
    ERR_WAITING_TIMEOUT = 3
    ERR_RANGE = 101
    ERR_GPIO = 201
    ERR_MODEL = 202

    # Class-level defaults; every instance overrides them in __init__.
    error_code = ERR_NO_ERROR
    temperature = -1
    humidity = -1

    def __init__(self, error_code, temperature=0, humidity=0, model=None):
        """Store the given values unchanged on the instance."""
        (self.error_code,
         self.temperature,
         self.humidity,
         self.model) = (error_code, temperature, humidity, model)

    def is_valid(self):
        """Return True when ``error_code`` equals ``ERR_NO_ERROR``."""
        succeeded = self.error_code == TDhtResult.ERR_NO_ERROR
        return succeeded


class Error(Exception):
    """Base class for the exceptions defined in this module."""

class ReciveDataError(Error):
    """Error carrying a short tag and a message about bad received data.

    Attributes:
        expression: Short tag naming what was wrong.
        message: Human-readable description.
    """

    def __init__(self, expression, message):
        self.expression, self.message = expression, message

# Number of slots in each sampling buffer (the ctypes arrays and the array.array buffers).
SAMPLING_SIZE = 100

class SGpioreadInit(ctypes.Structure):
    """Argument structure for the GPIOREAD_INIT ioctl request."""

    # Single unsigned field holding the peripheral address.
    _fields_ = [("peripheral_address", ctypes.c_uint)]

class SGpioreadGetSampling(ctypes.Structure):
    """Argument structure for the GPIOREAD_GET_SAMPLING ioctl request.

    ``gpio_no`` is filled in by the caller; the remaining fields are read
    back after the ioctl call.
    """

    _fields_ = [
        ("gpio_no", ctypes.c_uint),
        # Two fixed-size arrays of SAMPLING_SIZE ints each: one value and one
        # counter per sample.
        ("sampling", ctypes.c_int * SAMPLING_SIZE),
        ("sampling_length", ctypes.c_int),
        ("sampling_start_lev", ctypes.c_int),
        ("sampling_end_lev", ctypes.c_int),
        ("sampling_cnt", ctypes.c_int * SAMPLING_SIZE),
    ]

# ioctl request numbers used with /dev/gpioread0.
# Layout: (direction << 30) | (argument size << 16) | ('G' << 8) | request number.
# NOTE(review): this looks like the Linux _IOC encoding (1 = write, 3 = read/write);
# confirm against the driver's header.
GPIOREAD_INIT = (
    (1 << 30)
    | (ctypes.sizeof(SGpioreadInit) << 16)
    | (ord('G') << 8)
    | 1
)
GPIOREAD_GET_SAMPLING = (
    (3 << 30)
    | (ctypes.sizeof(SGpioreadGetSampling) << 16)
    | (ord('G') << 8)
    | 101
)

# Logical pin levels.
LOW, HIGH = 0, 1

# Decoder states, in the order they are passed through: wait for a low
# level, then for a high level, then receive data bits.
(
    STATE_WAITING_START_LOW,
    STATE_WAITING_START_HIGH,
    STATE_RECEIVE_DATA,
) = (0, 1, 2)

class TDht:
    """DHT sensor reader class for Raspberry Pi.

    One read sends the start signal on ``gpio_no``, samples the level
    changes on the pin (through ``/dev/gpioread0`` when that works, otherwise
    by polling ``RPi.GPIO``), decodes 40 bits into five bytes, verifies the
    checksum and converts the bytes to temperature and humidity.

    Class attributes:
        isDebug: When True, diagnostic information is printed.
        isNoDriver: When True, the driver is skipped and the pin is polled.

    Instance attributes:
        isReadOneOnly: When True, ``read()`` does not perform the second read.
        retry_cnt: Number of retries performed by the last ``read()``.
    """

    isDebug = False
    isNoDriver = False
    # Class-level defaults are kept for backward compatibility; __init__ gives
    # every instance its own buffers so that instances do not share state.
    sampling = array.array('I', [0] * SAMPLING_SIZE)
    sampling_length = 0
    sampling_start_lev = 0
    sampling_cnt = array.array('I', [0] * SAMPLING_SIZE)
    bit_count = 0
    dataformat = array.array('B', [0] * 5)

    def __init__(self, gpio_no=DEF_PIN):
        """Create a reader for the sensor connected to ``gpio_no``.

        Raises:
            ValueError: If ``gpio_no`` is outside 0..53.
        """
        self.gpio_no = gpio_no
        self.__model = None
        self.isReadOneOnly = False
        # Defined up front so the attribute exists even before read().
        self.retry_cnt = 0
        self.sampling = array.array('I', [0] * SAMPLING_SIZE)
        self.sampling_cnt = array.array('I', [0] * SAMPLING_SIZE)
        self.dataformat = array.array('B', [0] * 5)

    @property
    def gpio_no(self):
        """GPIO number the sensor is connected to (0..53)."""
        return self.__gpio_no

    @gpio_no.setter
    def gpio_no(self, gpio_no):
        if gpio_no < 0 or gpio_no > 53:
            raise ValueError('invalid gpio_no.')
        if TDht.isDebug:
            print("gpio_no :", gpio_no)
        self.__gpio_no = gpio_no

    @property
    def model(self):
        """Selected ``dht_models`` member, or None when no model was set."""
        return self.__model

    @model.setter
    def model(self, model):
        # isinstance instead of ``model in dht_models``: the containment test
        # raises TypeError for non-members on Python 3.8-3.11.
        if not isinstance(model, dht_models):
            raise ValueError('invalid type.')
        if TDht.isDebug:
            print("model :", model)
        self.__model = model

    def set_model_name(self, model_name):
        """Select the model from its name (case-insensitive).

        Raises:
            ValueError: If the upper-cased name is not a ``dht_models`` value.
        """
        name = model_name.upper()
        # Look the value up instead of ``name in dht_models``; testing a str
        # against the Enum class does not work before Python 3.12.
        try:
            model = dht_models(name)
        except ValueError:
            raise ValueError('invalid type.') from None
        if TDht.isDebug:
            print("model :", name)
        self.__model = model

    def __send_and_sleep(self, output, sleep):
        """Drive the pin to ``output`` and wait ``sleep`` seconds."""
        GPIO.output(self.gpio_no, output)
        time.sleep(sleep)

    def __sampling_ioctl(self):
        """Fetch the samples through the /dev/gpioread0 driver.

        Returns:
            The filled part of ``self.sampling``.

        Raises:
            OSError: If the library or the device cannot be opened or an
                ioctl call fails; the caller uses this to fall back to polling.
        """
        libc = ctypes.CDLL("libbcm_host.so")
        get_address = libc.bcm_host_get_peripheral_address
        # The result is stored into an unsigned field, so read it as unsigned.
        get_address.restype = ctypes.c_uint
        init_values = SGpioreadInit(get_address())
        sampling_values = SGpioreadGetSampling(self.gpio_no)
        with open("/dev/gpioread0", "r") as fd:
            fcntl.ioctl(fd, GPIOREAD_INIT, init_values)
            fcntl.ioctl(fd, GPIOREAD_GET_SAMPLING, sampling_values)
        # Never trust the reported length beyond the size of our buffers.
        length = max(0, min(sampling_values.sampling_length, SAMPLING_SIZE))
        self.sampling_length = length
        self.sampling_start_lev = sampling_values.sampling_start_lev
        for i in range(length):
            self.sampling[i] = sampling_values.sampling[i]
            self.sampling_cnt[i] = sampling_values.sampling_cnt[i]
        return self.sampling[0:length]

    def __sampling(self):
        """Sample the pin by polling ``RPi.GPIO`` (no driver).

        For every level change the time since the previous change is stored
        in ``self.sampling`` (microseconds, truncated to 8 bits) and the
        number of polls in ``self.sampling_cnt``. Sampling stops when the
        buffer is full or no change was seen for 50 ms.

        Returns:
            The filled part of ``self.sampling``.
        """
        GPIO.setup(self.gpio_no, GPIO.IN, GPIO.PUD_UP)

        polls = 0
        idx = 0
        self.sampling_start_lev = GPIO.input(self.gpio_no)
        last = self.sampling_start_lev
        segment_start = time.time()
        now = segment_start
        while idx < SAMPLING_SIZE and (now - segment_start) < 0.050:
            current = GPIO.input(self.gpio_no)
            if current != last:
                # ``now`` was taken just before this poll: it closes the
                # finished segment and opens the next one.
                self.sampling[idx] = int((now - segment_start) * 1000000) & 0xFF
                self.sampling_cnt[idx] = polls
                polls = 0
                idx += 1
                last = current
                segment_start = now
            now = time.time()
            polls += 1
        self.sampling_length = idx
        return self.sampling[0:idx]

    def __sampling_to_data(self):
        """Decode ``self.sampling`` into the five bytes of ``self.dataformat``.

        The samples are treated as alternating levels starting with
        ``sampling_start_lev``. After the first low and the following high
        level, every high segment is one bit: shorter than 50 is a 0,
        otherwise a 1. Decoding stops after 40 bits.

        Raises:
            ReciveDataError: If fewer than 40 bits were found.
        """
        self.bit_count = 0
        state = STATE_WAITING_START_LOW
        t0 = TMinMax()
        t1 = TMinMax()
        tlow = TMinMax()
        sampling_high_border = 50

        cur_lev = self.sampling_start_lev
        for i in range(len(self.dataformat)):
            self.dataformat[i] = 0

        idx = 0
        while idx < self.sampling_length and self.bit_count < 40:
            if state == STATE_WAITING_START_LOW:
                if cur_lev == LOW:
                    state = STATE_WAITING_START_HIGH
            elif state == STATE_WAITING_START_HIGH:
                if cur_lev == HIGH:
                    state = STATE_RECEIVE_DATA
            elif cur_lev == HIGH:
                width = self.sampling[idx]
                if width < sampling_high_border:
                    t0.value = width
                else:
                    t1.value = width
                    # Bits arrive most significant bit first within a byte.
                    data_idx = self.bit_count // 8
                    bit_idx = 7 - (self.bit_count % 8)
                    self.dataformat[data_idx] |= 0x01 << bit_idx
                self.bit_count += 1
            else:
                tlow.value = self.sampling[idx]
            cur_lev = LOW if cur_lev == HIGH else HIGH
            idx += 1

        if TDht.isDebug:
            print("bitcount :", self.bit_count)
        if self.bit_count != 40:
            raise ReciveDataError("bitcount", "bitcount : {:d}".format(self.bit_count))
        if TDht.isDebug:
            if t0.isValue:
                print("data high '0' : %d ~ %d" % (t0.min, t0.max))
            if t1.isValue:
                print("data high '1' : %d ~ %d" % (t1.min, t1.max))
            if tlow.isValue:
                print("data low : %d ~ %d" % (tlow.min, tlow.max))
            s = "".join("{:02X} ".format(data & 0xff) for data in self.dataformat)
            print("data format :", s)

    def __calc_checksum(self):
        """Return the low 8 bits of the sum of the first four data bytes."""
        return sum(self.dataformat[0:4]) & 0xFF

    @staticmethod
    def chk_dht11_value(temperature, humidity):
        """Return ``TDhtResult.ERR_RANGE`` unless 0<=t<=50 and 20<=h<=90, else 0."""
        if ((temperature < 0) or (temperature > 50)
                or (humidity < 20) or (humidity > 90)):
            return TDhtResult.ERR_RANGE
        return 0

    @staticmethod
    def chk_dht11v1_3_value(temperature, humidity):
        """Return ``TDhtResult.ERR_RANGE`` unless -20<=t<=60 and 5<=h<=95, else 0."""
        if ((temperature < -20) or (temperature > 60)
                or (humidity < 5) or (humidity > 95)):
            return TDhtResult.ERR_RANGE
        return 0

    @staticmethod
    def chk_dht22_value(temperature, humidity):
        """Return ``TDhtResult.ERR_RANGE`` unless -40<=t<=80 and 0<=h<=99.9, else 0."""
        if ((temperature < -40) or (temperature > 80)
                or (humidity < 0) or (humidity > 99.9)):
            return TDhtResult.ERR_RANGE
        return 0

    def __get_dht11_values(self):
        """Return ``(temperature, humidity)`` decoded in the DHT11 layout.

        Bytes 2/3 are the integer and tenths part of the temperature, with
        bit 7 of byte 3 as the sign; bytes 0/1 are the same for humidity.
        """
        d = self.dataformat
        sign = -1 if (d[3] & 0x80) else 1
        return (
            (d[2] + (d[3] & 0x7F) * 0.1) * sign,
            d[0] + d[1] * 0.1,
        )

    def __get_dht22_values(self):
        """Return ``(temperature, humidity)`` decoded in the DHT22 layout.

        Both values are 16-bit numbers in tenths. The top bit of the
        temperature word is a sign flag, not part of the magnitude, so it is
        masked out and applied as the sign.
        """
        d = self.dataformat
        temperature = (((d[2] & 0x7F) << 8) | d[3]) * 0.1
        if d[2] & 0x80:
            temperature = -temperature
        humidity = ((d[0] << 8) | d[1]) * 0.1
        return (temperature, humidity)

    def __get_data(self):
        """Perform one measurement and return it as a ``TDhtResult``.

        Returns:
            ``ERR_MISSING_DATA`` when fewer than 40 bits were decoded,
            ``ERR_CRC`` when the checksum byte does not match, ``ERR_RANGE``
            (with the values) when the values are outside the model's range,
            otherwise ``ERR_NO_ERROR`` with temperature, humidity and the
            model that was used for the conversion.
        """
        try:
            # Start signal: hold high, then pull low (0.8 ms for DHT22,
            # 20 ms otherwise).
            GPIO.setup(self.gpio_no, GPIO.OUT)
            self.__send_and_sleep(GPIO.HIGH, 0.05)
            if self.model == dht_models.DHT22:
                self.__send_and_sleep(GPIO.LOW, 0.0008)
            else:
                self.__send_and_sleep(GPIO.LOW, 0.02)
            try:
                if self.isNoDriver:
                    raise OSError(1, "IsNoDriver")
                self.__sampling_ioctl()
            except OSError as e:
                # Driver unavailable (or disabled): poll the pin instead.
                if TDht.isDebug:
                    print("OSError :", e.errno, e.strerror)
                    print("Sampling is performed without using a driver.")
                self.__sampling()
            if TDht.isDebug:
                print(self.sampling[0:self.sampling_length])
                print(self.sampling_cnt[0:self.sampling_length])
            self.__sampling_to_data()
            if self.dataformat[4] != self.__calc_checksum():
                if TDht.isDebug:
                    print("CRC Error.")
                return TDhtResult(TDhtResult.ERR_CRC, 0, 0)
        except ReciveDataError:
            return TDhtResult(TDhtResult.ERR_MISSING_DATA)

        result = TDhtResult.ERR_NO_ERROR
        if self.model == dht_models.DHT22:
            resultmodel = self.model
            (temperature, humidity) = self.__get_dht22_values()
            if TDht.chk_dht22_value(temperature, humidity):
                result = TDhtResult.ERR_RANGE
        elif self.model == dht_models.DHT11:
            resultmodel = self.model
            (temperature, humidity) = self.__get_dht11_values()
            if TDht.chk_dht11_value(temperature, humidity):
                result = TDhtResult.ERR_RANGE
        elif self.model == dht_models.DHT11V1_3:
            resultmodel = self.model
            (temperature, humidity) = self.__get_dht11_values()
            if TDht.chk_dht11v1_3_value(temperature, humidity):
                result = TDhtResult.ERR_RANGE
        else:
            # No model selected: decode as DHT11 V1.3 first and switch to
            # DHT22 when that is out of range, or when bits are set that the
            # code expects to be clear for a DHT11.
            (temperature, humidity) = self.__get_dht11_values()
            resultmodel = dht_models.DHT11V1_3
            out_of_range = bool(TDht.chk_dht11v1_3_value(temperature, humidity))
            unexpected_bits = ((self.dataformat[1] != 0)
                               or ((self.dataformat[3] & 0x70) != 0))
            if out_of_range or unexpected_bits:
                (trial_temperature, trial_humidity) = self.__get_dht22_values()
                if TDht.chk_dht22_value(trial_temperature, trial_humidity) == 0:
                    temperature = trial_temperature
                    humidity = trial_humidity
                    resultmodel = dht_models.DHT22
                    if TDht.isDebug:
                        print("DHT22として気温と湿度が設定されました。")
                elif out_of_range:
                    result = TDhtResult.ERR_RANGE
        return TDhtResult(result, temperature, humidity, resultmodel)

    def __one_read(self):
        """Measure once, retrying up to 10 times (2 s apart) on failure.

        The number of retries is added to ``self.retry_cnt``.
        """
        cnt = 0
        result = self.__get_data()
        while (not result.is_valid()) and (cnt < 10):
            cnt += 1
            time.sleep(2)
            if TDht.isDebug:
                print("ErrorCode : ", result.error_code, " retry.")
            elif cnt == 1:
                print("retry.", end="", flush=True)
            else:
                print(".", end="", flush=True)
            result = self.__get_data()
        if not TDht.isDebug and cnt:
            # Terminate the "retry...." progress line.
            print("", flush=True)
        self.retry_cnt += cnt
        return result

    def read(self):
        """Read the sensor and return a ``TDhtResult``.

        Unless ``isReadOneOnly`` is set, a successful first read is followed
        by a second read 2 seconds later and the second result is returned.
        ``retry_cnt`` is reset to 0 at the start.
        """
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)

        self.retry_cnt = 0
        result = self.__one_read()
        if not self.isReadOneOnly and result.is_valid():
            if TDht.isDebug:
                print("2nd read.")
            time.sleep(2)
            result = self.__one_read()
        return result
# Command-line entry point: parse the options, read the sensor once and print
# the result. Invalid --gpio / --model values end the script with status 1.
if __name__ == '__main__':
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument("--gpio", type=int, default=DEF_PIN, help="GPIO No.(0 to 53). Default is %d." % (DEF_PIN))
        parser.add_argument("--model", help="Specify the model number of the sensor.(DHT11/DHT11V1.3/DHT22)")
        parser.add_argument("--onetime", help="Read only once.", action="store_true")
        parser.add_argument("--nodriver", help="Read directly without using a driver.It will be affected by interrupt processing.", action="store_true")
        parser.add_argument("--debug", help="Output debug information.", action="store_true")
        args = parser.parse_args()
        if args.debug:
            TDht.isDebug = True
            print("Output for debugging.")
        # Only ValueError means "bad gpio number"; anything else is a real
        # error and must not be hidden behind this message.
        try:
            dht = TDht(args.gpio)
        except ValueError:
            print("Invalid gpio no.")
            raise SystemExit(1)
        if args.nodriver:
            dht.isNoDriver = True
            if TDht.isDebug:
                print("No driver is used.")
        if args.model:
            # dht_models(...) raises ValueError for an unknown name.
            try:
                dht.model = dht_models(args.model.upper())
            except ValueError:
                print("Invalid model name.")
                raise SystemExit(1)
        dht.isReadOneOnly = args.onetime
        result = dht.read()
        print("DHT results.")
        if dht.retry_cnt:
            print("Retry count : %d time(s)" % (dht.retry_cnt))
        if result.error_code:
            print("Error code : %d" % (result.error_code))
        if result.model:
            print("Conversion :", result.model.value)
        print("Temperature : %.1f C" % (result.temperature))
        print("Humidity : %.1f %s" % (result.humidity, "%"))

    except KeyboardInterrupt:
        # Ctrl-C: release the GPIO channels before leaving.
        GPIO.cleanup()
(2020/08/01 17:24にスクリプト部分の気になる部分を書き換えたので更新)
このスクリプトを直接実行すれば、GPIOの5番に接続されているDHT11かDHT22の値を取り込みます。連続で2回読み出しているので、必ず2秒待たされます。読み出しを1回だけに限定したい場合は、次のようにオプションで指定します。
# python3 dht.py --gpio 6 --onetime
これでGPIOの6番で1回読込んで表示します。
--model オプションでDHT11とかDHT22と指定すると、少しだけ読み込みタイミングを変えて、あとは読込んだ値の整合性チェックを行います。なにも指定しないとDHT11として動作し、温度と湿度を求めるときに整合性チェックを行います。さらに、DHT11では受信したデータは仕様上ビットが立たない場所があるので、そのビットが立っていたらDHT22として値を変換し、範囲内であればDHT22として出力します。

ライブラリとして使う場合は、__main__部分を参考に組み込むといいと思います。
dht = TDht(args.gpio)
result = dht.read()
でresultにセンサーからの値が取り込まれます。

ちなみに、pythonの推奨はスペースによるインデントなのですが、このスクリプトではタブによるインデントを使っている上に、クラスのネーミングルールをTP風に変更しています。

0 件のコメント:

コメントを投稿