database_utils module

Module Name: Wassermonitor2 API database functions

Description:

This file provides the database functions for the wassermonitor API.

It includes functions for:
  • Inserting measurement data into the database

  • Reading data from the database

Dependencies:
  • sqlite3 (for sqlite support)

  • pymysql (for mysql support)

  • scipy.signal (for signal processing)

Configuration:
  • Some parameters can be configured in the config_file ../config.cfg.

Author:
database_utils.assign_color(value, warn, alarm)[source]
database_utils.assign_sign(value, warn, alarm, dt)[source]
database_utils.convert_nan_to_none(x)[source]
database_utils.create_sqlite_database(conn, cur)[source]

Creates the necessary tables in the SQLite3 database if they do not already exist.

This function checks the SQLite3 database and creates the following tables if they are not already present:

  • meas_point: Stores measurement point data, including the point’s ID and name.

  • sensor: Stores sensor data, including the sensor’s ID, measurement point ID, name, tank height, maximum value, warning threshold, and alarm threshold.

  • measurement: Stores measurement data, including the measurement’s ID, datetime, sensor ID, and a comment.

  • meas_val: Stores measurement values, including the value of the measurement and any associated comment.

  • messages: Stores message data, including a timestamp, signal and email targets, message text, and alarm/warning flags.

Parameters:
  • conn (sqlite3.Connection) – The SQLite3 connection object.

  • cur (sqlite3.Cursor) – The SQLite3 cursor object.

Raises:

Error – If there is an error while executing the SQL commands.

Example usage:

conn, cur = get_sqlite3_connection('example.db')
create_sqlite_database(conn, cur)
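
The create-if-missing behaviour described above can be sketched with an in-memory database. This is a minimal illustration, not the module's code: the column names are assumptions inferred from the table descriptions, create_tables_sketch is a hypothetical helper, and only two of the five tables are shown.

```python
import sqlite3

# Hypothetical schema: column names are guessed from the table descriptions,
# not copied from the module.
SCHEMA_SKETCH = """
CREATE TABLE IF NOT EXISTS meas_point (
    id   INTEGER PRIMARY KEY,
    name TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sensor (
    id            INTEGER PRIMARY KEY,
    meas_point_id INTEGER NOT NULL REFERENCES meas_point(id),
    name          TEXT NOT NULL,
    tank_height   REAL,
    max_val       REAL,
    warn          REAL,
    alarm         REAL
);
"""


def create_tables_sketch(conn):
    """Create the sketch tables; calling this repeatedly is harmless."""
    conn.executescript(SCHEMA_SKETCH)
    conn.commit()


conn = sqlite3.connect(":memory:")
create_tables_sketch(conn)
create_tables_sketch(conn)  # second call changes nothing thanks to IF NOT EXISTS

# List the tables that now exist.
tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)
```

Because every statement uses IF NOT EXISTS, the function can run on each start-up without first checking whether the database file is new.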
database_utils.datetime_to_hours(x)[source]
database_utils.get_all_sqlite_files(path)[source]

Retrieve all SQLite files from a specified directory, sorted by their date.

This function scans the specified directory for files with a .sqlite extension, parses their filenames as dates in the format MM-YYYY, and returns a sorted list of filenames in the same format.

Parameters:

path (str) – The directory path to scan for SQLite files.

Returns:

A list of filenames (str) with the .sqlite extension, sorted by their date in ascending order.

Return type:

list

Raises:

ValueError – If a file with a .sqlite extension does not conform to the MM-YYYY format.

Example usage:

Suppose the directory `/path/to/sqlite/` contains the following files:
- `01-2024.sqlite`
- `03-2023.sqlite`
- `12-2023.sqlite`

Calling the function:
>>> sqlite_files = get_all_sqlite_files('/path/to/sqlite/')
>>> print(sqlite_files)

Would output:
['03-2023.sqlite', '12-2023.sqlite', '01-2024.sqlite']
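
The date-based ordering can be sketched as follows. This is an illustration under assumptions, not the module's implementation: sort_sqlite_names_sketch is a hypothetical helper that takes a list of names instead of scanning a directory.

```python
import os
from datetime import datetime


def sort_sqlite_names_sketch(filenames):
    """Return the .sqlite names sorted chronologically by their MM-YYYY stem."""
    dated = []
    for name in filenames:
        stem, ext = os.path.splitext(name)
        if ext != ".sqlite":
            continue  # ignore files with other extensions
        # strptime raises ValueError when the stem is not in MM-YYYY format
        dated.append((datetime.strptime(stem, "%m-%Y"), name))
    return [name for _, name in sorted(dated)]


names = ["01-2024.sqlite", "03-2023.sqlite", "12-2023.sqlite", "notes.txt"]
ordered = sort_sqlite_names_sketch(names)
print(ordered)
```

Parsing the stem into a datetime matters because a plain string sort would place 01-2024 before 03-2023.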
database_utils.get_available_meas_points_from_sqlite_db(db_conf)[source]

Retrieve a list of available measurement points from SQLite databases.

This function scans all SQLite database files in the specified path, executes a query to retrieve distinct measurement point names, and compiles them into a unique list.

Parameters:

db_conf (dict) –

A dictionary containing the database configuration. It must include the following keys:

  • engine (str): The database engine, which must be set to sqlite.

  • sqlite_path (str): The directory path containing SQLite database files.

Returns:

A list of unique measurement point names.

Return type:

list

Raises:

ValueError – If the database engine specified in db_conf is not sqlite.

Note

  • The function uses get_all_sqlite_files to retrieve all SQLite database file names in the directory specified by db_conf['sqlite_path'].

  • The function requires an external helper get_sqlite3_connection to establish SQLite connections.

Example usage:

db_conf = {
    'engine': 'sqlite',
    'sqlite_path': '/path/to/sqlite/files/'
}
measurement_points = get_available_meas_points_from_sqlite_db(db_conf)
print(measurement_points)
database_utils.get_last_meas_data_from_sqlite_db(db_conf)[source]

Retrieves the most recent measurement data from a SQLite database.

This function connects to a SQLite database, executes an SQL query to fetch the latest measurement data for each sensor, and processes the results into a nested dictionary. The dictionary is keyed by measurement point name and then by sensor name. Each entry contains the measurement datetime, the warning and alarm thresholds, the tank height, the maximum allowed value, the calculated value (the difference between the tank height and the measured value), and an assigned color.

Parameters:

db_conf – A dictionary containing the database configuration. It should have the following keys:

  • 'engine': Should be 'sqlite' for this function to work.

  • 'sqlite_path': The file path to the SQLite database directory.

Returns:

A nested dictionary structure with measurement data. The structure is as follows:

output[measurement_point_name][sensor_name] = {
    'dt': datetime,                 # Measurement timestamp
    'warn': warning_threshold,      # Warning threshold
    'alarm': alarm_threshold,       # Alarm threshold
    'tank_height': tank_height,     # Height of the tank
    'max_val': max_value,           # Maximum allowed value for the sensor
    'value': calculated_value,      # Difference between tank_height and actual value
    'color': assigned_color         # Color assigned based on value and thresholds
}

Raises:
  • ValueError – If the ‘engine’ in db_conf is not ‘sqlite’.

  • FileNotFoundError – If the SQLite database file does not exist.

Example usage:

db_conf = {
    'engine': 'sqlite',
    'sqlite_path': '/path/to/db/'
}

result = get_last_meas_data_from_sqlite_db(db_conf)
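
How query rows might be folded into the nested structure is sketched below. The row layout, the helper build_latest_sketch, and the sample names are assumptions made for illustration; the 'color' key is left out because the behaviour of assign_color is not documented here.

```python
from datetime import datetime


def build_latest_sketch(rows):
    """Fold flat rows into output[measurement_point_name][sensor_name]."""
    output = {}
    for mp_name, sensor_name, dt, warn, alarm, tank_height, max_val, meas_val in rows:
        # setdefault creates the inner dict the first time a point is seen
        output.setdefault(mp_name, {})[sensor_name] = {
            "dt": dt,
            "warn": warn,
            "alarm": alarm,
            "tank_height": tank_height,
            "max_val": max_val,
            # value is the difference between tank height and the measured value
            "value": tank_height - meas_val,
        }
    return output


# Hypothetical rows: (point, sensor, dt, warn, alarm, tank_height, max_val, meas_val)
rows = [
    ("Cistern", "Sensor1", datetime(2024, 12, 15, 10, 0), 80.0, 90.0, 120.0, 100.0, 45.5),
    ("Cistern", "Sensor2", datetime(2024, 12, 15, 10, 0), 80.0, 90.0, 120.0, 100.0, 50.0),
]
latest = build_latest_sketch(rows)
print(latest["Cistern"]["Sensor1"]["value"])
```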
database_utils.get_latest_database_file(path)[source]

Retrieve the latest SQLite database file from a given directory based on its timestamp.

This function searches the specified directory for files with the .sqlite extension, extracts the timestamp from the filename (in the format MM-YYYY), and identifies the most recent file.

Parameters:

path (str) – The directory path containing the SQLite files.

Returns:

The filename of the most recent SQLite database, formatted as MM-YYYY.sqlite.

Return type:

str

Raises:

ValueError – If no .sqlite files are found in the specified directory.

Example usage:

latest_file = get_latest_database_file('/path/to/sqlite/files')
print(f"The latest database file is: {latest_file}")
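
Selecting the most recent file by its parsed date can be sketched like this. latest_sqlite_name_sketch is a hypothetical helper working on a list of names; the real function reads a directory.

```python
from datetime import datetime

SUFFIX = ".sqlite"


def latest_sqlite_name_sketch(filenames):
    """Return the name whose MM-YYYY stem is the most recent month."""
    candidates = [name for name in filenames if name.endswith(SUFFIX)]
    if not candidates:
        raise ValueError("no .sqlite files found")
    # Compare by parsed date, not by string, so 01-2024 beats 12-2023.
    return max(candidates, key=lambda name: datetime.strptime(name[: -len(SUFFIX)], "%m-%Y"))


latest = latest_sqlite_name_sketch(["01-2024.sqlite", "03-2023.sqlite", "12-2023.sqlite"])
print(latest)
```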
database_utils.get_meas_data_from_sqlite_db(db_conf, dt_begin=None, dt_end=None)[source]

Retrieve measurement data from SQLite database within a specified date range.

This function is designed specifically for SQLite databases. It queries the data for a given date range (dt_begin to dt_end) and calculates additional metrics such as the slope and derivation of measurements. The results are returned as a pandas DataFrame.

Parameters:
  • db_conf (dict) – Configuration dictionary containing database connection settings. Must include the key engine with value ‘sqlite’ and sqlite_path specifying the path to the database files.

  • dt_begin (datetime, optional) – Start of the date range for the query. If not provided, defaults to 60 days before dt_end.

  • dt_end (datetime, optional) – End of the date range for the query. If not provided, defaults to the current time in UTC.

Returns:

A DataFrame containing the queried data with the following columns:

  • mid: Measurement ID

  • dt: Timestamp of the measurement

  • mpName: Measurement point name

  • sensorId: Sensor ID

  • tank_height: Height of the tank

  • max_val: Maximum value for the sensor

  • warn: Warning threshold

  • alarm: Alarm threshold

  • meas_val: Measured value

  • slope: Gradient of measured values over time

  • derivation: Derived metric calculated as -slope / slope_date

  • value: Difference between tank_height and meas_val, rounded to 1 decimal place

Return type:

pd.DataFrame

Raises:

ValueError – If the database engine is not SQLite, or if the inputs dt_begin or dt_end are not of type datetime, or if dt_begin is after dt_end.

Note

  • The function splits the query by month and looks for the SQLite files in the path specified by db_conf['sqlite_path'].

  • Requires external helper functions:
    • get_months_between(dt_begin, dt_end) to determine months in the range.

    • get_sqlite3_connection(db_path) to establish SQLite connections.

Example usage:

from datetime import datetime

db_conf = {
    'engine': 'sqlite',
    'sqlite_path': '/path/to/sqlite/files'
}
dt_begin = datetime(2024, 1, 1)
dt_end = datetime(2024, 2, 1)

result = get_meas_data_from_sqlite_db(db_conf, dt_begin, dt_end)
print(result.head())
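
The documented defaults and checks for dt_begin and dt_end can be sketched as below. resolve_range_sketch is a hypothetical helper and the order of the checks is an assumption; only the defaults and the ValueError conditions come from the description above.

```python
from datetime import datetime, timedelta, timezone


def resolve_range_sketch(dt_begin=None, dt_end=None):
    """Apply the documented defaults and validate the date range."""
    if dt_end is None:
        dt_end = datetime.now(timezone.utc)  # default: current time in UTC
    elif not isinstance(dt_end, datetime):
        raise ValueError("dt_end must be a datetime")
    if dt_begin is None:
        dt_begin = dt_end - timedelta(days=60)  # default: 60 days before dt_end
    elif not isinstance(dt_begin, datetime):
        raise ValueError("dt_begin must be a datetime")
    if dt_begin > dt_end:
        raise ValueError("dt_begin must not be after dt_end")
    return dt_begin, dt_end


begin, end = resolve_range_sketch(dt_end=datetime(2024, 3, 1))
print(begin, end)
```

Note that Python raises TypeError when a naive datetime is compared with a timezone-aware one, so passing a naive dt_begin while leaving dt_end at its UTC default would fail in this sketch.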
database_utils.get_months_between(start_date, end_date)[source]

Generate a list of months between two datetime objects in “%m-%Y” format.

Parameters:
  • start_date (datetime) – The start date.

  • end_date (datetime) – The end date.

Returns:

A list of strings representing the months in “%m-%Y” format.

Return type:

list

Raises:

ValueError – If start_date is later than end_date.

Example usage:

from datetime import datetime

start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 5, 1)

months = get_months_between(start_date, end_date)
print(months)  # Output: ['01-2023', '02-2023', '03-2023', '04-2023', '05-2023']
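
One way to produce such a list, including a year rollover, is sketched here. months_between_sketch is a hypothetical stand-in and may differ from the module's implementation; the end month is treated as inclusive, matching the example output above.

```python
from datetime import datetime


def months_between_sketch(start_date, end_date):
    """List every month from start_date to end_date (inclusive) as MM-YYYY."""
    if start_date > end_date:
        raise ValueError("start_date must not be later than end_date")
    months = []
    year, month = start_date.year, start_date.month
    # Tuple comparison orders by year first, then by month.
    while (year, month) <= (end_date.year, end_date.month):
        months.append(f"{month:02d}-{year}")
        month += 1
        if month > 12:  # roll over into the next year
            month = 1
            year += 1
    return months


span = months_between_sketch(datetime(2023, 11, 15), datetime(2024, 2, 1))
print(span)
```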
database_utils.get_mysql_connection(conf)[source]

Establishes a connection to a MySQL database and returns the connection and cursor objects.

Parameters:

conf (dict) – A dictionary containing the database connection parameters:

  • 'host' (str): The hostname or IP address of the MySQL server.

  • 'user' (str): The username to use for authentication.

  • 'pass' (str): The password to use for authentication.

  • 'db' (str): The name of the database to connect to.

Returns:

A tuple containing:

  • conn: The MySQL connection object.

  • cur: The MySQL cursor object.

Return type:

tuple

Example usage:

conf = {'host': 'localhost', 'user': 'root', 'pass': 'password', 'db': 'test_db'}
conn, cur = get_mysql_connection(conf)
database_utils.get_sqlite3_connection(db_file)[source]

Establishes a connection to an SQLite3 database, creates the database if it doesn’t exist, and returns the connection and cursor objects.

Parameters:

db_file (str) – The file path to the SQLite3 database file.

Returns:

A tuple containing:

  • conn: The SQLite3 connection object.

  • cur: The SQLite3 cursor object.

Return type:

tuple

Example usage:

db_file = 'example.db'
conn, cur = get_sqlite3_connection(db_file)
database_utils.get_sqlite3_file_name_from_conf(dt)[source]

Generates an SQLite3 file name based on the provided datetime object.

Parameters:

dt (datetime) – The datetime object used to generate the file name.

Returns:

The generated SQLite3 file name in the format “month-year.sqlite”, or False if the input is not a datetime object.

Return type:

str or bool

Example usage:

>>> from datetime import datetime
>>> dt = datetime(2024, 12, 5)
>>> get_sqlite3_file_name_from_conf(dt)
'12-2024.sqlite'
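
The naming rule can be sketched with strftime. sqlite_file_name_sketch is a hypothetical stand-in that follows the documented behaviour of returning False for non-datetime input.

```python
from datetime import datetime


def sqlite_file_name_sketch(dt):
    """Build the monthly file name, or return False for non-datetime input."""
    if not isinstance(dt, datetime):
        return False
    # %m is the zero-padded month, %Y the four-digit year
    return dt.strftime("%m-%Y") + ".sqlite"


name = sqlite_file_name_sketch(datetime(2024, 12, 5))
print(name)
```

Because the function signals bad input with False instead of an exception, callers need to check the result before joining it to a directory path.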

database_utils.insert_and_get_id(db_conf, dt, sql, sql_args)[source]

Inserts a record into the database and returns the ID of the inserted record.

Parameters:
  • db_conf (dict) – A dictionary containing the database configuration parameters:
    • 'engine' (str): The type of database engine (e.g., "sqlite").

    • 'sqlite_path' (str): The path to the SQLite database files.

  • dt (datetime) – The datetime object used to generate the SQLite file name.

  • sql (str) – The SQL query to execute.

  • sql_args (tuple) – The arguments to pass to the SQL query.

Returns:

The ID of the inserted record (int) if successful; the exception object if an error occurs.

Example usage:

>>> from datetime import datetime, timezone
>>> db_conf = {'engine': 'sqlite', 'sqlite_path': '/path/to/db/'}
>>> dt = datetime.now(timezone.utc)  # timezone-aware timestamp in UTC
>>> sql = "INSERT INTO measurement (dt, pi_name) VALUES (?, ?)"
>>> sql_args = (dt, 'sensor_1')
>>> insert_id = insert_and_get_id(db_conf, dt, sql, sql_args)
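
The insert-and-return-ID pattern can be sketched with sqlite3's cursor.lastrowid. This is an illustration under assumptions: insert_and_get_id_sketch takes an open connection rather than db_conf and dt, and the demo table is hypothetical.

```python
import sqlite3


def insert_and_get_id_sketch(conn, sql, sql_args):
    """Run an INSERT and return the new row ID, or the exception on failure."""
    try:
        cur = conn.execute(sql, sql_args)
        conn.commit()
        return cur.lastrowid  # row ID of the row just inserted
    except sqlite3.Error as exc:
        return exc  # mirrors the documented behaviour of returning the exception


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, note TEXT)")  # hypothetical table

first_id = insert_and_get_id_sketch(conn, "INSERT INTO demo (note) VALUES (?)", ("a",))
second_id = insert_and_get_id_sketch(conn, "INSERT INTO demo (note) VALUES (?)", ("b",))
failed = insert_and_get_id_sketch(conn, "INSERT INTO missing (note) VALUES (?)", ("c",))
print(first_id, second_id, type(failed).__name__)
```

Since the result may be either an int or an exception object, callers should test it with isinstance before using it as an ID.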

database_utils.insert_value(db_conf, val_dict)[source]

Inserts a new measurement and associated values into the SQLite database.

This function inserts a new measurement record into the database, along with associated values, such as sensor values and their corresponding timestamps. It first retrieves or generates the necessary sensor and measurement point IDs, then creates a new measurement entry, and finally inserts the actual measurement values.

Parameters:
  • db_conf – A dictionary containing the database configuration. It should have the following keys:
    • 'engine': Should be 'sqlite' for this function to work.

    • 'sqlite_path': The file path to the SQLite database directory.

  • val_dict – A dictionary containing the measurement data to insert. It should have the following keys:
    • 'datetime': ISO formatted datetime string for the measurement timestamp.

    • 'meas_point': The name of the measurement point.

    • 'sensor_name': The name of the sensor.

    • 'tank_height': The height of the tank (used in the example below).

    • 'max_val': The maximum allowed value for the sensor.

    • 'warn': The warning threshold for the sensor.

    • 'alarm': The alarm threshold for the sensor.

    • 'values': A list of sensor values to insert.

Returns:

Always returns False. The return value is not used in this function.

Raises:
  • ValueError – If the necessary database configuration or values are invalid.

  • sqlite3.Error – If any SQLite database errors occur during insertion.

Example usage:

db_conf = {
    'engine': 'sqlite',
    'sqlite_path': '/path/to/db/'
}
val_dict = {
    'datetime': '2024-12-15T10:00:00',
    'meas_point': 'Temperature',
    'sensor_name': 'Sensor1',
    'tank_height': 120,
    'max_val': 100.0,
    'warn': 80.0,
    'alarm': 90.0,
    'values': [75.0, 76.0, 77.5]
}
result = insert_value(db_conf, val_dict)
database_utils.sqlite_get_meas_point_id(db_conf, mp_name, dt)[source]

Retrieves or inserts a measurement point ID based on the measurement point name.

This function checks if a measurement point with the given name already exists in the SQLite database. If the measurement point exists, it retrieves the corresponding ID. If it does not exist, the function inserts a new record for the measurement point and returns the newly inserted ID.

Parameters:
  • db_conf – A dictionary containing the database configuration. It should have the following keys:
    • 'engine': Should be 'sqlite' for this function to work.

    • 'sqlite_path': The file path to the SQLite database directory.

  • mp_name – The name of the measurement point.

  • dt – The datetime object used to derive the SQLite file name from the configuration.

Returns:

The ID of the measurement point. If the point does not exist, it is created and the new ID is returned.

Raises:

Error – If any SQLite database errors occur during the query or insertion.

Example usage:

db_conf = {
    'engine': 'sqlite',
    'sqlite_path': '/path/to/db/'
}
mp_name = 'Temperature'
dt = datetime(2024, 12, 15)
mp_id = sqlite_get_meas_point_id(db_conf, mp_name, dt)
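
The retrieve-or-insert logic can be sketched as follows. get_or_create_meas_point_sketch is a hypothetical helper that works on an open connection, and the meas_point columns (id, name) are assumptions based on the table description.

```python
import sqlite3


def get_or_create_meas_point_sketch(conn, mp_name):
    """Return the ID for mp_name, inserting a new row if none exists."""
    row = conn.execute("SELECT id FROM meas_point WHERE name = ?", (mp_name,)).fetchone()
    if row is not None:
        return row[0]  # the point already exists
    cur = conn.execute("INSERT INTO meas_point (name) VALUES (?)", (mp_name,))
    conn.commit()
    return cur.lastrowid  # ID of the freshly inserted point


conn = sqlite3.connect(":memory:")
# Assumed minimal schema for the sketch
conn.execute("CREATE TABLE meas_point (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

first = get_or_create_meas_point_sketch(conn, "Temperature")
again = get_or_create_meas_point_sketch(conn, "Temperature")
other = get_or_create_meas_point_sketch(conn, "Pressure")
print(first, again, other)
```

Without a UNIQUE constraint on the name, two writers running concurrently could both miss the SELECT and insert duplicate rows; the sketch assumes a single writer.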
database_utils.sqlite_get_sensor_id(db_conf, mp_id, s_name, s_tank_height, s_max_val, s_warn, s_alarm, dt)[source]

Retrieves or inserts a sensor ID based on the sensor details.

This function checks if a sensor with the given name, measurement point ID, maximum value, warning threshold, and alarm threshold already exists in the SQLite database. If the sensor exists, it retrieves the corresponding ID. If it does not exist, the function inserts a new record for the sensor and returns the newly inserted ID.

Parameters:
  • db_conf (dict) – A dictionary containing the database configuration. It should have the following keys:
    • engine (str): Should be 'sqlite' for this function to work.

    • sqlite_path (str): The file path to the SQLite database directory.

  • mp_id (int) – The ID of the measurement point to which the sensor is associated.

  • s_name (str) – The name of the sensor.

  • s_tank_height (float) – The height of the tank.

  • s_max_val (float) – The maximum allowed value for the sensor.

  • s_warn (float) – The warning threshold for the sensor.

  • s_alarm (float) – The alarm threshold for the sensor.

  • dt (datetime) – The datetime object used to derive the SQLite file name from the configuration.

Returns:

The ID of the sensor. If the sensor does not exist, it is created and the new ID is returned.

Return type:

int

Raises:

Error – If any SQLite database errors occur during the query or insertion.

Example usage:

from datetime import datetime

db_conf = {
    'engine': 'sqlite',
    'sqlite_path': '/path/to/db/'
}
mp_id = 1
s_name = 'TemperatureSensor'
s_tank_height = 120
s_max_val = 100.0
s_warn = 80.0
s_alarm = 90.0
dt = datetime(2024, 12, 15)
# s_tank_height is the fourth positional argument in the signature
s_id = sqlite_get_sensor_id(db_conf, mp_id, s_name, s_tank_height, s_max_val, s_warn, s_alarm, dt)