import mysql.connector
import os
from prettytable import PrettyTable
import configparser
import xml2obj
from utility_accert import Utility_methods
from Algorithm import Algorithm
import importlib
import numpy as np
import sys
import pandas.io.sql as sql
import pandas as pd
import warnings
from typing import Union
warnings.filterwarnings('ignore')
PathLike = Union[str, bytes, os.PathLike]
class Accert:
[docs]
def __init__(self, input_path, accert_path):
"""
Initialize the Accert class.
Parameters
----------
input_path : PathLike
Inputs file path.
accert_path: PathLike
ACCERT's repository path.
"""
self.input_path = input_path
self.accert_path = accert_path
self.input = self.load_obj(self.input_path, self.accert_path)
self.ref_model = None
self.acc_tabl = None
self.cel_tabl = None
self.var_tabl = None
self.alg_tabl = None
self.esc_tabl = None
self.fac_tabl = None
self.use_gncoa = False
self.gncoa_map = 'gncoamapping'
[docs]
def setup_table_names(self,xml2obj):
"""Setup different table names in the database.
Parameters
----------
xml2obj : xml2obj
xml2obj class instantiates objects that can convert son file to xml stream and create python data structure.
Returns
-------
None
"""
if xml2obj.use_gncoa is not None:
self.use_gncoa = str(xml2obj.use_gncoa.value).lower() == 'true'
if "abr1000" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'abr1000'
self.acc_tabl = 'abr_account'
self.cel_tabl = 'abr_cost_element'
self.var_tabl = 'abr_variable'
self.alg_tabl = 'algorithm'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
elif "heatpipe" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'heatpipe'
self.acc_tabl = 'heatpipe_account'
self.cel_tabl = 'heatpipe_cost_element'
self.var_tabl = 'heatpipe_variable'
self.alg_tabl = 'algorithm'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
elif "pwr12-be" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'pwr12-be'
self.acc_tabl = 'account'
self.cel_tabl = 'cost_element'
self.var_tabl = 'variable'
self.alg_tabl = 'algorithm'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
elif "lfr" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'lfr'
self.acc_tabl = 'lfr_account'
self.cel_tabl = 'abr_cost_element'
self.var_tabl = 'abr_variable'
self.alg_tabl = 'algorithm'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
elif "fusion" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'fusion'
self.acc_tabl = 'fusion_acco'
self.cel_tabl = None
self.var_tabl = 'fusion_varv'
self.alg_tabl = 'fusion_alg'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
elif "stellarator" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'stellarator'
self.acc_tabl = 'ste_acc'
self.cel_tabl = None
self.var_tabl = 'ste_var'
self.alg_tabl = 'fusion_alg'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
elif "user_defined" in str(xml2obj.ref_model.value).lower():
self.ref_model = 'user_defined'
self.acc_tabl = 'user_defined_account'
self.cel_tabl = None
self.var_tabl = 'user_defined_variable'
self.alg_tabl = 'user_defined_algorithm'
self.esc_tabl = 'escalation'
self.fac_tabl = 'facility'
return None
[docs]
def load_obj(self, input_path, accert_path):
"""Convert son file to xml stream and creates a python data structure.
Parameters
----------
input_path : PathLike
Inputs file path.
accert_path: PathLike
ACCERT's repository path.
Returns
-------
A Python object converted from the input file.
"""
import subprocess
sonvalidxml = os.path.join(accert_path, "bin", "sonvalidxml")
schema = os.path.join(accert_path, "src", "etc", "accert.sch")
cmd = ' '.join([sonvalidxml, schema, input_path])
xmlresult = subprocess.check_output(cmd, shell=True)
### obtain pieces of input by name for convenience
# from .wasppy import xml2obj
return xml2obj.xml2obj(xmlresult)
[docs]
def get_current_COAs(self, c, inp_id):
"""Get current Code of Accounts based on the input ID of Super Account.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
inp_id : str
COA ID
Returns
-------
coa_lst
List of COA's.
coa_others
List of a COA's other info, including ind, lft, rgt.
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `get_current_COAs`(IN table_name VARCHAR(50),
# IN inp_id VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('SELECT code_of_account,
# ind FROM ', table_name, ' WHERE supaccount = ?');
# PREPARE stmt FROM @stmt;
# SET @inp_id = inp_id;
# EXECUTE stmt USING @inp_id;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('get_current_COAs',(self.acc_tabl, inp_id))
for row in c.stored_results():
coa_info = row.fetchall()
coa_lst = []
coa_other =[]
for coa in coa_info:
coa_lst.append(coa[0])
coa_other.append(coa[1:])
return coa_lst, coa_other
[docs]
def update_account_before_insert(self, c, min_ind):
"""Updates the current COAs ind.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
min_ind : int
Original index of the account next to the inserted COA.
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_account_before_insert`(IN table_name VARCHAR(50),
# IN min_ind INT)
# BEGIN
# SET @stmt = CONCAT('UPDATE ', table_name,
# ' SET ind = ind + 1 WHERE ind > ?');
# PREPARE stmt FROM @stmt;
# SET @min_ind = min_ind-1;
# EXECUTE stmt USING @max_ind;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('update_account_before_insert',(self.acc_tabl, min_ind-1))
return None
[docs]
def insert_new_COA(self, c, ind, supaccount, level,
code_of_account, account_description= None,
total_cost=0, review_status='Added', prn='0'):
"""Insert a new COA in between an index in the account table.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ind : int
Index of the new inserted COA.
supaccount : str
Super account of the new inserted COA.
level : int
Level of the new inserted COA.
code_of_account : str, optional
COA of the new inserted COA, by default "new"
account_description : str, optional
Account description of the new inserted COA. (By default none)
total_cost : int, optional
Total cost of the new inserted COA. (Set to 0 dollars by default)
review_status : str, optional
Review status of the new inserted COA. (By default 'Unchanged')
prn : str(float), optional
Percentage of the total cost of new inserted COA. (Set to 0% by default)
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `insert_new_COA`(IN table_name VARCHAR(50),
# IN ind INT,
# IN supaccount VARCHAR(50),
# IN level INT,
# IN lft INT,
# IN rgt INT,
# IN code_of_account VARCHAR(50),
# IN account_description VARCHAR(50),
# IN total_cost INT,
# IN unit VARCHAR(50),
# IN main_subaccounts VARCHAR(100),
# IN cost_elements VARCHAR(50),
# IN review_status VARCHAR(50),
# IN prn VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('INSERT INTO ', table_name,
# ' (ind, supaccount, level, lft, rgt, code_of_account, account_description,
# total_cost, unit, main_subaccounts, cost_elements, review_status, prn)
# VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)');
# PREPARE stmt FROM @stmt;
# SET @ind = ind;
# SET @supaccount = supaccount;
# SET @level = level;
# SET @lft = lft;
# SET @rgt = rgt;
# SET @code_of_account = code_of_account;
# SET @account_description = account_description;
# SET @total_cost = total_cost;
# SET @unit = unit;
# SET @main_subaccounts = main_subaccounts;
# SET @cost_elements = cost_elements;
# SET @review_status = review_status;
# SET @prn = prn;
# EXECUTE stmt USING @ind, @supaccount, @level, @lft, @rgt, @code_of_account, @account_description,
# @total_cost, @unit, @main_subaccounts, @cost_elements, @review_status, @prn;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('insert_new_COA',(self.acc_tabl, ind, supaccount, level,
code_of_account, account_description, total_cost,
review_status, prn))
return None
[docs]
def insert_COA(self, c, sup_coa,user_added_coa,user_added_coa_desc,
user_added_coa_total_cost):
"""Insert a new COA into the account table.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
sup_coa : str
Super account of the new inserted COA.
user_added_coa : str
COA of the new inserted COA.
user_added_coa_desc : str
Account description of the new inserted COA.
user_added_coa_total_cost : int
Total cost of the new inserted COA.
"""
# collect current COAs
# current_COAs are list of current COAs' code_of_account
# current_COA_others are list of current COAs' other info
# include current COAs' ind lft rgt
current_COAs, current_COA_others = self.get_current_COAs(c, sup_coa)
print('[Updating] Inserting new COA under COA',sup_coa)
# print current COAs wrapped word for easy reading
current_COAs = ', '.join(current_COAs)
print('[Updating] Current COAs under COA {}: {}'.format(sup_coa, current_COAs))
print(' ')
min_ind = min(current_COA_others,key=lambda item:item[0])[0]
# NOTE : if new COA is added, it will be added to the end of the top suplist
# TODO : return a new COA id with the COA list as input
# new_COA = get_new_COA_id(current_COAs)
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `sup_coa_level`(IN table_name VARCHAR(50),
# IN supaccount VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('SELECT level FROM ', table_name, ' WHERE code_of_account = ?');
# PREPARE stmt FROM @stmt;
# SET @supaccount = supaccount;
# EXECUTE stmt USING @supaccount;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('sup_coa_level',(self.acc_tabl, sup_coa))
for row in c.stored_results():
sup_coa_level = row.fetchone()[0]
coa_level = sup_coa_level + 1
# before inserting new COA, update the current COAs' ind
# for example if the new COA is inserted between 1 and 2,
# then the min_ind is 2, so the current COAs' ind will be updated
# from 2 to n change to 3 to n+1
# and the new COA will be inserted at 2
self.update_account_before_insert(c, min_ind)
# insert new COA
self.insert_new_COA(c, ind=min_ind, supaccount=sup_coa,
level = coa_level, code_of_account=user_added_coa,
account_description=user_added_coa_desc,
total_cost= user_added_coa_total_cost)
return None
[docs]
def update_variable_info_on_name(self, c,var_id,var_value,var_unit):
"""
Updates variable info based on variable name.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
var_id : str
Variable ID.
var_value : float
Variable value.
var_unit : str
Variable unit.
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_variable_info_on_name`(IN table_name VARCHAR(50),
# IN `u_i_var_name` VARCHAR(50), IN `value` FLOAT, IN `unit` VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('UPDATE ', table_name, ' SET var_value = ?,
# var_unit = ?,
# user_input = ? WHERE var_name = ?');
# PREPARE stmt FROM @stmt;
# SET @var_value = value;
# SET @var_unit = unit;
# SET @user_input = 1;
# SET @var_name = u_i_var_name;
# EXECUTE stmt USING @var_value, @var_unit, @user_input, @var_name;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
args = (self.var_tabl, var_id, float(var_value), var_unit)
c.callproc('update_variable_info_on_name', args)
return None
[docs]
def update_super_variable(self, c,var_id):
"""
Updates super variable info based on variable name.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
var_id : str
Variable ID.
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_super_variable`(IN var_table_name VARCHAR(50),
# IN alg_table_name VARCHAR(50), IN `u_i_var_name` VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('SELECT var.ind, var.var_name, var.var_value,
# var.var_alg, var.var_need, alg.ind, alg.alg_python,
# alg.alg_formulation, alg.alg_units, var.var_unit
# FROM ', var_table_name, ' as var JOIN ', alg_table_name, ' as alg
# ON var.var_alg=alg.alg_name
# WHERE var.var_name=?');
# PREPARE stmt FROM @stmt;
# SET @var_name = u_i_var_name;
# EXECUTE stmt USING @var_name;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('update_super_variable',(self.var_tabl, self.alg_tabl, var_id))
for row in c.stored_results():
result = row.fetchone()
### results is a tuple
sup_var_name = result[1]
org_var_value = result[2]
alg_name = result[3]
var_name_lst = [x.strip() for x in result[4].split(',')]
alg_no = result[5]
alg = result[6]
alg_form = result[7]
alg_unit = result[8]
sup_var_unit = result[9]
# # # create a value list for debugging
# # var_value_lst = []
variables = {}
if self.cel_tabl:
for var_ind, var_name in enumerate(var_name_lst):
# var_value_lst.append(get_var_value_by_name(c, var_name))
variables['v_{}'.format(var_ind+1)] = self.get_var_value_by_name(c, var_name)
print('[Updating] Sup Variable {}, running algorithm: [{}], \n[Updating] with formulation: {}'.format(sup_var_name, alg_name, alg_form))
alg_value = self.run_pre_alg(alg, **variables)
else:
for var_ind, var_name in enumerate(var_name_lst):
variables[var_name] = self.get_var_value_by_name(c, var_name)
print('[Updating] Sup Variable {}, running algorithm: [{}], \n[Updating] with formulation: {}'.format(sup_var_name, alg_name, alg_form))
alg_value= self.update_account_value(alg, alg_name, variables)
self.update_input_variable(c,sup_var_name,alg_value,sup_var_unit,quite = True)
if alg_unit == '1':
alg_unit=''
sup_var_unit=''
# print formatting value for scientific notation
print('[Updated] Reference value is : {:,.2e} {}, calculated value is: {:,.2e} {}'.format(org_var_value,alg_unit,alg_value,sup_var_unit))
# print('[Updated] Reference value is : {} {}, calculated value is: {} {}'.format(org_var_value,alg_unit,alg_value,sup_var_unit))
print(' ')
return None
[docs]
def cal_LCOE(self, c, ut, accert):
"""
Calculates the Levelized Cost of Energy (LCOE) based on the input data.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility_methods
Utility_methods class instantiates objects that can perform utility methods.
accert : Accert
Accert class instantiates objects that can perform ACCERT methods.
Returns
-------
None
"""
if self.ref_model == 'fusion' or self.ref_model == 'stellarator':
# inport the LCOE module
module = importlib.import_module('Algorithm.LCOE')
LCOE_module = module.LCOE(c, ut, accert)
LCOE_module.setup_tables(Accert)
LCOE_module.quote_variable_values(c,Accert)
LCOE_module.coelc()
LCOE_module.generate_excel()
else:
pass
[docs]
def check_unit_conversion(self, org_unit, new_unit):
"""
Checks if unit conversion is needed.
Parameters
----------
org_unit : str
Original unit.
new_unit : str
New unit.
"""
if org_unit == new_unit:
return False
elif org_unit == "N/A" or org_unit == "none" or org_unit == "None":
print('[Note] Original unit is not available, no conversion needed')
return False
else:
return True
[docs]
def convert_unit(self, current_value, current_unit, to_unit):
"""
Converts the current unit to a new unit.
Parameters
----------
current_value : float
Current value to be converted.
current_unit : str
Current unit.
to_unit : str
Unit to be converted to.
Returns
-------
to_value : float
Converted value.
"""
scale = float(self.convert_unit_scale(current_unit,to_unit))
to_value = current_value * scale
if to_unit != 'dollar':
print("[Unit Changed] Converted input from {} {} to {} {}".format(current_value, current_unit,to_value,to_unit))
return to_value
[docs]
def convert_unit_scale(self, current_unit, to_unit):
"""
Converts the current unit to a new unit in a scale pattern. (I.e. from kiloWatts to megaWatts or gigaWatts)
Parameters
----------
current_unit : str
current unit
to_unit : str
unit to be converted to
Returns
-------
scale : float
"""
if current_unit == to_unit:
return 1
elif current_unit == 'KW':
if to_unit == 'MW':
return 0.001
elif to_unit == 'GW':
return 0.000001
elif current_unit == 'MW':
if to_unit == 'KW':
return 1000
elif to_unit == 'GW':
return 0.001
elif current_unit == 'GW':
if to_unit == 'KW':
return 1000000
elif to_unit == 'MW':
return 1000
elif current_unit == 'million':
if to_unit == 'dollar':
return 1000000
elif to_unit == 'thousand':
return 1000
elif current_unit == 'thousand':
if to_unit == 'million':
return 1/1000
elif to_unit == 'dollar':
return 1000
elif current_unit == 'dollar':
if to_unit == 'thousand':
return 1/1000
elif to_unit == 'million':
return 1/1000000
elif current_unit == 'lbs':
if to_unit == 'kg':
return 0.453592
elif to_unit == 'ton':
return 0.000453592
elif current_unit == 'kg':
if to_unit == 'lbs':
return 2.20462
elif to_unit == 'ton':
return 0.001
elif current_unit == 'ton':
if to_unit == 'lbs':
return 2204.62
elif to_unit == 'kg':
return 1000
elif current_unit == 'bar':
if to_unit == 'psi':
return 14.5038
elif to_unit == 'psf':
return 2088.54
elif current_unit == 'psi':
if to_unit == 'bar':
return 0.068947572927646
elif to_unit == 'psf':
return 144
else:
print('Cannot convert unit from ',current_unit,'to',to_unit)
raise ValueError
[docs]
def update_total_cost(self, c,tc_id, u_i_tc_value, u_i_tc_unit):
"""
Updates the total cost based on a total cost ID. (Checks if unit conversion is needed)
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
tc_id : str
COA of the total cost.
u_i_tc_value : float
Total cost's value.
u_i_tc_unit : str
Total cost's unit.
"""
print('[Updating] Total cost of account {}'.format(tc_id))
org_tc_info = self.extract_total_cost_on_name(c,tc_id)
org_tc_value = float(org_tc_info[2])
org_tc_unit = "dollar"
unit_convert = self.check_unit_conversion(org_tc_unit,u_i_tc_unit)
if unit_convert:
u_i_tc_value = self.convert_unit(u_i_tc_value,u_i_tc_unit,org_tc_unit)
u_i_tc_unit = org_tc_unit
self.update_total_cost_on_name(c,tc_id,u_i_tc_value)
# do not print unit if unit is '1' or 'N/A' or 'none' or 'None'
if u_i_tc_unit == '1' or u_i_tc_unit == 'N/A' or u_i_tc_unit == 'none' or u_i_tc_unit == 'None':
u_i_tc_unit = ''
if org_tc_unit == '1' or org_tc_unit == 'N/A' or org_tc_unit == 'none' or org_tc_unit == 'None':
org_tc_unit = ''
print('[Updated] Changed from {:,.2f} {} to {:,.2f} {}\n'.format( org_tc_value,org_tc_unit, int(u_i_tc_value), org_tc_unit))
return None
[docs]
def update_total_cost_on_name(self, c, tc_id, u_i_tc_value):
"""
Updates the total cost based on a total cost ID, without checking for unit conversion.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
tc_id : str
COA of the total cost.
u_i_tc_value : float
Total cost's value.
u_i_tc_unit : str
Total cost's unit.
Returns
-------
None
"""
## NOTE I'm not sure if this is the best way to update the total cost
## Statement is not working as expected when passing in a string in a dictionary
## but it works when passing in the string directly in .format() method
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_total_cost_on_name`(IN table_name VARCHAR(50),
# IN `tc_id` VARCHAR(50),
# IN `u_i_tc_value` FLOAT,
# IN `u_i_tc_unit` VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('UPDATE ', table_name, ' SET total_cost = ?, unit = ?,
# review_status = "User Input" WHERE code_of_account = ?');
# PREPARE stmt FROM @stmt;
# SET @tc_id = tc_id;
# SET @u_i_tc_value = u_i_tc_value;
# SET @u_i_tc_unit = u_i_tc_unit;
# EXECUTE stmt USING @u_i_tc_value, @u_i_tc_unit, @tc_id;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
u_i_tc_value= float(u_i_tc_value)
c.callproc('update_total_cost_on_name',(self.acc_tabl,tc_id,u_i_tc_value))
return None
[docs]
def get_var_value_by_name(self, c, var_name):
"""
Get a variable value based on a specific variable name.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
var_name : str
Variable name.
Returns
-------
var_value : str
Variable value.
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `get_var_value_by_name`(IN table_name VARCHAR(50),
# IN `var_name` VARCHAR(50))
# BEGIN
# SET @stmt = CONCAT('SELECT var_value FROM ', table_name, ' WHERE var_name = ?');
# PREPARE stmt FROM @stmt;
# SET @var_name = var_name;
# EXECUTE stmt USING @var_name;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('get_var_value_by_name',(self.var_tabl,var_name))
for row in c.stored_results():
var_value = row.fetchone()[0]
return var_value
[docs]
def run_pre_alg(self, alg, **kwargs):
"""
Runs pre-algorithms.
Parameters
----------
alg : str
Pre-algorithm name.
**kwargs : dict
Keyword arguments.
Returns
-------
alg_value : float
Algorithm value
"""
# NOTE: comments below is the original note from Patrick,
# I would want to keep the original note for future reference
# add the variables in kwargs to the local
# function namespace
# (equivalent to c1 = 10; c2 = 10; c3 = 40.5)
locals().update(kwargs)
# report back the user algorithm
# evaluate the algorithm
alg_value = eval(alg)
return alg_value
[docs]
def update_account_value(self, alg_py, alg_name, variables):
"""
Calls the specified algorithm with the given variables. Only called for fusion model now.
For PWR, ABR,LFR, HEATPIPE the alg_py is in the form of a string that will be
evaluated in the Algorithm table stored in database. For Fusion, the algorithm is in
the form of a python file name that stored in Algorithm folder. For example, in
Fusion model, the alg_py value is 'FusionFunc' then it should look for FusionFunc.py
in the Algorithm folder.
Parameters
----------
alg_py : str
Algorithm in python.
alg_name : str
Algorithm name.my
variables : dict
Variables to be passed to the algorithm.
Returns
-------
result : float
Algorithm result.
"""
# Dynamically import the module
module = importlib.import_module(f'Algorithm.{alg_py}')
# Get the class from the module
class_ = getattr(module, alg_py)
# Create an instance of the class
algorithm_instance = class_(
ind=1, # Dummy value, may be needed for future reference
alg_name=alg_name,
alg_for='test', # Dummy value
alg_description=f'Description of {alg_name}', # Dummy value
alg_formulation=f'Formulation of {alg_name}', # Dummy value
alg_units='units', # Dummy value
variables=','.join(variables.keys()), # Convert variable names to a comma-separated string
constants='' # Dummy value, replace as needed
)
# Run the algorithm and get the result
result = algorithm_instance.run(variables)
return result
[docs]
def update_cost_element_on_name(self, c, ce_name, alg_value):
"""
Updates the cost element based on cost element name. (Turn off safe update mode)
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ce_name : str
Cost element name starting with the COA of the account.
alg_value : float
Cost element value.
Returns
-------
None
"""
# Turn off safe update mode
# keep the original note for future reference
c.execute("""SET SQL_SAFE_UPDATES = 0;""")
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_cost_element_on_name`(
# IN table_name VARCHAR(50),
# IN ce_name VARCHAR(50),
# IN alg_value DECIMAL(20,5)
# )
# BEGIN
# -- Disable safe updates for this operation
# SET SQL_SAFE_UPDATES = 0;
# -- Build the dynamic SQL query
# SET @stmt = CONCAT('UPDATE ', table_name,
# ' SET cost_2017 = ', alg_value,
# ', updated = 1 WHERE cost_element = ''', ce_name, '''');
# -- Prepare and execute the dynamic statement
# PREPARE stmt FROM @stmt;
# EXECUTE stmt;
# -- Deallocate the prepared statement
# DEALLOCATE PREPARE stmt;
# END;$$
# DELIMITER ;
# NOTE, float is used for alg_value, but it can be changed to DECIMAL(20,15) in the
# stored procedure, since float in python is equivalent to double in MySQL, tested
# for several values but using float in stored procedure is not recommended since
# the rolled up value may not be accurate.
c.callproc('update_cost_element_on_name',(self.cel_tabl,ce_name,float(alg_value)))
return None
[docs]
def update_new_cost_elements(self, c):
"""
Calculates and updates affected cost elements based on the user input.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
print(' Updating cost elements '.center(100,'='))
print('\n')
c.callproc('update_new_cost_elements',(self.cel_tabl,self.var_tabl,self.alg_tabl))
for row in c.stored_results():
results = row.fetchall()
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_new_cost_elements`(IN cel_tabl_name VARCHAR(50),
# IN var_tabl_name VARCHAR(50),
# IN alg_tabl_name VARCHAR(50))
# BEGIN
# SET SQL_SAFE_UPDATES = 0;
# SET @stmt = CONCAT("SELECT ce.ind, ce.cost_element,
# ce.cost_2017, ce.alg_name,
# ce.variables, ce.algno,
# alg.alg_python, alg.alg_formulation, alg.alg_units
# FROM ", cel_tabl_name, " AS ce
# JOIN ", alg_tabl_name, " AS alg ON ce.alg_name = alg.alg_name
# WHERE EXISTS (
# SELECT 1
# FROM ", var_tabl_name, " AS va
# WHERE va.user_input = 1
# AND FIND_IN_SET(va.var_name, REPLACE(ce.variables, ' ', '')) > 0);");
# PREPARE stmt FROM @stmt;
# EXECUTE stmt;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
for row in results:
ce_name = row[1]
org_ce_value = row[2]
alg_name = row[3]
var_name_lst = [x.strip() for x in row[4].split(',')]
alg_no = row[5]
alg = row[6]
alg_form = row[7]
alg_unit = row[8]
# NOTE cost element unit is always in USD dollar
# maybe the unit for cost element can be added to the cost_element table later???
# # create a value list for debugging
# var_value_lst = []
variables = {}
for var_ind, var_name in enumerate(var_name_lst):
# var_value_lst.append(get_var_value_by_name(c, var_name))
variables['v_{}'.format(var_ind+1)] = self.get_var_value_by_name(c, var_name)
print('[Updating] Cost element [{}], running algorithm: [{}], \n[Updating] with formulation: {}'.format(ce_name, alg_name, alg_form))
alg_value = self.run_pre_alg(alg, **variables)
unit_convert = self.check_unit_conversion('dollar',alg_unit)
if unit_convert:
alg_value = self.convert_unit(alg_value,alg_unit,'dollar')
print('[Updated] Reference value is : ${:<11,.0f}, calculated value is: ${:<11,.0f} '.format(org_ce_value,alg_value))
self.update_cost_element_on_name(c,ce_name,alg_value)
print(' ')
return None
[docs]
def update_new_accounts(self, c):
"""
Updates the affected accounts based on the variables. This function is called
when there is no cost element table.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
# for fusion or user defined table, there is no cost_element table
# so the update_new_cost_elements will not be executed
# instead, the update_new_accounts will be executed
print(' Updating accounts '.center(100,'='))
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_new_accounts`(IN acc_tabl_name VARCHAR(50),
# IN var_tabl_name VARCHAR(50),
# IN alg_tabl_name VARCHAR(50))
# BEGIN
# SET SQL_SAFE_UPDATES = 0;
# SET @stmt = CONCAT("SELECT ac.ind, ac.code_of_account,
# ac.total_cost, ac.alg_name,
# ac.variables,
# alg.alg_python, alg.alg_formulation, alg.alg_units
# FROM ", acc_tabl_name, " AS ac
# JOIN ", alg_tabl_name, " AS alg ON ac.alg_name = alg.alg_name
# WHERE EXISTS (
# SELECT 1
# FROM ", var_tabl_name, " AS va
# WHERE va.user_input = 1
# AND FIND_IN_SET(va.var_name, REPLACE(ac.variables, ' ', '')) > 0);");
# PREPARE stmt FROM @stmt;
# EXECUTE stmt;
# DEALLOCATE PREPARE stmt;
# END$$
c.callproc('update_new_accounts',(self.acc_tabl,self.var_tabl,self.alg_tabl))
for row in c.stored_results():
results = row.fetchall()
for row in results:
acc_name = row[1]
# NOTE only for debugging
org_acc_value = row[2]
alg_name = row[3]
var_name_lst = [x.strip() for x in row[4].split(',')]
alg_py = row[5]
alg_form = row[6]
alg_unit = row[7]
variables = {}
for var_ind, var_name in enumerate(var_name_lst):
variables[var_name] = self.get_var_value_by_name(c, var_name)
print('[Updating] Account [{}], running algorithm: [{}], \n[Updating] with formulation: {}'.format(acc_name, alg_name, alg_form))
# alg_py is the algorithm python file name in Algorithm folder
# alg_name is the function name in the alg_py file
# now pass the variables and run the algorithm
alg_value = self.update_account_value(alg_py, alg_name, variables)
unit_convert = self.check_unit_conversion('dollar',alg_unit)
if unit_convert:
alg_value = self.convert_unit(alg_value,alg_unit,'dollar')
self.update_total_cost(c, acc_name, alg_value, 'dollar')
print(' ')
[docs]
def update_account_table_by_cost_elements(self, c):
"""
Updates the account table based on the sum of the cost elements.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
print(' Updating account table '.center(100,'='))
print('\n')
print('[Updating] Updating account table by cost elements')
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `update_account_table_by_cost_elements`(IN acc_tabl_name varchar(50),
# IN cel_tabl_name varchar(50))
# BEGIN
# SET @stmt = CONCAT('UPDATE ', acc_tabl_name, ',',
# '(SELECT ', acc_tabl_name, '.code_of_account,
# ce.total_cost as cost,
# ce.updated as updated,
# ', acc_tabl_name, '.unit
# FROM ', acc_tabl_name, '
# JOIN (SELECT account,
# sum(cost_2017) as total_cost,
# sum(updated) as updated
# FROM ', cel_tabl_name, '
# GROUP BY ', cel_tabl_name, '.account ) as ce
# on ', acc_tabl_name, '.code_of_account = ce.account
# ORDER BY ', acc_tabl_name, '.ind) as updated_account
# SET ', acc_tabl_name, '.total_cost = updated_account.cost,
# review_status = \'Ready for Review\'
# WHERE updated_account.updated > 0
# and ', acc_tabl_name, '.code_of_account = updated_account.code_of_account;');
# PREPARE stmt FROM @stmt;
# EXECUTE stmt;
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('update_account_table_by_cost_elements', (self.acc_tabl, self.cel_tabl))
print('[Updated] Account table updated from cost elements\n')
return None
[docs]
def roll_up_cost_elements(self, c):
"""
Rolls up cost elements from level 3 to 0 for pwr. Only rolls up level 3 to 2 for ABR.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
print(' Roll up cost elements '.center(100,'='))
print('\n')
self.roll_up_cost_elements_by_level(c,3,2)
if self.ref_model=="pwr12-be":
self.roll_up_cost_elements_by_level(c,2,1)
self.roll_up_cost_elements_by_level(c,1,0)
print('[Updated] Cost elements rolled up\n')
return None
[docs]
def roll_up_cost_elements_by_level(self, c,from_level,to_level):
"""
Rolls up cost elements from an input lower level to a higher level.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
from_level : int
Roll up from a given level.
to_level : int
Roll up to a given level.
"""
# DELIMITER $$
# CREATE DEFINER=`root`@`localhost` PROCEDURE `roll_up_cost_elements_by_level`(IN table_name varchar(50),
# IN from_level int, IN to_level int)
# BEGIN
# SET @stmt = CONCAT('UPDATE ', table_name, ',',
# '(SELECT c',to_level,'.cost_element as ce',to_level,'_ce, ',
# 'sum(uc',from_level,'.cost_2017) as c',to_level,'_cal_total_cost ',
# 'FROM ', table_name, ' as uc',from_level,
# ' JOIN ', table_name, ' as c',to_level,
# ' on uc',from_level,'.sup_cost_ele=c',to_level,'.cost_element ',
# 'join account as ac',to_level,
# ' on c',to_level,'.account = ac',to_level,'.code_of_account ',
# 'where ac',to_level,'.level=',to_level,
# ' group by c',to_level,'.cost_element) as updated_ce',to_level,
# ' SET ',
# table_name,'.cost_2017 = updated_ce',to_level,'.c',to_level,'_cal_total_cost,',
# table_name,'.updated = 1 ',
# 'WHERE ',
# table_name,'.cost_element = updated_ce',to_level,'.ce',to_level,'_ce');
# PREPARE stmt FROM @stmt;
# EXECUTE stmt
# DEALLOCATE PREPARE stmt;
# END$$
# DELIMITER ;
c.callproc('roll_up_cost_elements_by_level',(self.cel_tabl,from_level,to_level))
print('[Updating] Roll up cost elements from level {} to level {}'.format(from_level,to_level))
return None
[docs]
def roll_up_account_table(self, c, from_level=3, to_level=0, gncoa=False):
"""
Rolls up the account table from level 3 to 0.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
print(' Rolling up account table '.center(100,'='))
print('\n')
for i in range(from_level, to_level, -1):
self.roll_up_account_table_by_level(c,i,i-1,gncoa=gncoa)
print('[Updated] Account table rolled up\n')
return None
[docs]
def roll_up_account_table_by_level(self, c, from_level, to_level, gncoa=False):
"""
Rolls up the account table from an input lower level to a higher level.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
from_level : int
Roll up from a given level.
to_level : int
Roll up to a given level.
"""
print('[Updating] Rolling up account table from level {} to level {} '.format(from_level,to_level))
if gncoa:
c.callproc('roll_up_account_table_by_gn_level',(self.acc_tabl,from_level,to_level))
else:
c.callproc('roll_up_account_table_by_level',(self.acc_tabl,from_level,to_level))
return None
[docs]
def roll_up_account_table_GNCOA(self, c):
"""
Rolls up the account table for the reactor model that only has limited accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
print(' Rolling up account table by GNCOA '.center(100,'='))
# remove 220A first
c.callproc('remove_specific_row',(self.acc_tabl,'220A'))
self.roll_up_account_table(c, from_level=4, to_level=0, gncoa=True)
# print('[Updated] Account table rolled up\n')
return None
[docs]
def sum_cost_elements_2C(self, c):
"""
Sums the cost elements for COA 2C (Calculated cost).
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
def fetch_sum_and_update(cost_type, proc_name):
"""
Fetches the sum of the cost elements and updates the cost element.
Parameters
----------
cost_type : str
Cost type.
proc_name : str
Procedure name.
"""
# Call stored procedure and fetch results
print(f'[Updating] Summing cost element for {cost_type}')
c.callproc(proc_name, (self.cel_tabl, self.acc_tabl))
for row in c.stored_results():
results = row.fetchall()
sum_value = results[0][0]
# Update cost element
self.update_cost_element_on_name(c, cost_type, sum_value)
return sum_value
print(' Summing cost elements for direct cost '.center(100, '='))
# Fetch and update cost elements
sum_2c_fac = fetch_sum_and_update('2c_fac', 'sum_cost_elements_2C_fac')
sum_2c_lab = fetch_sum_and_update('2c_lab', 'sum_cost_elements_2C_lab')
sum_2c_mat = fetch_sum_and_update('2c_mat', 'sum_cost_elements_2C_mat')
print('[Updated] Cost elements 2c_fac, 2c_lab, 2c_mat are: '
f'${sum_2c_fac:<11,.0f}, ${sum_2c_lab:<11,.0f}, ${sum_2c_mat:<11,.0f}')
print('[Updated] Cost elements summed\n')
return None
[docs]
def roll_up_lmt_account_2C(self, c):
"""
Sums up total cost of account 2C for the reactor model that only has limited accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
print(' Rolling up account table '.center(100,'='))
print('\n')
c.callproc('roll_up_lmt_account_2C', (self.acc_tabl,))
print('[Updated] Account table summed up for calculated direct cost.')
return None
[docs]
def roll_up_lmt_direct_cost(self, c):
"""
Sums up the total cost of account 2 from account 2C for
the reactor model that only has limited accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
c.callproc('roll_up_lmt_direct_cost',(self.acc_tabl,))
print('[Updated] Account table rolled up for direct cost.\n')
return None
[docs]
def cal_direct_cost_elements(self, c):
"""
Calculates the direct cost elements for the ABR including the factory, labor, and material costs. (2C_fac, 2C_lab, 2C_mat)
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
"""
c.callproc('cal_direct_cost_elements', (self.acc_tabl, self.cel_tabl))
# After the procedure execution, fetch the OUT parameters from the cursor
# The stored procedure call doesn't return results, but the OUT parameters are updated
for row in c.stored_results():
results = row.fetchall()
fac, lab, mat = results[0]
return fac,lab,mat
[docs]
def roll_up_lmt_account_table(self, c):
"""
Rolls up the account table for the reactor model that only has limited accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements
"""
### only update account 222 and account 2C
self.roll_up_account_table(c, from_level=3, to_level=2)
# print('[Updated] Account table rolled up\n')
self.roll_up_lmt_account_2C(c)
self.roll_up_lmt_direct_cost(c)
return None
[docs]
def print_logo(self):
"""
Prints the ACCERT logo.
"""
print('\n')
print("::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::")
print(":::'###:::::'######:::'######::'########:'########::'########:")
print("::'## ##:::'##... ##:'##... ##: ##.....:: ##.... ##:... ##..::")
print(":'##:. ##:: ##:::..:: ##:::..:: ##::::::: ##:::: ##:::: ##::::")
print("'##:::. ##: ##::::::: ##::::::: ######::: ########::::: ##::::")
print(" #########: ##::::::: ##::::::: ##...:::: ##.. ##:::::: ##::::")
print(" ##.... ##: ##::: ##: ##::: ##: ##::::::: ##::. ##::::: ##::::")
print(" ##:::: ##:. ######::. ######:: ########: ##:::. ##:::: ##::::")
print("..:::::..:::.......::::......::........::..:::::..:::::..:::::")
print('\n')
[docs]
def execute_accert(self, c, ut):
"""
Executes the ACCERT program.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : UserTable
UserTable class instantiates objects that can execute user input statements.
"""
self.print_logo()
accert = self.load_obj(input_path, accert_path).accert
c.execute("USE accert_db")
print(' Reading user input '.center(100, '='))
print('\n')
if accert.ref_model:
self.process_reference_model(c, ut, accert)
else:
print('ERROR: model not found ')
self.exit_with_error(accert)
self.process_power_inputs(c, accert)
self.process_variables(c, accert)
self.process_COA(c, accert)
self.finalize_process(c, ut, accert)
self.generate_results(c, ut, accert)
self.cal_LCOE(c, ut, accert)
conn.close()
sys.stdout.close()
sys.stdout = stdoutOrigin
[docs]
def process_reference_model(self, c, ut, accert):
"""
Processes the reference model.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility class
Utility class for processing user input.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
print('[USER_INPUT]', 'Reference model is', str(accert.ref_model.value), '\n')
self.setup_table_names(accert)
ut.setup_table_names(c, Accert)
# if ref.model is not fusion or user defined then process cost elements:
if self.cel_tabl:
ut.print_user_request_parameter(c)
else:
pass
[docs]
def process_variables(self, c, accert):
"""
Processes the variables.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
if accert.var:
for var_inp in accert.var:
u_i_var_value = float(str(var_inp.value.value))
u_i_var_unit = str(var_inp.unit.value)
var_id = str(var_inp.id).replace('"', '')
self.update_input_variable(c, var_id, u_i_var_value, u_i_var_unit)
self.process_super_values(c, var_id)
[docs]
def process_super_values(self, c, var_id):
"""
Processes the super variables.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
var_id : str
Variable ID.
"""
var_id = str(var_id).replace('"', '').replace("'", "")
sup_val_lst = self.extract_super_val(c, var_id)
if sup_val_lst:
sup_val_lst = sup_val_lst.split(',')
# also remove the space after the comma
sup_val_lst = [x.strip() for x in sup_val_lst]
if sup_val_lst:
print('[Updating] Other variable(s) should be updated based on {} are {} \n'.format(var_id, sup_val_lst))
while sup_val_lst:
sup_val = sup_val_lst.pop(0)
if sup_val:
self.update_super_variable(c, sup_val)
new_sup_val = self.extract_super_val(c, sup_val)
if new_sup_val:
sup_val_lst.extend(new_sup_val.split(','))
[docs]
def process_COA(self, c, accert):
"""
Change the total cost of the account table by user inputs.
This function is called before processing the calculation.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
if accert.l0COA and accert.l0COA.l1COA:
for l1_inp in accert.l0COA.l1COA:
if l1_inp.l2COA:
self.process_level_accounts(c, l1_inp.l2COA, accert, l1_inp.id)
[docs]
def process_level_accounts(self, c, level_accounts, accert, parent_id=None):
"""
Processes the level accounts and begins the calculation.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
level_accounts : list
List of level accounts.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
parent_id : str
Parent ID.
"""
for account in level_accounts:
if "new" in str(account.id):
user_added_coa = str(account.newCOA.id)
user_added_coa_desc = str(account.newCOA.descr.value)
if account.total_cost:
user_added_coa_total_cost = float(str(account.total_cost.value.value))
user_added_coa_total_cost_unit = str(account.total_cost.unit.value)
org_tc_unit = "dollar"
unit_convert = self.check_unit_conversion(org_tc_unit,user_added_coa_total_cost_unit)
if unit_convert:
user_added_coa_total_cost = self.convert_unit(user_added_coa_total_cost,user_added_coa_total_cost_unit,org_tc_unit)
else:
user_added_coa_total_cost = 0
print('[USER_INPUT]', 'New account', user_added_coa, user_added_coa_desc, user_added_coa_total_cost, '\n')
self.insert_COA(c, str(parent_id),user_added_coa,user_added_coa_desc,user_added_coa_total_cost)
# if ref.model is not fusion then process cost elements:
if self.cel_tabl:
self.process_ce(c, account)
else:
if account.alg:
for alg in account.alg:
if alg.var:
for var in alg.var:
if var.alg is None:
self.process_var(c, var)
else:
self.process_alg(c, var)
elif account.var:
for var in account.var:
self.process_var(c, var)
for i in range(3, 7):
next_level = getattr(account, f'l{i}COA', None)
if next_level:
self.process_level_accounts(c, next_level, accert, account.id)
[docs]
def process_ce(self, c, account):
"""
Processes the cost elements, either by changing variables or algorithms.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
account : Account
Account class instantiates objects that can parse the account.
"""
if account.ce:
for ce in account.ce:
if ce.alg:
for alg in ce.alg:
if alg.var:
for var in alg.var:
if var.alg is None:
self.process_var(c, var)
else:
self.process_alg(c, var)
elif ce.var:
for var in ce.var:
self.process_var(c, var)
[docs]
def process_var(self, c, var_inp):
"""
Processes the variables, changing the variables by user inputs, and updating the super values.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
var_inp : Variable
"""
u_i_var_value = float(str(var_inp.value.value))
u_i_var_unit = str(var_inp.unit.value)
var_id = str(var_inp.id).replace('"', '')
self.update_input_variable(c, var_id, u_i_var_value, u_i_var_unit)
self.process_super_values(c, var_id)
[docs]
def process_alg(self, c, alg_inp):
"""
Processes the variables with algorithm.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
alg_inp : Algorithm
"""
for alg_var in alg_inp.alg:
if alg_var.var:
for var in alg_var.var:
var_id = str(var.id).replace('"', '')
u_i_var_value = float(str(var.value.value))
u_i_var_unit = str(var.unit.value)
self.update_input_variable(c, var_id, u_i_var_value, u_i_var_unit, var_type='Sub ')
var_id = str(alg_inp.id).replace('"', '')
self.update_super_variable(c, var_id)
[docs]
def check_and_process_total_cost(self, c, accert):
"""
Checks and processes the total cost at the end of the calculation,
if the total cost has changed by user inputs. It is important to note that
the total cost may not be reflected correctly in the cost elements table. Since
the total cost is a sum of the cost elements, the cost elements may have changed
by user inputs.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
if self.check_total_cost_changed(c, accert):
print(" IMPORTANT NOTE ".center(100, '='))
print("Some cost have changed by user inputs and may not be reflected correctly in the cost elements table.\n")
self.process_total_cost(c, accert)
[docs]
def check_total_cost_changed(self, c, accert):
"""
Checks if the total cost has changed.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
changed = False
if accert.l0COA and accert.l0COA.l1COA:
for l1_inp in accert.l0COA.l1COA:
if l1_inp.l2COA:
changed |= self.check_total_cost_accounts(c, l1_inp.l2COA, accert)
return changed
[docs]
def check_total_cost_accounts(self, c, level_accounts, accert):
"""
Checks if the total cost has changed for the accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
level_accounts : list
List of level accounts.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
changed = False
for account in level_accounts:
if account.total_cost:
changed = True
for i in range(3, 7):
next_level = getattr(account, f'l{i}COA', None)
if next_level:
changed |= self.check_total_cost_accounts(c, next_level, accert)
return changed
[docs]
def process_total_cost(self, c, accert):
"""
Changes the total cost for the accounts using the user inputs.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
if accert.l0COA and accert.l0COA.l1COA:
for l1_inp in accert.l0COA.l1COA:
if l1_inp.l2COA:
self.process_total_cost_accounts(c, l1_inp.l2COA, accert)
[docs]
def process_total_cost_accounts(self, c, level_accounts, accert):
"""
Changes the total cost for the accounts using the user inputs for different levels.
This function is called after calculation is done.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
level_accounts : list
List of level accounts.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
for account in level_accounts:
if account.total_cost:
for total_cost_inp in account.total_cost:
tc_id = str(account.id).replace('"', '')
u_i_tc_value = float(str(total_cost_inp.value.value))
u_i_tc_unit = str(total_cost_inp.unit.value)
if accert.ref_model:
# TODO: change the new into any thing else
# check if the total cost is a new added account in account table check if
# the revivew status is added
if "new" in tc_id:
user_add_coa_name = str(account.newCOA.id)
self.update_total_cost(c, user_add_coa_name, u_i_tc_value, u_i_tc_unit)
else:
self.update_total_cost(c, tc_id, u_i_tc_value, u_i_tc_unit)
else:
self.exit_with_error(accert)
for i in range(3, 7):
next_level = getattr(account, f'l{i}COA', None)
if next_level:
self.process_total_cost_accounts(c, next_level, accert)
[docs]
def exit_with_error(self, accert):
"""
Exits the program with an error message.
Parameters
----------
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
print("ERROR: model not found ")
print(accert.ref_model.value)
print("Exiting")
sys.exit(1)
[docs]
def finalize_process(self, c, ut, accert):
"""
Finalizes the process by extracting the affected variables, cost elements, and accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility class
Utility class for processing user input.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
ut.extract_user_changed_variables(c)
# if the model is not fusion or user assigned then process the cost elements
# NOTE: Accert is the instance of the Accert class use Capital A
if self.cel_tabl:
# NOTE the extract_affected_cost_elements will not be executed for fusion model
ut.extract_affected_cost_elements(c)
self.update_new_cost_elements(c)
ut.print_updated_cost_elements(c)
self.roll_up_cost_elements(c)
else:
# if the model is fusion or user assigned model without cost elements
# then the update_new_accounts will be executed otherwise the update_new_cost_elements should be executed
ut.extract_affected_accounts(c)
self.update_new_accounts(c)
[docs]
def generate_results(self, c, ut, accert):
"""
Generates the results.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility class
Utility class for processing user input.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
model = Accert.ref_model
if model:
# generate results for the models in the future we can add more models
self._generate_common_results(c, ut, accert, model)
if self.cel_tabl:
self.generate_results_table_with_cost_elements(c, conn, level=3)
self.generate_results_table(c, conn, level=3)
[docs]
def _generate_common_results(self, c, ut, accert, model):
"""
Generates the common results for the models.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility class
Utility class for processing user input.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
model : str
"""
if model == "abr1000" or model == "heatpipe" or model == "lfr":
self._common_cost_processing(c, accert)
fac, lab, mat = self.cal_direct_cost_elements(c)
all_flag = model != "lfr"
self._print_results(ut, c, fac, lab, mat, all_flag)
elif model == "pwr12-be":
self._pwr12be_processing(c, ut, accert)
else:
self._no_cost_element_processing(c, ut, accert)
[docs]
def _common_cost_processing(self, c, accert):
"""
Common cost processing for the models with limited accounts.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
self.sum_cost_elements_2C(c)
self.update_account_table_by_cost_elements(c)
self.check_and_process_total_cost(c, accert)
self.roll_up_lmt_account_table(c)
[docs]
def _print_results(self, ut, c, fac, lab, mat, all_flag):
"""
Prints the results.
Parameters
----------
ut : Utility class
Utility class for processing user input.
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
fac : float
Factory cost.
lab : float
Labor cost.
mat : float
Material cost.
all_flag : bool
Flag to print all accounts.
"""
print(' Generating results table for review '.center(100, '='))
print('\n')
if self.use_gncoa:
ut.print_leveled_accounts_gncoa(c, all=False, cost_unit='million', level=3)
else:
ut.print_leveled_accounts(c, all=all_flag, tol_fac=fac, tol_lab=lab, tol_mat=mat, cost_unit='million', level=3)
[docs]
def _pwr12be_processing(self, c, ut, accert):
"""
Processing for the pwr12-be model.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility class
Utility class for processing user input.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
self.update_account_table_by_cost_elements(c)
self.check_and_process_total_cost(c, accert)
if self.use_gncoa:
self.roll_up_account_table_GNCOA(c)
print(' Generating results table for review '.center(100, '='))
print('\n')
ut.print_leveled_accounts_gncoa(c, all=False, cost_unit='million', level=3)
else:
self.roll_up_account_table(c, from_level=3, to_level=0)
print(' Generating results table for review '.center(100, '='))
print('\n')
ut.print_leveled_accounts(c, all=True, cost_unit='million', level=3)
[docs]
def _no_cost_element_processing(self, c, ut, accert):
"""
Processing for the fusion model.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
ut : Utility class
Utility class for processing user input.
accert : ACCERT
xml2obj class instantiates objects that can parse the ACCERT XML file.
"""
self.check_and_process_total_cost(c, accert)
self.roll_up_account_table(c, from_level=4, to_level=0)
print(' Generating results table for review '.center(100, '='))
print('\n')
ut.print_leveled_accounts(c, all=False, cost_unit='million', level=4)
[docs]
def generate_results_table_with_cost_elements(self, c, conn, level=3):
"""
Generates the results table with cost elements.
Parameters
----------
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
conn : MySQLConnection
MySQLConnection class instantiates objects that represent a connection to the MySQL database server.
level : int
Level of the account.
"""
self._generate_excel(c, '_variable_affected_cost_elements.xlsx', 'extract_affected_cost_elements_w_dis', self.cel_tabl, self.var_tabl)
self._generate_excel(c, '_updated_cost_element.xlsx', 'print_updated_cost_elements', self.cel_tabl, remove_last_col=True)
[docs]
def _generate_excel(self, c, filename_suffix, proc_name, *args, remove_last_col=False):
"""
Generate an Excel file from stored procedure results.
Parameters:
c : MySQLCursor
MySQLCursor class instantiates objects that can execute MySQL statements.
proc_name : str
Name of the stored procedure.
filename_suffix : str
Suffix of the filename.
args : tuple
Arguments for the stored procedure.
remove_last_col : bool
Remove the last column if required.
"""
c.callproc(proc_name, args)
for itered in c.stored_results():
results = itered.fetchall()
field_names = [i[0] for i in itered.description]
df = pd.DataFrame(results, columns=field_names)
if remove_last_col:
df = df.iloc[:, :-1] # Remove the last column if required
filename = str(self.ref_model) + filename_suffix
df.to_excel(filename, index=False)
print(f"Successfully created excel file {filename}")
[docs]
def generate_results_table(self, c, conn, level=3):
"""
Generates the results tables.
"""
self._generate_excel(c, '_updated_account.xlsx', 'print_leveled_accounts_simple', self.acc_tabl, level)
if __name__ == "__main__":
"""
main driver
"""
stdoutOrigin=sys.stdout
sys.stdout = open("output.out", "w")
if len(sys.argv) == 1:
print("PLEASE ADD [Input_file_for_ACCERT]")
sys.exit(-1)
code_folder = os.path.dirname(os.path.abspath(__file__))
initfile = os.path.join(code_folder, 'install.conf')
ins = configparser.ConfigParser()
ins.read(initfile)
passwd = ins.get("INSTALL","PASSWD")
conn = mysql.connector.connect(
host="localhost",
user="root",
password=passwd,
database="accert_db",
auth_plugin="mysql_native_password"
)
# conn.commit()
# NOTE: cursor is a class that instantiates objects that can execute MySQL statements
# only commit when you are sure that the transaction is complete
c = conn.cursor()
ut = Utility_methods()
accert_path = os.path.abspath(os.path.join(code_folder, os.pardir))
user_input = sys.argv[2]
if os.path.exists(user_input):
input_path = os.path.abspath(user_input)
else:
print('ACCERT did not find the input file {}'.format(user_input))
raise SystemExit
Accert = Accert(input_path,accert_path)
Accert.execute_accert(c,ut)