Source code for smartgadget

"""
Communicate with a Sensirion SHTxx Smart Gadget.
"""
import logging
from math import exp, log10
from datetime import datetime

from msl.network import manager, ssh

__author__ = 'Measurement Standards Laboratory of New Zealand'
__copyright__ = '\xa9 2019 - 2022, ' + __author__
__version__ = '0.1.0.dev0'

# if you change this value then you must also update the name of the
# virtual environment that is created in rpi-setup.sh
RPI_EXE_PATH = 'shtenv/bin/smartgadget'

logger = logging.getLogger(__package__)


[docs]def connect(*, host='raspberrypi', rpi_username='pi', rpi_password=None, timeout=10, **kwargs): """Connect to the :class:`~smartgadget.sht3x.SHT3XService` on the Raspberry Pi. Parameters ---------- host : :class:`str`, optional The hostname or IP address of the Raspberry Pi. rpi_username : :class:`str`, optional The username for the Raspberry Pi. rpi_password : :class:`str`, optional The password for `rpi_username`. timeout : :class:`float`, optional The maximum number of seconds to wait for the connection. kwargs Keyword arguments that are passed to :func:`~msl.network.manager.run_services`. Returns ------- :class:`~smartgadget.client.SmartGadgetClient` A connection to the :class:`~smartgadget.sht3x.SHT3XService` on the Raspberry Pi. """ console_script_path = '/home/{}/{}'.format(rpi_username, RPI_EXE_PATH) ssh.start_manager(host, console_script_path, ssh_username=rpi_username, ssh_password=rpi_password, timeout=timeout, as_sudo=True, **kwargs) kwargs['host'] = host return SmartGadgetClient('Smart Humigadget', **kwargs)
[docs]def start_service_on_rpi(): """Starts the Network :class:`~msl.network.manager.Manager` and the :class:`~smartgadget.sht3x.SHT3XService`. This function should only be called from the ``smartgadget`` console script (see setup.py). """ kwargs = ssh.parse_console_script_kwargs() if kwargs.get('auth_login', False) and ('username' not in kwargs or 'password' not in kwargs): raise ValueError( 'The Manager is using a login for authentication but the SmartGadgetService ' 'does not know the username and password to use to connect to the Manager' ) interface = kwargs.pop('interface', None) manager.run_services(SHT3XService(interface=interface), **kwargs)
[docs]def kill_manager(*, host='raspberrypi', rpi_username='pi', rpi_password=None, timeout=10, **kwargs): """Kill the Network :class:`~msl.network.manager.Manager` on the Raspberry Pi. Parameters ---------- host : :class:`str`, optional The hostname or IP address of the Raspberry Pi. rpi_username : :class:`str`, optional The username for the Raspberry Pi. rpi_password : :class:`str`, optional The password for `rpi_username`. timeout : :class:`float`, optional The maximum number of seconds to wait for the connection. kwargs Keyword arguments that are passed to :meth:`~paramiko.client.SSHClient.connect`. """ ssh_client = ssh.connect(host, username=rpi_username, password=rpi_password, timeout=timeout, **kwargs) lines = ssh.exec_command(ssh_client, 'ps aux | grep smartgadget') pids = [line.split()[1] for line in lines if RPI_EXE_PATH in line] for pid in pids: try: ssh.exec_command(ssh_client, 'sudo kill -9 ' + pid) except: pass ssh_client.close()
[docs]def dewpoint(temperature, humidity): """Calculate the dew point. Parameters ---------- temperature : :class:`float` The temperature [degree C]. humidity : :class:`float` The humidity [%RH]. Returns ------- :class:`float` The dew point [degree C]. """ # TODO get formula from JLS. # For now use Equation 7 from # https://www.vaisala.com/sites/default/files/documents/Humidity_Conversion_Formulas_B210973EN.pdf if temperature < -20 or temperature > 350: # the Equation 7 is only valid between -20 and +350 degree C raise ValueError('temperature={} is not between -20 and +350 degree C'.format(temperature)) # calculate Pws using Equation 3 C1 = -7.85951783 C2 = 1.84408259 C3 = -11.7866497 C4 = 22.6807411 C5 = -15.9618719 C6 = 1.80122502 Pc = 220640. Tc = 647.096 kelvin = temperature + 273.15 x = 1.0 - kelvin / Tc y = (Tc / kelvin) * (C1 * x + C2 * x**1.5 + C3 * x**3 + C4 * x**3.5 + C5 * x**4 + C6 * x**7.5) Pws = Pc * exp(y) # calculate Pw using Equation 1 Pw = Pws * humidity / 100. # calculate the dew point using Equation 7 if -20 <= temperature <= 50: A, m, Tn = 6.116441, 7.591386, 240.7263 elif 50 < temperature < 100: A, m, Tn = 6.004918, 7.337936, 229.3975 elif 100 <= temperature <= 150: A, m, Tn = 5.856548, 7.277310, 225.1033 elif 150 < temperature <= 200: A, m, Tn = 6.002859, 7.290361, 227.1704 elif 200 < temperature <= 350: A, m, Tn = 9.980622, 7.388931, 263.1239 else: assert False, 'should never get here' return Tn / (m / log10(Pw / A) - 1.0)
[docs]def timestamp_to_milliseconds(obj): """Convert an object into a timestamp in milliseconds. Parameters ---------- obj A :class:`~datetime.datetime` object, an ISO-8601 formatted :class:`str`, a :class:`float` in seconds, or an :class:`int` in milliseconds. Returns ------- :class:`int` The timestamp in milliseconds. """ if isinstance(obj, int): # in milliseconds return obj if isinstance(obj, float): # in seconds return round(obj * 1e3) if isinstance(obj, str): # an ISO-8601 string string = obj.replace('T', ' ') fmt = '%Y-%m-%d %H:%M:%S' if '.' in string: fmt += '.%f' obj = datetime.strptime(string, fmt) return round(obj.timestamp() * 1e3)
[docs]def milliseconds_to_datetime(milliseconds): """Convert a timestamp in milliseconds to a :class:`~datetime.datetime`. Parameters ---------- milliseconds : :class:`int` A timestamp in milliseconds. Returns ------- :class:`~datetime.datetime` The `milliseconds` converted to a :class:`~datetime.datetime` object. """ return datetime.fromtimestamp(milliseconds * 1e-3)
[docs]def scan(*, interface=0, delegate=None, timeout=10, passive=False): """Scan for Bluetooth devices. This function can only be called if the `bluepy` package is installed. Parameters ---------- interface : :class:`int`, optional The Bluetooth interface to use (where 0 is /dev/hci0). delegate : :ref:`DefaultDelegate <delegate>`, optional Receives a callback when broadcasts from devices are received. timeout : :class:`float`, optional Scan for devices for the given timeout in seconds. During this period, callbacks to the `delegate` object will be called. passive : :class:`bool`, optional Use active (to obtain more information when connecting) or passive scanning. Returns ------- A `view` of :ref:`ScanEntry <scanentry>` objects. """ from bluepy.btle import Scanner scanner = Scanner(interface) if delegate: scanner.withDelegate(delegate) return scanner.scan(timeout=timeout, passive=passive)
from .client import SmartGadgetClient from .sht3x import SHT3XService from .shtc1 import SHTC1Service