Source code for utility_accert

from prettytable import PrettyTable
import textwrap

class Utility_methods:
    """
    Utility class
    """

[docs] def __init__(self): self.acc_tabl = None self.cel_tabl = None self.var_tabl = None self.vlk_tabl = None self.alg_tabl = None self.esc_tabl = None self.fac_tabl = None pass
[docs] def setup_table_names(self,c,Accert): self.acc_tabl = Accert.acc_tabl self.cel_tabl = Accert.cel_tabl self.var_tabl = Accert.var_tabl self.alg_tabl = Accert.alg_tabl self.esc_tabl = Accert.esc_tabl self.fac_tabl = Accert.fac_tabl self.gncoa_map = Accert.gncoa_map return None
[docs] def print_table(self, c, align_key=None,align=None,format_col=None): """Prints the table in an organized format via the PrettyTable library. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. align_key : list[str], optional List of column names to align. (By default none) align : list[str], optional List of alignments. Left, right or center. (By default none) format_col : list[str], optional List of column names to format. (By default none) """ for itered in c.stored_results(): results = itered.fetchall() field_names = [i[0] for i in itered.description] # results = c.fetchall() # columns = c.description # field_names = [i[0] for i in c.description] x = PrettyTable(field_names) for row in results: row = list(row) if format_col: for i in format_col: row[i-1]= '{:,.2f}'.format(row[i-1]) x.add_row(row) if align_key: for i,k in enumerate(align_key): x.align[k] = align[i] print('\n') print (x) print('\n') return None
[docs] def print_account(self, c, all=False, cost_unit='dollar',level=3): """Prints the account table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. all : bool, optional If True, print all the accounts columns. (By default False) cost_unit : str, optional Unit of the total cost. (By default 'dollar') level : int, optional Level of account. (By default 3) """ if all: # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_account_all`(IN table_name varchar(50), # IN level int) # BEGIN # SET @stmt=CONCAT('SELECT * FROM ',table_name,' WHERE level <= ?'); # PREPARE stmt FROM @stmt; # SET @level=level; # EXECUTE stmt USING @level; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('print_account_all', (self.acc_tabl,level)) # c.execute("""SELECT * # FROM account # WHERE level <= %(u_i_level)s;""",{'u_i_level': str(level)}) else: # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_account_simple`(IN table_name varchar(50), # IN level int) # BEGIN # SET @stmt=CONCAT('SELECT ind, # code_of_account, # account_description, # total_cost, # unit, # level, # review_status # FROM ',table_name,' WHERE level <= ?'); # PREPARE stmt FROM @stmt; # SET @level=level; # EXECUTE stmt USING @level; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('print_account_simple', (self.acc_tabl,level)) # c.execute("""SELECT ind, # code_of_account, # account_description, # total_cost, # unit, # level, # review_status # FROM account # WHERE level <= %(u_i_level)s;""",{'u_i_level': str(level)}) align_key=["code_of_accout", "account_description", "total_cost"] align=[ "l", "l", "r"] if cost_unit=='million': for row in c.stored_results(): results = row.fetchall() field_names = [i[0] for i in row.description] # results = c.fetchall() # columns = c.description # field_names = [i[0] for i in c.description] x = PrettyTable(field_names) for row in results: row = list(row) # NOTE the index of the row need to have a function row[3]= '{:,.3f}'.format(row[3]/1000000) row[4]= 'million' x.add_row(row) if align_key: for i,k in enumerate(align_key): x.align[k] = align[i] print (x) else: # print(c) self.print_table(c, align_key, align) return None
[docs] def print_leveled_accounts(self, c, all=False,tol_fac=None,tol_lab= None,tol_mat=None, cost_unit='dollar',level=3): """Prints the output account table with COA line up as a nested list. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. all : bool, optional If True, print all the accounts columns. (By default False) cost_unit : str, optional Unit of the total cost. (By default 'dollar') level : int, optional Level of account. (By default 3) """ if all: # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_leveled_accounts_all`(IN acc_table varchar(50), # IN cel_table varchar(50), # IN level int) # BEGIN # SET @stmt=CONCAT('SELECT acc.level, # rankedcoa.COA as code_of_account, # acc.account_description, # sorted_ce.fac_cost, # sorted_ce.lab_cost, # sorted_ce.mat_cost, # acc.total_cost, # acc.unit, # acc.review_status # FROM ',acc_table,' as acc # JOIN # (SELECT node.code_of_account, # CONCAT( REPEAT(" ", COUNT(parent.code_of_account) - 1), node.code_of_account) AS COA # FROM ',acc_table,' AS node, # ',acc_table,' AS parent # WHERE node.lft BETWEEN parent.lft AND parent.rgt # GROUP BY node.code_of_account) as rankedcoa # ON acc.code_of_account=rankedcoa.code_of_account # JOIN (SELECT splt_act.code_of_account, # cef.cost_2017 as fac_cost, # cel.cost_2017 as lab_cost, # cem.cost_2017 as mat_cost # FROM # (SELECT code_of_account,total_cost, # SUBSTRING_INDEX(SUBSTRING_INDEX(cost_elements, ",", 1), ",", -1) AS fac_name, # SUBSTRING_INDEX(SUBSTRING_INDEX(cost_elements, ",", 2), ",", -1) AS lab_name, # SUBSTRING_INDEX(SUBSTRING_INDEX(cost_elements, ",", 3), ",", -1) AS mat_name # FROM ',acc_table,') as splt_act # LEFT JOIN ',cel_table,' as cef # ON cef.cost_element= splt_act.fac_name # LEFT JOIN ',cel_table,' as cel # ON cel.cost_element= splt_act.lab_name # LEFT JOIN ',cel_table,' as cem # ON cem.cost_element= splt_act.mat_name) as sorted_ce # ON sorted_ce.code_of_account=acc.code_of_account # WHERE acc.level <= ? # ORDER BY acc.lft;'); # PREPARE stmt FROM @stmt; # SET @level=level; # EXECUTE stmt USING @level; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('print_leveled_accounts_all', (self.acc_tabl,self.cel_tabl,level)) align_key=["code_of_account", "account_description", "fac_cost", "lab_cost", "mat_cost", "total_cost"] align=[ "l", "l", "r", "r", "r", "r"] else: # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_leveled_accounts_simple`(IN acc_table VARCHAR(255), IN level INT) # BEGIN # SET @stmt = CONCAT('SELECT rankedcoa.code_of_account, # acc.account_description, # acc.total_cost, # acc.unit, # acc.level, # acc.review_status # FROM ',acc_table,' as acc # JOIN # (SELECT node.code_of_account AS COA , CONCAT( REPEAT(" ", COUNT(parent.code_of_account) - 1), node.code_of_account) AS code_of_account # FROM ',acc_table,' AS node, # ',acc_table,' AS parent # WHERE node.lft BETWEEN parent.lft AND parent.rgt # GROUP BY node.code_of_account) as rankedcoa # ON acc.code_of_account=rankedcoa.COA # WHERE acc.level <= ?'); # PREPARE stmt FROM @stmt; # SET @level=level; # EXECUTE stmt USING @level; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('print_leveled_accounts_simple', (self.acc_tabl,level)) # c.execute("""SELECT rankedcoa.code_of_account, # account.account_description, # account.total_cost, # account.unit, # account.level, # account.review_status # FROM account # JOIN # ( # SELECT node.code_of_account AS COA, CONCAT( REPEAT(" ", COUNT(parent.code_of_account) - 1), node.code_of_account) AS code_of_account # FROM account AS node, # account AS parent # WHERE node.lft BETWEEN parent.lft AND parent.rgt # GROUP BY node.code_of_account) as rankedcoa # ON account.code_of_account=rankedcoa.COA # WHERE account.level <= %(u_i_level)s # ORDER BY account.lft;""",{'u_i_level': str(level)}) align_key=["code_of_account", "account_description", "total_cost"] align=[ "l", "l", "r"] if cost_unit=='million': for row in c.stored_results(): results = row.fetchall() field_names = [i[0] for i in row.description] # results = c.fetchall() # columns = c.description # field_names = [i[0] for i in c.description] x = PrettyTable(field_names) for idx, row in enumerate(results): row = list(row) if all: # if index is 0, and tol_fac, tol_lab, tol_mat are not None, format the values if idx == 0 and tol_fac and tol_lab and tol_mat: # First row special formatting row[3] = "{:,.2f}".format(tol_fac / 1000000) row[4] = "{:,.2f}".format(tol_lab / 1000000) row[5] = "{:,.2f}".format(tol_mat / 1000000) row[6] = "{:,.2f}".format(row[6] / 1000000) else: # Format other rows or print 0 if value is None row[3:7] = ['{:,.2f}'.format(x / 1000000) if x else '0' for x in row[3:7]] else: # Format only the third column for other cases row[2] = '{:,.2f}'.format(row[2] / 1000000) x.add_row(row) if align_key: for i,k in enumerate(align_key): x.align[k] = align[i] print(x) else: self.print_table(c, align_key, align) return None
[docs] def print_leveled_accounts_gncoa(self, c, all=False, cost_unit='dollar',level=3): """Prints the output account table with GNCOA line up as a nested list. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. all : bool, optional If True, print all the accounts columns. (By default False) cost_unit : str, optional Unit of the total cost. (By default 'dollar') level : int, optional Level of account. (By default 3) """ # place holder for printing the GNCOA cost elements in a nested list # this needs to be implemented in the future since the GNCOA cost elements # are not rolled up in the current database # c.callproc('print_leveled_accounts_gn_all', (self.acc_tabl,self.cel_tabl,level)) # all=True # tol_fac=None # tol_lab=None # tol_mat=None c.callproc('print_leveled_accounts_gn', (self.acc_tabl,self.gncoa_map,level)) align_key=["gncoa","gncoa_description", "total_cost"] align=[ "l","l", "r"] if cost_unit=='million': for row in c.stored_results(): results = row.fetchall() field_names = [i[0] for i in row.description] x = PrettyTable(field_names) for idx, row in enumerate(results): row = list(row) if all: # if index is 0, and tol_fac, tol_lab, tol_mat are not None, format the values if idx == 0 and tol_fac and tol_lab and tol_mat: # First row special formatting row[3] = "{:,.2f}".format(tol_fac / 1000000) row[4] = "{:,.2f}".format(tol_lab / 1000000) row[5] = "{:,.2f}".format(tol_mat / 1000000) row[6] = "{:,.2f}".format(row[6] / 1000000) else: # Format other rows or print 0 if value is None row[3:7] = ['{:,.2f}'.format(x / 1000000) if x else '0' for x in row[3:7]] else: # Format only the third column for other cases if row[2]: row[2] = '{:,.2f}'.format(row[2] / 1000000) else: row[2] = 0 x.add_row(row) if align_key: for i,k in enumerate(align_key): x.align[k] = align[i] print(x) else: self.print_table(c) return None
[docs] def print_algorithm(self, c): """Prints the output algorithm table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_table`(IN table_name VARCHAR(255)) # BEGIN # SET @stmt = CONCAT('SELECT * FROM ',table_name); # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('print_table', (self.alg_tabl,)) self.print_table(c) return None
[docs] def print_cost_element(self, c): """Prints the output cost element table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ c.callproc('print_table', (self.cel_tabl,)) self.print_table(c) return None
[docs] def print_facility(self, c): """Prints the output facility table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ c.callproc('print_table', (self.fac_tabl,)) # c.execute("""SELECT * # FROM facility; # """) self.print_table(c) return None
[docs] def print_escalation(self, c): """Prints the output escalation table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ c.callproc('print_table', (self.esc_tabl,)) # c.execute("""SELECT * # FROM escalation; # """) self.print_table(c) return None
[docs] def print_variable(self, c): """Prints the output variable table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ c.callproc('print_table', (self.var_tabl,)) # c.execute("""SELECT * # FROM variable; # """) self.print_table(c) return None
[docs] def print_user_request_parameter(self,c, all=False): """Prints the output user request parameter table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. all : bool, optional If True, prints all columns. (By default False) """ # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_user_request_parameter`(IN all_col BOOLEAN, # IN var_table VARCHAR(50), # IN vlk_table VARCHAR(50)) # BEGIN # IF all_col THEN # SET @stmt = CONCAT('SELECT va.ind, va.var_name, affectv.ce_affected FROM ',var_table,' as va JOIN # (SELECT variable, group_concat(ce) as ce_affected # FROM ',vlk_table,' as vlk # group by variable) as affectv on va.var_name = affectv.variable # where va.var_value IS NULL # order by va.ind'); # ELSE # SET @stmt = CONCAT('SELECT va.var_name, affectv.ce_affected FROM ',var_table,' as va JOIN # (SELECT variable, group_concat(ce) as ce_affected # FROM ',vlk_table,' as vlk # group by variable) as affectv on va.var_name = affectv.variable # where va.var_value IS NULL # order by va.ind;'); # END IF; # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; if all: c.callproc('print_user_request_parameter', (True, self.var_tabl, self.cel_tabl)) self.print_table(c) else: c.callproc('print_user_request_parameter', (False, self.var_tabl, self.cel_tabl)) # c.execute("""SELECT va.var_name,affectv.ce_affected # FROM accert_db_test.variable as va JOIN # (SELECT variable,group_concat(ce) as ce_affected # FROM accert_db_test.variable_links # group by variable) as affectv # on va.var_name = affectv.variable # where va.var_value IS NULL # order by va.ind;""") for row in c.stored_results(): results = row.fetchall() for row in results: print('Parameter "{}" is required for cost elements:'.format(row[0])) # print(row[1]) print('{}\n'.format(textwrap.fill(row[1], 100))) # print('Parameter "{}" is required\n'.format(row[0])) return None
[docs] def print_updated_cost_elements(self, c): """Prints the output updated cost elements table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `print_updated_cost_elements`(IN cel_table VARCHAR(50)) # BEGIN # SET @stmt = CONCAT('SELECT ind, # cost_element, # cost_2017, # sup_cost_ele, # account, # updated # FROM ',cel_table,' # WHERE updated = 1'); # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('print_updated_cost_elements', (self.cel_tabl,)) self.print_table(c)
[docs] def extract_affected_cost_elements(self,c): """Extracts affected cost elements from cost element table and groups them by changed variables. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ print(' Extracting affected cost elements '.center(100,'=')) print('\n') # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `extract_affected_cost_elements`(IN cel_table varchar(50), # IN var_table varchar(50)) # BEGIN # SET @stmt = CONCAT("SELECT va.var_name, (SELECT GROUP_CONCAT(ce.cost_element SEPARATOR ', ') # FROM ", cel_table, " ce # WHERE FIND_IN_SET(va.var_name, REPLACE(ce.variables, ' ', '')) > 0) AS ce_affected # FROM # (SELECT * FROM ",var_table," # WHERE user_input = 1) as va # WHERE (SELECT GROUP_CONCAT(ce.cost_element SEPARATOR ', ') # FROM ", cel_table, " ce # WHERE FIND_IN_SET(va.var_name, REPLACE(ce.variables, ' ', '')) > 0) IS NOT NULL # "); # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END # DELIMITER ; c.callproc('extract_affected_cost_elements',(self.cel_tabl,self.var_tabl)) for row in c.stored_results(): results = row.fetchall() for row in results: print('variable "{}" affects cost element(s):'.format(row[0])) print('{}\n'.format(textwrap.fill(row[1], 100))) return None
[docs] def extract_affected_accounts(self,c): """ Extracts affected accounts from account table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ print('Extracting affected accounts'.center(100,'=')) # DELIMITER $$ # CREATE DEFINER=`root@`localhost` PROCEDURE `extract_affected_accounts`(IN acc_table VARCHAR(50), # IN var_table VARCHAR(50)) # BEGIN # SET @stmt = CONCAT('SELECT va.var_name, # (SELECT GROUP_CONCAT(ac.code_of_account SEPARATOR ", ") # FROM ',acc_table,' ac # WHERE FIND_IN_SET(va.var_name, REPLACE(ac.variables, " ", "")) > 0) AS ac_affected # FROM # (SELECT * FROM ',var_table,' # WHERE user_input = 1) as va # WHERE (SELECT GROUP_CONCAT(ac.code_of_account SEPARATOR ", ") # FROM ',acc_table,' ac # WHERE FIND_IN_SET(va.var_name, REPLACE(ac.variables, " ", "")) > 0) IS NOT NULL;'); # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('extract_affected_accounts',(self.acc_tabl,self.var_tabl)) for row in c.stored_results(): results = row.fetchall() for row in results: print('variable "{}" affects account(s):'.format(row[0])) print('{}\n'.format(textwrap.fill(row[1], 100))) return None
[docs] def extract_user_changed_variables(self,c): """Extracts user changed variables from variable table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ print('Extracting user changed variables'.center(100,'=')) # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `extract_user_changed_variables`(IN table_name VARCHAR(50)) # BEGIN # SET @stmt = CONCAT('SELECT var_name,var_description, var_value, var_unit # FROM ', table_name, ' WHERE user_input = 1 ORDER BY var_name;'); # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; c.callproc('extract_user_changed_variables',(self.var_tabl,)) # c.execute("""SELECT var_name,var_description, var_value, var_unit # FROM `accert_db_test`.`variable` # WHERE user_input = 1 # ORDER BY var_name;""") self.print_table(c,format_col=[3]) return None
[docs] def extract_changed_cost_elements(self,c): """Extracts changed cost elements from the cost element table. Parameters ---------- c : MySQLCursor MySQLCursor class instantiates objects that can execute MySQL statements. """ # DELIMITER $$ # CREATE DEFINER=`root`@`localhost` PROCEDURE `extract_changed_cost_elements`(IN cel_table VARCHAR(50)) # BEGIN # SET @stmt = CONCAT('SELECT cost_element, cost_2017 # FROM ',cel_table,' # WHERE updated != 0 # ORDER BY account, cost_element;'); # PREPARE stmt FROM @stmt; # EXECUTE stmt; # DEALLOCATE PREPARE stmt; # END$$ # DELIMITER ; print('Extracting changed cost elements'.center(100,'=')) c.callproc('extract_changed_cost_elements',(self.cel_tabl,)) # c.execute("""SELECT cost_element, cost_2017 # FROM `accert_db_test`.`cost_element` # WHERE updated != 0 # ORDER BY account, cost_element;""") self.print_table(c,format_col=[2]) return None