概要

ESP32で取得したセンサーデータをJSON形式にしてFastAPIを介してMongoDBに保存する. fastapi.jpg (27.6 kB)

MongoDB

公式ドキュメントに従ってMongoDBをインストールする. MongoDB用GUIのMongoDB CompassをPCにインストールしておくと後で動作確認の時に便利.

FastAPI

FastAPIでは受け取ったJSONをMongoDBに保存するAPIを作成する.

pip install fastapi pip install uvicorn

pythonファイルは - main.py - database.py - model.py - を作成

main.py

FastAPIのメイン処理を書く

from typing import Union
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from database import (
    create_data,
)
from model import Device, Sensors_temp, Sensors_hum, Data

app = FastAPI()

origins = [
  'http://localhost:3000',
  'http://localhost',
  ]

app.add_middleware(
  CORSMiddleware,
  allow_origins=origins,
  allow_credentials=True,
  allow_methods=["*"],
  allow_headers=["*"],
)

@app.get("/")
def read_root():
    return {"HELLO" : "WORLD"}

@app.post("/api/data", response_model=Data)
async def post_data(data:Data):
    response = await create_data(data.dict())
    if response:
        return response
    raise HTTPException(400, "Something went wrong / Bad Request")
main.pyをuvicornで実行することでサーバーが起動する. uvicorn main:app --reload or nohup uvicorn main:app --reload &

database.py

使用するデータベースの情報,データベースに対してどのような処理をするか記述する. ここでは,MongoDB(client)のMonitoring(database)のdata(collection)にデータを保存する.

from model import *
import sys
import datetime
# mongoDB driver
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")
database = client.Monitoring
collection = database.data

async def create_data(data):
    document = data
    result = await collection.insert_one(document)
    return result

model.py

Pydanticライブラリを使って受け取るJSONの中身の構造を指定する.バリデーションとか型チェックとか言う.

from pydantic import BaseModel
import datetime
from typing import Set, Union

class Device(BaseModel):
    id: str
    name: str
    type: str
    manufacturer: str

class Sensors_temp(BaseModel):
    id: str
    type: str
    location: str
    value: float
    unit: str
    timestamp: str

class Sensors_hum(BaseModel):
    id: str
    type: str
    location: str
    value: float
    unit: str
    timestamp: str

class Data(BaseModel):
    sensors_hum: Union[Sensors_hum, None] = None
    sensors_temp: Union[Sensors_temp, None] = None
    device: Union[Device, None] = None
このコードでは,以下のようなJSONの受け取りを期待している.
data = {
    "device": {
        "id": "002",
        "name": "ESP32-WROOM-32",
        "type": "MicroController",
        "manufacturer": "ESPRESSIF",
    },
    "sensors_temp": {
        "id": "temp_sensor_2",
        "type": "temperature",
        "location": "door",
        "value": float(measure.get_temp()),
        "unit": "Celsius",
        "timestamp": datetime_str,
    },
    "sensors_hum": {
        "id": "hum_sensor_2",
        "type": "humidity",
        "location": "door",
        "value": float(measure.get_hum()),
        "unit": "%",
        "timestamp": datetime_str,
    },
}

ESP32

BME280.py

温度センサーであるBME280から温度データと湿度データを取得する.MicroPythonではモジュールをインストールすることができないため,モジュールファイルを自分で作成する必要がある. 参考:https://qiita.com/sirius1000/items/e536b42099dbaae7e1a2

from machine import I2C
import time

# BME280 default address.
BME280_I2CADDR = 0x76

# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5

# BME280 Registers

BME280_REGISTER_DIG_T1 = 0x88  # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C

BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E

BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7

BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0

BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD


class Device:
    """Class for communicating with an I2C device.

    Allows reading and writing 8-bit, 16-bit, and byte array values to
    registers on the device."""

    def __init__(self, address, i2c):
        """Create an instance of the I2C device at the specified address using
        the specified I2C interface object."""
        self._address = address
        self._i2c = i2c

    def writeRaw8(self, value):
        """Write an 8-bit value on the bus (without register)."""
        value = value & 0xFF
        self._i2c.writeto(self._address, value)

    def write8(self, register, value):
        """Write an 8-bit value to the specified register."""
        b=bytearray(1)
        b[0]=value & 0xFF
        self._i2c.writeto_mem(self._address, register, b)

    def write16(self, register, value):
        """Write a 16-bit value to the specified register."""
        value = value & 0xFFFF
        b=bytearray(2)
        b[0]= value & 0xFF
        b[1]= (value>>8) & 0xFF
        self.i2c.writeto_mem(self._address, register, value)

    def readRaw8(self):
        """Read an 8-bit value on the bus (without register)."""
        return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF

    def readU8(self, register):
        """Read an unsigned byte from the specified register."""
        return int.from_bytes(
            self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF

    def readS8(self, register):
        """Read a signed byte from the specified register."""
        result = self.readU8(register)
        if result > 127:
            result -= 256
        return result

    def readU16(self, register, little_endian=True):
        result = int.from_bytes(
            self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
        if not little_endian:
            result = ((result << 8) & 0xFF00) + (result >> 8)
        return result

    def readS16(self, register, little_endian=True):
        """Read a signed 16-bit value from the specified register, with the
        specified endianness (default little endian, or least significant byte
        first)."""
        result = self.readU16(register, little_endian)
        if result > 32767:
            result -= 65536
        return result

    def readU16LE(self, register):
        """Read an unsigned 16-bit value from the specified register, in little
        endian byte order."""
        return self.readU16(register, little_endian=True)

    def readU16BE(self, register):
        """Read an unsigned 16-bit value from the specified register, in big
        endian byte order."""
        return self.readU16(register, little_endian=False)

    def readS16LE(self, register):
        """Read a signed 16-bit value from the specified register, in little
        endian byte order."""
        return self.readS16(register, little_endian=True)

    def readS16BE(self, register):
        """Read a signed 16-bit value from the specified register, in big
        endian byte order."""
        return self.readS16(register, little_endian=False)


class BME280:
    def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
                **kwargs):
        # Check that mode is valid.
        if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                        BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
            raise ValueError(
                'Unexpected mode value {0}. Set mode to one of '
                'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
                'BME280_ULTRAHIGHRES'.format(mode))
        self._mode = mode
        # Create I2C device.
        if i2c is None:
            raise ValueError('An I2C object is required.')
        self._device = Device(address, i2c)
        # Load calibration values.
        self._load_calibration()
        self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
        self.t_fine = 0

    def _load_calibration(self):

        self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
        self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
        self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)

        self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
        self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
        self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
        self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
        self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
        self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
        self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
        self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
        self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)

        self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
        self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
        self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
        self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)

        h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
        h4 = (h4 << 24) >> 20
        self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)

        h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
        h5 = (h5 << 24) >> 20
        self.dig_H5 = h5 | (
            self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)

    def read_raw_temp(self):
        """Reads the raw (uncompensated) temperature from the sensor."""
        meas = self._mode
        self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
        meas = self._mode << 5 | self._mode << 2 | 1
        self._device.write8(BME280_REGISTER_CONTROL, meas)
        sleep_time = 1250 + 2300 * (1 << self._mode)

        sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
        sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
        time.sleep_us(sleep_time)  # Wait the required time
        msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
        lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
        xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
        raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
        return raw

    def read_raw_pressure(self):
        """Reads the raw (uncompensated) pressure level from the sensor."""
        """Assumes that the temperature has already been read """
        """i.e. that enough delay has been provided"""
        msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
        lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
        xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
        raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
        return raw

    def read_raw_humidity(self):
        """Assumes that the temperature has already been read """
        """i.e. that enough delay has been provided"""
        msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
        lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
        raw = (msb << 8) | lsb
        return raw

    def read_temperature(self):
        """Get the compensated temperature in 0.01 of a degree celsius."""
        adc = self.read_raw_temp()
        var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
        var2 = ((
            (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
            self.dig_T3) >> 14
        self.t_fine = var1 + var2
        return (self.t_fine * 5 + 128) >> 8

    def read_pressure(self):
        """Gets the compensated pressure in Pascals."""
        adc = self.read_raw_pressure()
        var1 = self.t_fine - 128000
        var2 = var1 * var1 * self.dig_P6
        var2 = var2 + ((var1 * self.dig_P5) << 17)
        var2 = var2 + (self.dig_P4 << 35)
        var1 = (((var1 * var1 * self.dig_P3) >> 8) +
                ((var1 * self.dig_P2) >> 12))
        var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
        if var1 == 0:
            return 0
        p = 1048576 - adc
        p = (((p << 31) - var2) * 3125) // var1
        var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
        var2 = (self.dig_P8 * p) >> 19
        return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)

    def read_humidity(self):
        adc = self.read_raw_humidity()
        # print 'Raw humidity = {0:d}'.format (adc)
        h = self.t_fine - 76800
        h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
            16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
                            self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
                            self.dig_H2 + 8192) >> 14))
        h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
        h = 0 if h < 0 else h
        h = 419430400 if h > 419430400 else h
        return h >> 12

    @property
    def temperature(self):
        "Return the temperature in degrees."
        t = self.read_temperature()
        ti = t // 100
        td = t - ti * 100
        return "{}.{:02d}".format(ti, td)

    @property
    def pressure(self):
        "Return the temperature in hPa."
        p = self.read_pressure() // 256
        pi = p // 100
        pd = p - pi * 100
        return "{}.{:02d}".format(pi, pd)

    @property
    def humidity(self):
        "Return the humidity in percent."
        h = self.read_humidity()
        hi = h // 1024
        hd = h * 100 // 1024 - hi * 100
        return "{}.{:02d}".format(hi, hd)

measure.py

BME280モジュールを使ってデータを取得する.

from machine import Pin,I2C
import utime
import BME280

p21 = Pin(21,Pin.IN,Pin.PULL_UP)
p22 = Pin(22,Pin.IN,Pin.PULL_UP)

i2c = I2C(scl=Pin(22), sda=Pin(21), freq=10000)

def get_temp():
    bme = BME280.BME280(i2c=i2c)
    temp = bme.temperature
    hum = bme.humidity
    pres = bme.pressure
    print(f'temperature: {temp} ºC')
    return temp

def get_hum():
    bme = BME280.BME280(i2c=i2c)
    temp = bme.temperature
    hum = bme.humidity
    pres = bme.pressure
    print(f'humidity: {hum} %')
    return hum

transmit.py

このpythonファイルで取得したデータをFastAPIに送る.

import requests,urequests
import json
import measure
import machine

#現在時刻を取得
url_jst = "http://worldtimeapi.org/api/timezone/Asia/Tokyo"

retry_delay = 5000 # interval time of retry after a failed Web query
response = urequests.get(url_jst)
parsed = response.json()
datetime_str = str(parsed["datetime"])

#送信データをJSON形式にまとめる
data = {
    "device": {
        "id": "002",
        "name": "ESP32-WROOM-32",
        "type": "MicroController",
        "manufacturer": "ESPRESSIF",
    },
    "sensors_temp": {
        "id": "temp_sensor_2",
        "type": "temperature",
        "location": "door",
        "value": float(measure.get_temp()), #measure.pyで取得したデータ
        "unit": "Celsius",
        "timestamp": datetime_str, #現在時刻ISO8601形式
    },
    "sensors_hum": {
        "id": "hum_sensor_2",
        "type": "humidity",
        "location": "door",
        "value": float(measure.get_hum()), #measure.pyで取得したデータ
        "unit": "%",
        "timestamp": datetime_str,
    },
}

#送信
def send():
    json_data = json.dumps(data)
    response = requests.post(
        "http://your_VM_ip/api/data",
        data = json_data,
        headers={"Content-Type":"application/json"}
    )

    print(response.status_code)
    print(data)

send()

確認

  • MongoDBは動いているか sudo systemctl status mongod
  • FastAPIサーバーは動いているか http://your_VM_ip:port/ にアクセス
  • ESP32はWi-Fiに接続できているか

実行

ターミナル image.png (81.6 kB)

MongoDB compass image.png (128.2 kB)

こんな感じになればOK *適当に撮ったスクショなのでターミナルとDBの値は一致してません

参考

https://techpr.info/python/farm-stack/ https://fastapi.tiangolo.com/ https://fastapi.tiangolo.com/ja/tutorial/body-nested-models/ https://qiita.com/uenosy/items/2f6b1aa258018d3db76c https://qiita.com/ueda_st_/items/bcbaf3b7c97c5102af4a https://beta-notes.way-nifty.com/blog/2020/03/post-c320f1.html https://www.mongodb.com/docs/manual/tutorial/install-mongodb-on-ubuntu/ https://kennejs.com/entry/mongodb-compass-howtouse#documentfindfilter