# mvm_smart_meter/smart_meter.py
import os
import requests
import json
from datetime import datetime, timedelta
from requestium import Session, Keys
from selenium import webdriver
from webdriver_manager.firefox import GeckoDriverManager
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
import pandas
import io
class GuidNotFoundException(Exception):
    """Raised when a smart-meter link URL does not contain a GUID.

    :param message: Human-readable description. Defaults to
        "Guid not found in URL".
    :type message: str
    :param url: The offending URL, if known. Defaults to None.
    :type url: str
    :ivar message: The description that was passed in.
    :ivar url: The offending URL.
    """

    def __init__(self, message="Guid not found in URL", url=None):
        """Store the message and URL, and build the combined exception text."""
        self.message = message
        self.url = url
        # str(exc) shows both pieces: "<message>: <url>"
        super().__init__(f"{message}: {url}")
class Smart_meter:
    """Client for the MVM ÉMÁSZ smart-metering portal.

    Wraps the login / token / cookie dance needed to reach the external
    smart-meter site and pull load-curve data. Methods are stateful and
    must be called in order:
    get_base_cookies -> get_login_cookies -> get_token ->
    get_custumer_data -> get_smart_meter_data ->
    get_cookies_smart_meter_site -> smart_site_accept_cookes -> get_ldc.
    """

    def __init__(self, username: str, password: str):
        """Set up a headless Firefox-backed requestium session.

        :param username: Login name for the smart-metering portal.
        :param password: Password for the smart-metering portal.
        """
        self.options = Options()
        self.options.add_argument("--headless")
        # Present a regular desktop-Firefox user agent to the portal.
        agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0"
        self.options.set_preference("general.useragent.override", agent)
        # Prefer an explicitly configured geckodriver path; otherwise let
        # webdriver_manager download/install one.
        if gecko_driver_path := os.getenv("GECKO_DRIVER_PATH"):
            gecko_executable_path = gecko_driver_path
        else:
            gecko_executable_path = GeckoDriverManager().install()
        self.service = Service(executable_path=gecko_executable_path)
        self.firefox_driver = webdriver.Firefox(
            service=self.service, options=self.options
        )
        # requestium Session shares cookies between requests and Selenium.
        self.s = Session(driver=self.firefox_driver)
        self.base_url = "https://eloszto.mvmemaszhalozat.hu"
        self.username = username
        self.password = password

    def get_base_cookies(self) -> None:
        """Fetch the main site once to collect its cookies and SAP session id."""
        r = self.s.get(self.base_url)
        response_url = r.url
        # The portal redirects to a URL embedding the SAP session id in
        # parentheses, e.g. ".../(abc123)/..."; extract it for later use.
        split_url = response_url.split("(")[1]
        self.sap_id = split_url.split(")")[0]

    def get_login_cookies(self) -> None:
        """Log in to the main site; store the AuthCode used to fetch the token."""
        # Warm-up request: retrieves site texts plus the cookies that go
        # with them before the actual login POST.
        url = "https://eloszto.mvmemaszhalozat.hu/sap/opu/odata/sap/ZGW_UGYFELSZLG_CMS_NO_AUTH_SRV/Szovegek"
        querystring = {
            "sap-client": "112",
            "sap-language": "HU",
            "$filter": "Funkcio eq 'AKADALYMEN'",
        }
        r = self.s.get(url, params=querystring)
        login_url = "https://eloszto.mvmemaszhalozat.hu/sap/opu/odata/sap/ZGW_UGYFELSZOLGALAT_LOGIN_SRV/Login"
        querystring = {"sap-client": "112", "sap-language": "HU"}
        payload = {"Username": self.username, "Password": self.password}
        headers = {
            "Accept": "application/json",
            "X-Requested-With": "X",
            "Content-Type": "application/json",
        }
        # Mark the cookie banner as already accepted.
        self.s.cookies.set("cookiePanelAccepted", "1")
        r = self.s.post(login_url, json=payload, headers=headers, params=querystring)
        r_dict = r.json()
        self.authcode = r_dict["d"]["AuthCode"]

    def get_token(self) -> None:
        """Exchange the login AuthCode for an OAuth 2.0 bearer token."""
        token_url = "https://eloszto.mvmemaszhalozat.hu/sap/opu/odata/sap/ZGW_OAUTH_SRV/GetToken"
        querystring = {
            "Code": f"'{self.authcode}'",
            "sap-client": "112",
            "sap-language": "HU",
        }
        # Kept on the instance: later authorized requests reuse these headers.
        self.headers = {
            "Accept": "application/json",
            "X-Requested-With": "X",
            "Content-Type": "application/json",
        }
        r = self.s.get(token_url, headers=self.headers, params=querystring)
        token_r_dict = r.json()
        self.token = token_r_dict["d"]["GetToken"]["TokenCode"]

    def get_custumer_data(self) -> None:
        """Fetch the customer number and site id required in later URLs."""
        custumer_number_url = "https://eloszto.mvmemaszhalozat.hu/sap/opu/odata/sap/ZGW_UGYFELSZOLGALAT_SRV/Vevok"
        # All requests from here on carry the bearer token.
        self.headers["Authorization"] = f"Bearer {self.token}"
        querystring = {"Funkcio": "OKOSMERO", "sap-client": "112", "sap-language": "HU"}
        r = self.s.get(custumer_number_url, headers=self.headers, params=querystring)
        # First result only - assumes a single customer per account;
        # TODO confirm against an account with several customers.
        self.custumer_number = r.json()["d"]["results"][0]["Id"]
        custumer_id_url = f"https://eloszto.mvmemaszhalozat.hu/sap/opu/odata/sap/ZGW_UGYFELSZOLGALAT_SRV/Vevok('{self.custumer_number}')/Felhelyek"
        querystring = {"Funkcio": "OKOSMERO", "sap-client": "112", "sap-language": "HU"}
        r = self.s.get(custumer_id_url, headers=self.headers, params=querystring)
        self.customer_id = r.json()["d"]["results"][0]["Id"]

    def get_smart_meter_data(self) -> None:
        """Collect the external smart-metering links for this customer.

        Populates ``self.smart_meter_links`` with one dict per usable link:
        the base URL, the meter id, and every query-string parameter
        (including the "guid"). Links without a GUID are skipped.

        :raises GuidNotFoundException: when no returned link has a GUID.
        """
        custumer_meters_url = f"https://eloszto.mvmemaszhalozat.hu/sap/opu/odata/sap/ZGW_UGYFELSZOLGALAT_SRV/Felhelyek(Vevo='{self.custumer_number}',Id='{self.customer_id}')/Okosmero"
        querystring = {"sap-client": "112", "sap-language": "HU"}
        r = self.s.get(custumer_meters_url, headers=self.headers, params=querystring)
        r_list = r.json()["d"]["results"]
        self.meter_ids = r.json()["d"]["results"]
        self.smart_meter_links = []
        # Countdown of links that might still contain a GUID.
        r_list_lenght = len(r_list)
        for link in r_list:
            # "guid=&" means the guid parameter is present but empty.
            if link["URL"].find("guid=&") == -1:
                # Split into base URL and query string, then explode the
                # query string into a dict alongside the meter id.
                split_url = link["URL"].split("?")
                query_string = split_url[1]
                query_string_list = query_string.split("&")
                query_dict = {"url": split_url[0], "meter_id": link["FogyMeroAzon"]}
                for item in query_string_list:
                    key, value = item.split("=")
                    query_dict[key] = value
                self.smart_meter_links.append(query_dict)
            else:
                r_list_lenght -= 1
                if r_list_lenght == 0:
                    # NOTE(review): `link` is the whole result dict, not the
                    # URL string, so the exception message prints the dict.
                    raise GuidNotFoundException(url=link)

    def get_cookies_smart_meter_site(self) -> None:
        """Open the external smart-meter site in the browser to get its cookies.

        Transfers session cookies into the Selenium driver, loads the
        GUID-protected entry URL, extracts the site's SAP session id from
        the redirected URL, copies the cookies back into the requests
        session, and quits the browser.
        """
        # TODO get around without using selenium
        self.smart_meter_url = f"{self.smart_meter_links[0]['url']}"
        # NOTE(review): sap-client is hard-coded to "100" here, unlike the
        # "112" used on the main portal - presumably site-specific; confirm.
        self.sap_client = "100"
        self.guid = self.smart_meter_links[0]["guid"]
        self.s.transfer_session_cookies_to_driver(
            domain=f"{self.smart_meter_url}?guid={self.guid}&sap-client={self.sap_client}"
        )
        response = self.s.driver.get(
            f"{self.smart_meter_url}?guid={self.guid}&sap-client={self.sap_client}"
        )
        # The redirected URL embeds the SAP session id in parentheses.
        smart_meter_site_data_url = self.s.driver.current_url
        split_url = smart_meter_site_data_url.split("(")[1]
        self.sap_id = split_url.split(")")[0]
        self.s.transfer_driver_cookies_to_session()
        self.firefox_driver.quit()

    def smart_site_accept_cookes(self) -> None:
        """Accept the cookie banner on the smart-metering site."""
        first_page_url = f"{self.smart_meter_url}({self.sap_id})/oldal_1.htm"
        data = {
            "accept": "on",
            "OnInputProcessing(tovabb)": "",
        }
        # Browser-like headers for the HTML pages of the smart-meter site.
        self.s.headers.update(
            {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                "Origin": "https://eloszto.mvmemaszhalozat.hu",
                "Upgrade-Insecure-Requests": "1",
                "Referer": f"https://eloszto.mvmemaszhalozat.hu/SMMU({self.sap_id})/oldal_1.htm",
            }
        )
        r = self.s.post(first_page_url, data=data)

    def get_ldc(self) -> None:
        """Load the load-curve page (the data is served inside the HTML)."""
        second_page_url = f"{self.smart_meter_url}({self.sap_id})/Oldal_2.htm?OnInputProcessing(ToTerhGor1)"
        r = self.s.get(second_page_url)

    def set_date_for_ldc(self, date_from: str, date_to: str) -> None:
        """Post the date range for the load-curve query.

        :param date_from: start date (the "%Y.%m.%d" format produced by
            :func:`date_list` elsewhere in this module)
        :type date_from: str
        :param date_to: end date, same format
        :type date_to: str
        """
        data = {
            "azonosito": self.smart_meter_links[0]["meter_id"],
            "tipus": "Fogyasztás",
            "idoszak_tol_mero": date_from,
            "idoszak_ig_mero": date_to,
            "mertekegyseg": "kWh",
            "profil": "KIS_LAKOSSAG",
            "OnInputProcessing(elkuld)": "Adatok frissítése",
        }
        r = self.s.post(f"{self.smart_meter_url}({self.sap_id})/Oldal_3.htm", data=data)

    def download_ldc_data(self) -> object:
        """Download the data between the previously set dates.

        :return: the raw response body as text
        :rtype: object
        """
        smart_meter_page3_url = (
            f"{self.smart_meter_url}({self.sap_id})/showPDF.htm?type=1"
        )
        # NOTE(review): query_string is built but never passed to the
        # request - "type=1" is already baked into the URL above.
        query_string = {"type": "1"}
        r = self.s.get(smart_meter_page3_url)
        # Force Latin-1 so accented characters in the export decode properly.
        r.encoding = "ISO-8859-1"
        return r.text
def data_to_dataframe(data: object) -> pandas.DataFrame:
    """Parse the portal's semicolon-separated text export into a DataFrame.

    :param data: Raw text body returned by the portal.
    :type data: object
    :return: One row per reading, with fixed English column names.
    :rtype: pandas.DataFrame
    """
    column_names = (
        "serial_number;id;date;time;"
        "imported;import_amount;import_state;import_state_desc;"
        "exported;exported_amount;export_state;export_state_desc;"
        "saldo;saldo_amount;saldo_state;saldo_state_desc"
    ).split(";")
    # The first line of the export is a header row we discard (skiprows=1).
    return pandas.read_csv(
        io.StringIO(data),
        sep=";",
        header=None,
        skiprows=1,
        names=column_names,
    )
def clean_data(df: pandas.DataFrame) -> pandas.DataFrame:
    """Strip the export's ="..." cell wrappers and reduce to the useful columns.

    :param df: Raw frame produced by data_to_dataframe()
    :type df: pandas.DataFrame
    :return: Frame with float imported/exported/saldo columns and a
        combined datetime column.
    :rtype: pandas.DataFrame
    """
    # Every cell arrives wrapped as ="value"; peel the wrapper off.
    for column in df.columns:
        df[column] = df[column].map(lambda cell: cell.lstrip('="').rstrip('"'))
    # Merge the separate date and time columns into a single timestamp.
    df["datetime"] = pandas.to_datetime(df["date"] + " " + df["time"])
    wanted = ["datetime", "imported", "saldo", "exported"]
    df = df.drop(columns=[name for name in df.columns if name not in wanted])
    # Hungarian number format: comma decimal separator, space as grouping.
    for column in ("imported", "saldo", "exported"):
        df[column] = (
            df[column].str.replace(",", ".").str.replace(" ", "").astype(float)
        )
    return df
def get_load_curve(
    username: str, password: str, date_from=None, date_to=None, raw_data: bool = False
) -> pandas.DataFrame:
    """Fetch load-curve data from the smart-metering site into a DataFrame.

    :param username: Username for the smart-metering site.
    :type username: str
    :param password: Password for the smart-metering site.
    :type password: str
    :param date_from: Start date ("%Y.%m.%d"); usually equals date_to.
        Defaults to None (the site's current default range is used).
    :type date_from: str, optional
    :param date_to: End date ("%Y.%m.%d"). Defaults to None.
    :type date_to: str, optional
    :param raw_data: When True, skip clean_data() and return the raw frame.
    :type raw_data: bool, optional
    :return: Load-curve data between the given dates.
    :rtype: pandas.DataFrame
    """
    smart_meter = Smart_meter(username, password)
    smart_meter.get_base_cookies()
    smart_meter.get_login_cookies()
    smart_meter.get_token()
    smart_meter.get_custumer_data()
    smart_meter.get_smart_meter_data()
    smart_meter.get_cookies_smart_meter_site()
    smart_meter.smart_site_accept_cookes()
    smart_meter.get_ldc()
    # BUG FIX: the original tested `date_from and date_to != None`, which
    # only compared date_to against None and relied on date_from's
    # truthiness; check both explicitly.
    if date_from is not None and date_to is not None:
        smart_meter.set_date_for_ldc(date_from=date_from, date_to=date_to)
    load_curve_data = smart_meter.download_ldc_data()
    load_curve_df = data_to_dataframe(data=load_curve_data)
    return load_curve_df if raw_data else clean_data(load_curve_df)
def get_all_load_curve(
    username: str, password: str, date_from=None, date_to=None, raw_data: bool = False
) -> pandas.DataFrame:
    """Like get_load_curve(), but fetch every day between date_from and date_to.

    :param username: Username for the smart-metering site.
    :type username: str
    :param password: Password for the smart-metering site.
    :type password: str
    :param date_from: Start date ("%Y.%m.%d"). Required.
    :type date_from: str, optional
    :param date_to: End date ("%Y.%m.%d"). Required.
    :type date_to: str, optional
    :param raw_data: Kept for interface symmetry with get_load_curve();
        daily frames are always cleaned so that validate_df() can run.
    :type raw_data: bool, optional
    :return: All valid daily load curves concatenated into one DataFrame.
    :rtype: pandas.DataFrame
    :raises ValueError: If date_from or date_to is missing, or if no day in
        the range yields valid data (pandas.concat on an empty list).
    """
    # BUG FIX: the original only entered the download loop when
    # `date_from and date_to != None`, yet unconditionally returned
    # pandas.concat(df_list) - a NameError whenever the dates were omitted.
    # Fail fast with a clear error instead.
    if date_from is None or date_to is None:
        raise ValueError("date_from and date_to are both required")
    smart_meter = Smart_meter(username, password)
    smart_meter.get_base_cookies()
    smart_meter.get_login_cookies()
    smart_meter.get_token()
    smart_meter.get_custumer_data()
    smart_meter.get_smart_meter_data()
    smart_meter.get_cookies_smart_meter_site()
    smart_meter.smart_site_accept_cookes()
    smart_meter.get_ldc()
    df_list = []
    for date in date_list(date_from, date_to):
        print(date)  # progress indicator: one line per day fetched
        smart_meter.set_date_for_ldc(date_from=date, date_to=date)
        load_curve_data = smart_meter.download_ldc_data()
        load_curve_df = clean_data(data_to_dataframe(data=load_curve_data))
        # The portal keeps daily data for roughly 3 months; skip days whose
        # export came back empty/expired.
        if validate_df(df_to_validate=load_curve_df):
            df_list.append(load_curve_df)
    return pandas.concat(df_list, axis=0)
def validate_df(df_to_validate: pandas.DataFrame) -> bool:
    """Check that a daily load-curve frame actually contains data.

    The smart-metering site usually stores daily data for 3 months; after
    that the daily data is deleted and the export comes back all-zero.
    Helper for get_all_load_curve(): the frame is considered valid when the
    midday reading (row 48 of the quarter-hour series) carries any energy.

    :param df_to_validate: Dataframe on which validation is performed.
    :type df_to_validate: pandas.DataFrame
    :return: True when the Dataframe holds real data.
    :rtype: bool
    """
    # ROBUSTNESS FIX: a truncated export (fewer than 49 rows) used to raise
    # IndexError; such a frame cannot be valid, so report False instead.
    if len(df_to_validate) <= 48:
        return False
    # SIMPLIFICATION: the original wrapped the three values in sum([a+b+c])
    # (a one-element list) and looked row 48 up three separate times.
    row = df_to_validate.iloc[48]
    return bool(row["imported"] + row["exported"] + row["saldo"] > 0.0)
def sum_load_curve(load_curve_df: pandas.DataFrame) -> pandas.DataFrame:
    """Total the energy columns of a load-curve data set.

    :param load_curve_df: Dataframe whose columns are summed.
    :type load_curve_df: pandas.DataFrame
    :return: Column totals for imported, exported and saldo.
    :rtype: pandas.DataFrame
    """
    energy_columns = ["imported", "exported", "saldo"]
    return load_curve_df.loc[:, energy_columns].sum()
def date_list(date_from: str, date_to: str) -> list:
    """Build the list of daily date strings from date_from through date_to.

    :param date_from: First day, formatted "%Y.%m.%d".
    :type date_from: str
    :param date_to: Last day (inclusive), formatted "%Y.%m.%d".
    :type date_to: str
    :return: Every date in the range as "%Y.%m.%d" strings (empty when
        date_to precedes date_from).
    :rtype: list
    """
    current = datetime.strptime(date_from, "%Y.%m.%d").date()
    last = datetime.strptime(date_to, "%Y.%m.%d").date()
    dates = []
    while current <= last:
        dates.append(current.strftime("%Y.%m.%d"))
        current += timedelta(days=1)
    return dates
def main():
    """Debug entry point: pull a fixed date range and pickle the result."""
    import dotenv

    config_file = "./env/config.env"
    dotenv.find_dotenv(config_file, raise_error_if_not_found=True)
    dotenv.load_dotenv(config_file)
    df = get_all_load_curve(
        username=os.getenv("USERNAME"),
        password=os.getenv("PASSWORD"),
        date_from="2022.10.01",
        date_to="2023.02.08",
    )
    df.to_pickle("./df.pkl")


if __name__ == "__main__":
    main()