# concho/concho/hpe.py
#
# 432 lines
# 22 KiB
# Python

from typing import Dict, List, Any
from pathlib import Path
from concho.config import Module
from concho.config import Option
from concho.config import Config
from concho.config import Chassis
from concho.config import Cpu, Dimm
from concho.config import IHtmlConfiguratorParser
# from xml.dom.minidom import parse
from lxml.html import parse as parse_html
import re
def clean_string(string):
    """Return ``string`` stripped of every '\\x99' character.

    This character (the 'single graphic character introducer') was found in
    HPE's pages inside 'AMD EPYC 7262', right after 'EPYC'.
    """
    unwanted_character = '\x99'
    return ''.join(character for character in string if character != unwanted_character)
def parse_cpu_label(label: str) -> str:
    """Convert the label of a processor product into a cpu identifier.

    Args:
        label: the description of the processor, eg
            'Intel Xeon-Silver 4214 (2.2GHz/12-core/85W) FIO Processor Kit for HPE ProLiant DL360 Gen10'

    Returns:
        the lowercase cpu identifier, eg 'intel-xeon-silver-4214' or 'amd-epyc-7262'

    Raises:
        AssertionError: if the label matches none of the handled patterns.
    """
    # Intel Xeon-Silver 4214 (2.2GHz/12-core/85W) FIO Processor Kit for HPE ProLiant DL360 Gen10
    match = re.match(r'^Intel Xeon-(?P<cpu_class>Silver|Gold|Platinum) (?P<cpu_number>[0-9][0-9][0-9][0-9][^ ]?).*', label)
    if match:
        return "intel-xeon-%s-%s" % (match['cpu_class'].lower(), match['cpu_number'].lower())

    match = re.match(r'^AMD EPYC (?P<cpu_number>[0-9][0-9][0-9F][0-9]).*', label)
    if match:
        return "amd-epyc-%s" % (match['cpu_number'].lower())

    # raised explicitly (instead of 'assert False') so that the failure is not
    # optimized away by 'python -O', which would make this function return None
    raise AssertionError('unhandled label : %s' % label)
def hpe_product_get_attr(hpe_product_node: dict, attr_name: str) -> str:
    """Look up the value of the attribute ``attr_name`` of a product node.

    Args:
        hpe_product_node: a product dictionary; its 'attributes' entry is a
            sequence of dictionaries with 'id' and 'value' entries
        attr_name: the 'id' of the attribute to look for

    Returns:
        the 'value' of the first attribute whose 'id' equals ``attr_name``, or
        None when the product has no such attribute
    """
    matching_values = (
        attribute['value']
        for attribute in hpe_product_node['attributes']
        if attribute['id'] == attr_name
    )
    return next(matching_values, None)
class HpeCatalogParser():
    """Extracts prices, options and the base configuration from an hpe catalog.

    The catalog is iterated as a sequence of section dictionaries, each of
    them having an 'id' and a list of 'products'; the products read here have
    'desc', 'price' and 'selected' entries.
    """

    # pattern of the labels of the memory products, eg
    # HPE 32GB (1x32GB) Dual Rank x4 DDR4-2933 CAS-21-21-21 Registered Smart Memory Kit
    # shared by parse_ram_options and parse_base_config so that a dimm accepted
    # as an option is also accepted as the dimm of the base configuration
    _DIMM_LABEL_PATTERN = re.compile(r'^HPE (?P<num_gb>[0-9]+)GB \((?P<num_dimms>[0-9]+)x(?P<num_gb_per_dimm>[0-9]+)GB\) Dual Rank x[48] DDR4-(?P<num_mhz>[0-9]+).*')

    def __init__(self, hpe_catalog: dict):
        self.hpe_catalog = hpe_catalog  # the catalog of options in the form of a dictionary that is found in the javascript code of hpe's web page

    def get_base_price(self):
        """Return the sum of price * quantity over all the selected products of the catalog.

        The quantity of a product is read from its 'selectedQty' attribute and
        defaults to 1 when this attribute is missing or empty.
        """
        base_price = 0.0
        for component_db in self.hpe_catalog:
            print('component %s' % component_db['id'])
            for item_node in component_db['products']:
                if not item_node['selected']:
                    continue
                quantity = 1
                quantity_as_str = hpe_product_get_attr(item_node, 'selectedQty')
                if quantity_as_str:
                    quantity = int(quantity_as_str)
                # convert once, before formatting : '%f' rejects prices stored as strings
                price = float(item_node['price'])
                print('HpeCatalogParser.get_base_price : adding %d * %f (%s)' % (quantity, price, item_node['desc']))
                base_price += price * quantity
        return base_price

    def parse_proc_change_options(self):
        """Return the 'processor-change' module built from the section 'ProcessorSection.PrimaryProcessorChoice'.

        Each product of the section becomes an option whose item is the cpu
        identified by the product's label and whose price is the product's price.
        """
        prim_proc_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, 'ProcessorSection.PrimaryProcessorChoice')
        proc_options = Module('processor-change')
        num_cpus = 1
        for proc_node in prim_proc_db['products']:
            price = float(proc_node['price'])
            cpu_id = parse_cpu_label(proc_node['desc'])
            print('cpu_id: ', cpu_id)
            proc_options.add_option(Option(Cpu(cpu_id), price / num_cpus))
        return proc_options

    def parse_proc_options(self):
        """Return the 'processor' module built from the section 'ProcessorSection.AdditionalProcessorsChoice'.

        The labels are decoded with parse_cpu_label, so that a given processor
        gets the same identifier here as in parse_proc_change_options.

        Raises:
            AssertionError: if a label is not handled by parse_cpu_label.
        """
        add_proc_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, 'ProcessorSection.AdditionalProcessorsChoice')
        proc_options = Module('processor')
        num_additional_cpus = 1
        for proc_node in add_proc_db['products']:
            price = float(proc_node['price'])
            cpu_id = parse_cpu_label(proc_node['desc'])
            print('cpu_id: ', cpu_id)
            proc_options.add_option(Option(Cpu(cpu_id), price / num_additional_cpus))
        return proc_options

    def get_mem_section_id(self):
        """Return the id of the catalog section that lists the memory products."""
        return 'memory.memorySlots'

    def parse_ram_options(self):
        """Return the 'ram' module built from the memory section of the catalog.

        Raises:
            AssertionError: if a label is not a handled memory label, or if the
                section contains no product.
        """
        mem_slots_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, self.get_mem_section_id())
        ram_options = Module('ram')
        for ram_node in mem_slots_db['products']:
            label = ram_node['desc']
            price = float(ram_node['price'])
            match = self._DIMM_LABEL_PATTERN.match(label)
            assert match, 'unhandled label : %s' % label
            # NOTE(review): the dimm is built from the total capacity of the kit (num_gb) whereas
            # parse_base_config uses num_gb_per_dimm; both agree for 1x kits - confirm for multi-dimm kits
            dimm = Dimm(num_gb=int(match['num_gb']), num_mhz=int(match['num_mhz']), mem_type='rdimm')  # not sure about 'rdimm'
            ram_options.add_option(Option(dimm, price))
        assert len(ram_options.options) > 0
        return ram_options

    def parse_base_config(self, configurator):
        """Return the base configuration described by the selected products of the catalog.

        The cpu is the selected product of 'ProcessorSection.PrimaryProcessorChoice'
        and the dimms are those of the selected product of the memory section.

        Args:
            configurator: the configurator being built; its chassis must already be set

        Raises:
            AssertionError: if a label is not handled, if no memory product is
                selected or if the dimms can't be evenly split on the memory channels.
        """
        base_config = Config(configurator)
        base_config.num_servers = configurator.chassis.item.max_num_servers
        base_config.num_cpu_per_server = 1

        prim_proc_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, 'ProcessorSection.PrimaryProcessorChoice')
        for proc_node in prim_proc_db['products']:
            cpu_id = parse_cpu_label(proc_node['desc'])
            if proc_node['selected']:
                base_config.set_cpu(Cpu(cpu_id))
                break

        # initialize the default ram dimms
        mem_section_id = self.get_mem_section_id()
        mem_slots_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, mem_section_id)
        selected_ram_node = next((ram_node for ram_node in mem_slots_db['products'] if ram_node['selected']), None)
        assert selected_ram_node is not None, 'no selected product in section %s' % mem_section_id
        label = selected_ram_node['desc']
        match = self._DIMM_LABEL_PATTERN.match(label)
        assert match, 'unhandled label : %s' % label
        dimm = Dimm(num_gb=int(match['num_gb_per_dimm']), num_mhz=int(match['num_mhz']), mem_type='rdimm')
        num_dimms = int(match['num_dimms'])
        if num_dimms == 1:
            assert match['num_gb'] == match['num_gb_per_dimm']
            # the single dimm goes in the first slot of the first channel of the first cpu
            base_config.cpu_slots_mem[0].mem_channels[0].dimms[0] = dimm
        else:
            # evenly split dimms on channels
            assert (num_dimms % base_config.num_cpu_per_server) == 0
            num_dimms_per_cpu = num_dimms // base_config.num_cpu_per_server
            for cpu_slot_index in range(base_config.num_cpu_per_server):
                cpu_slots_mem = base_config.cpu_slots_mem[cpu_slot_index]
                assert len(cpu_slots_mem.mem_channels) >= num_dimms_per_cpu
                for channel_index in range(num_dimms_per_cpu):
                    cpu_slots_mem.mem_channels[channel_index].dimms[0] = dimm
        return base_config
# parser for a second type of catalog where the processor options are not present
class HpeCatalogWoutCpuParser(HpeCatalogParser):
    """Catalog parser for the catalogs whose sections are 'ProcessorSection.ProcessorChoice' and 'memory.memorySlotsChoice'."""

    def get_mem_section_id(self):
        """Return the id of the catalog section that lists the memory products."""
        return 'memory.memorySlotsChoice'

    def parse_base_config(self, configurator):
        """Return the base configuration described by the selected products of the catalog.

        The cpu is the selected product of 'ProcessorSection.ProcessorChoice'
        and the dimms are those of the selected product of the memory section.

        Args:
            configurator: the configurator being built; its chassis must already be set

        Raises:
            AssertionError: if a label is not handled, if no memory product is
                selected or if the dimms can't be evenly split on the memory channels.
        """
        base_config = Config(configurator)
        base_config.num_servers = configurator.chassis.item.max_num_servers
        base_config.num_cpu_per_server = 1

        prim_proc_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, 'ProcessorSection.ProcessorChoice')
        for proc_node in prim_proc_db['products']:
            cpu_id = parse_cpu_label(proc_node['desc'])
            if proc_node['selected']:
                base_config.set_cpu(Cpu(cpu_id))
                break

        # initialize the default ram dimms
        mem_section_id = self.get_mem_section_id()
        mem_slots_db = HpeConfiguratorParser._find_child_with_id(self.hpe_catalog, mem_section_id)
        selected_ram_node = next((ram_node for ram_node in mem_slots_db['products'] if ram_node['selected']), None)
        assert selected_ram_node is not None, 'no selected product in section %s' % mem_section_id
        label = selected_ram_node['desc']
        # HPE 32GB (1x32GB) Dual Rank x4 DDR4-2933 CAS-21-21-21 Registered Smart Memory Kit
        # x4 and x8 dimms are both accepted, as they are in the inherited parse_ram_options
        match = re.match(r'^HPE (?P<num_gb>[0-9]+)GB \((?P<num_dimms>[0-9]+)x(?P<num_gb_per_dimm>[0-9]+)GB\) Dual Rank x[48] DDR4-(?P<num_mhz>[0-9]+).*', label)
        assert match, 'unhandled label : %s' % label
        dimm = Dimm(num_gb=int(match['num_gb_per_dimm']), num_mhz=int(match['num_mhz']), mem_type='rdimm')
        num_dimms = int(match['num_dimms'])
        if num_dimms == 1:
            assert match['num_gb'] == match['num_gb_per_dimm']
            # the single dimm goes in the first slot of the first channel of the first cpu
            base_config.cpu_slots_mem[0].mem_channels[0].dimms[0] = dimm
        else:
            # evenly split dimms on channels
            assert (num_dimms % base_config.num_cpu_per_server) == 0
            num_dimms_per_cpu = num_dimms // base_config.num_cpu_per_server
            for cpu_slot_index in range(base_config.num_cpu_per_server):
                cpu_slots_mem = base_config.cpu_slots_mem[cpu_slot_index]
                assert len(cpu_slots_mem.mem_channels) >= num_dimms_per_cpu
                for channel_index in range(num_dimms_per_cpu):
                    cpu_slots_mem.mem_channels[channel_index].dimms[0] = dimm
        return base_config
class HpeConfiguratorParser(IHtmlConfiguratorParser):
    """Builds a configurator from an html page saved from hpe's configurator.

    The options are read from the catalog found in the javascript code
    embedded in the page.
    """

    # prices used as a last resort for the base cpu, when its price can be found
    # neither in the 'processor' module nor by deduction.
    # in the case of r6525, there was no additional processor module, and therefore we have no way to estimate
    # the price of the base processor (amd-epyc-7262), so we fallback to an hardcoded estimated price from wikipedia
    _FALLBACK_CPU_PRICES = {
        'amd-epyc-7262': 550.0,
    }

    def __init__(self):
        pass

    @staticmethod
    def _deduce_base_cpu_price(base_cpu, cpu_options, additional_cpu_options):
        '''
        The price of the base config processor is not always available directly in the section 'additional processors' : as an example, r940's default processors are 2 xeon gold 5215 but it's not possible to add 2 other 5215 (probably because 5215 can only talk to another cpu, not 3). In this case the price of this base cpu can be deduced from the price for other cpus (difference between cpu upgrade and additional cpu)

        Args:
            base_cpu (Cpu): the cpu of the base configuration
            cpu_options (Module): the available cpu options
            additional_cpu_options (Module): the available additional cpu options

        Returns:
            float: the estimated price of base_cpu, or None if no cpu is present in both modules

        Raises:
            AssertionError: if two cpus lead to estimations that differ by more than 0.01
        '''
        base_cpu_price = None
        for cpu_option in additional_cpu_options.options.values():
            cpu = cpu_option.item
            if cpu.uid not in cpu_options.options:
                continue
            cpu_upgrade_option = cpu_options.options[cpu.uid]
            deduced_base_cpu_price = cpu_option.price - cpu_upgrade_option.price
            print('price of %s estimated from %s : %f (%f-%f)' % (base_cpu.uid, cpu.uid, deduced_base_cpu_price, cpu_option.price, cpu_upgrade_option.price))
            if base_cpu_price is None:
                base_cpu_price = deduced_base_cpu_price
            else:
                # all the estimations are expected to agree
                assert abs(base_cpu_price - deduced_base_cpu_price) <= 0.01
        return base_cpu_price

    @staticmethod
    def _find_child_with_id(parent: List[Dict], child_id: str) -> Dict:
        """Return the first dictionary of ``parent`` whose 'id' entry equals ``child_id``.

        Raises:
            AssertionError: if ``parent`` contains no such dictionary.
        """
        for child_dict in parent:
            if child_dict['id'] == child_id:
                return child_dict
        # raised explicitly (instead of 'assert False') so that a missing child is
        # still reported under 'python -O', and so that the message names the missing id
        raise AssertionError('failed to find a child with id %s' % child_id)

    @staticmethod
    def _get_db_as_tree(hpe_configurator_html_file_path: Path) -> Dict[str, Any]:
        """Return the 'subCategories' of the 'hardware' top level of the catalog embedded in the page.

        The catalog is the json text located between 'result: ' and the
        '// productDetails: ,' comment, in the last javascript script of the
        page that contains '"price"'.

        NOTE(review): the returned value is used as a list of sections by the
        callers, although the annotation says Dict - confirm and fix the annotation.
        """
        import json

        html_root = parse_html(str(hpe_configurator_html_file_path)).getroot()
        script_elements = html_root.xpath(".//script[@type='text/javascript']")
        print('number of javascript scripts:', len(script_elements))
        db_jscript = None
        for script_element in script_elements:
            script = script_element.text_content()
            if re.search('"price"', script):
                db_jscript = script
        assert db_jscript is not None
        start_match = re.search('result: ', db_jscript)
        assert start_match is not None
        end_match = re.search(',[\n \t]+// productDetails: ,', db_jscript)
        assert end_match is not None
        db_as_json_str = db_jscript[start_match.end(): end_match.start()]
        db = json.loads(db_as_json_str)
        top_levels = db["configResponse"]["configuration"]["topLevels"]
        hardware_db = HpeConfiguratorParser._find_child_with_id(top_levels, 'hardware')['subCategories']
        return hardware_db

    def create_catalog_parser(self, hpe_catalog):
        """Return the catalog parser suited to the pages handled by this parser."""
        return HpeCatalogParser(hpe_catalog)

    def parse_proc_change_options(self, hpe_configurator_html_file_path: Path):
        """Return the 'processor-change' module read from the given html file."""
        hardware_db = HpeConfiguratorParser._get_db_as_tree(hpe_configurator_html_file_path)
        catalog_parser = self.create_catalog_parser(hardware_db)
        return catalog_parser.parse_proc_change_options()

    def parse_proc_options(self, hpe_configurator_html_file_path: Path):
        """Return the 'processor' module read from the given html file."""
        hardware_db = HpeConfiguratorParser._get_db_as_tree(hpe_configurator_html_file_path)
        catalog_parser = self.create_catalog_parser(hardware_db)
        return catalog_parser.parse_proc_options()

    def parse(self, hpe_configurator_html_file_path: Path, configurator):
        """Fill ``configurator`` with the chassis, the base configuration and the modules read from the html file.

        Args:
            hpe_configurator_html_file_path: the html page saved from hpe's configurator
            configurator: the configurator to fill

        Raises:
            AssertionError: if a label is not handled or if the price of the base cpu can't be found.
        """
        hardware_db = HpeConfiguratorParser._get_db_as_tree(hpe_configurator_html_file_path)
        catalog_parser = self.create_catalog_parser(hardware_db)
        print(type(catalog_parser))

        base_model_node = HpeConfiguratorParser._find_child_with_id(hardware_db, 'baseModelSection.baseModelChoice')
        product_node = [p for p in base_model_node['products'] if p['selected']][0]
        label = product_node['desc']  # eg "DL360 Gen10"
        match = re.match(r'^[HPE]*[ ]*(?P<chassis_type>DL[0-9][0-9][0-9]) Gen(?P<generation>[0-9]+) *(?P<plus>[+]?)', label)
        if not match:
            # HPE ProLiant DL385 Gen10 Plus v2 8SFF Configure-to-order Server
            match = re.match(r'^HPE ProLiant (?P<chassis_type>DL[0-9][0-9][0-9]) Gen(?P<generation>[0-9+]+) (?P<plus>Plus)', label)
        assert match, 'unhandled label : %s' % label
        plus = '+' if match['plus'] else ''
        chassis_id = "hpe-proliant-%s-gen%s%s" % (match['chassis_type'].lower(), match['generation'].lower(), plus)
        # the price of the chassis is only known at the end, once the prices of its components are known
        configurator.chassis = Option(Chassis(chassis_id), 0.0)
        configurator.base_config = catalog_parser.parse_base_config(configurator)

        proc_change_module = self.parse_proc_change_options(hpe_configurator_html_file_path)
        configurator.add_module(proc_change_module)
        print(type(catalog_parser))
        configurator.add_module(self.parse_proc_options(hpe_configurator_html_file_path))
        configurator.add_module(catalog_parser.parse_ram_options())
        processor_module = configurator.modules['processor']

        # compute the price of the base config cpu because for example in r940 configurator, the xeon gold 5215 appears in the basic config but not in additional cpus
        base_cpu = configurator.base_config.cpu
        if configurator.get_item_price(base_cpu.uid) is None:
            base_cpu_price = HpeConfiguratorParser._deduce_base_cpu_price(base_cpu, proc_change_module, processor_module)
            if base_cpu_price is None:
                base_cpu_price = self._FALLBACK_CPU_PRICES.get(base_cpu.uid)
            # checked before adding the option, so that an unknown cpu is reported with its uid
            assert base_cpu_price is not None, 'failed to find the price of base cpu %s' % base_cpu.uid
            processor_module.add_option(Option(base_cpu, base_cpu_price))
        assert configurator.get_item_price(base_cpu.uid) is not None, 'failed to find the price of base cpu %s' % base_cpu.uid

        # compute the price of the chassis
        base_price = catalog_parser.get_base_price()

        # in case there's no additional processor modules, 'processor' module only has the base processor entry
        # So we need to populate it with the processors coming from 'processor-change' module
        for proc_change_option in configurator.modules['processor-change'].options.values():
            cpu_uid = proc_change_option.item.uid
            cpu_already_exists = any(proc_option.item.uid == cpu_uid for proc_option in processor_module.options.values())
            if not cpu_already_exists:
                # the price of a cpu is its upgrade price plus the price of the base cpu it replaces
                cpu_price = proc_change_option.price + processor_module.options[base_cpu.uid].price
                print('estimated price of cpu %s : %f' % (cpu_uid, cpu_price))
                processor_module.add_option(Option(Cpu(cpu_uid), cpu_price))

        # delete the 'processor-change' module as its items ids are the same as the ones in the 'processor' modules but their prices are 'wrong' (upgrade prices rather than item price).
        # in a configuration, no item should be found more than once
        del configurator.modules['processor-change']

        one_cpu_price = configurator.get_item_price(configurator.base_config.cpu.uid)
        ram_price = configurator.base_config.ram_price
        print('HpeConfiguratorParser.parse : chassis_id=%s cpu.uid = %s, base_price=%f, ram_price=%f' % (chassis_id, configurator.base_config.cpu.uid, base_price, ram_price))
        # the chassis price is what remains of the base price once the cpus and the ram are removed
        configurator.chassis.price = base_price - configurator.base_config.num_cpus * one_cpu_price - ram_price
# Builds a configurator by parsing one of hpe configurator's pages.
# As input, it expects an html page saved from a configurator in which the cpu choice only appears when the user sets a quantity of 0 in front of the selected cpu.
# So, to create an html file compatible with this parser:
# 1. on hpe's matinfo web page, choose a configuration for which the cpu choice only appears when the user sets a quantity of 0 in front of the selected cpu (eg cat2-conf10)
# 2. on this page, set the number of cpus to 0. This causes a popup window to appear; this popup window contains all the cpu choices. Save the page as html only while this popup window is shown (because the catalog contained in the embedded javascript contains all the options except for the cpu options).
class HpeCpuChoiceConfiguratorParser(HpeConfiguratorParser):
    """Configurator parser that reads the cpu options from the html elements of the page rather than from its catalog."""

    def get_xpath_filter(self, filter_id):
        """Return the xpath expression registered under ``filter_id`` (KeyError if there is none)."""
        xpath_filters = {
            'root_to_processors_element': ".//ul[@id='subcat_ProcessorSection.ProcessorChoice']",
            'module_to_options': ".//li[@class='row']",
            'option_to_label': ".//span[@class='lineitemdesc']",
            'option_to_price': ".//span[@class='lineitemprice']",
        }
        return xpath_filters[filter_id]

    def create_catalog_parser(self, hpe_catalog):
        """Return the catalog parser suited to the pages handled by this parser."""
        return HpeCatalogWoutCpuParser(hpe_catalog)

    def parse_proc_change_options(self, hpe_configurator_html_file_path: Path):
        """Return the 'processor-change' module read from the html elements of the page.

        The proc options are searched in the cpu options popup window, as these
        options are not present in the hpe_catalog of this page. The rows that
        have no label element are ignored.
        """
        document_root = parse_html(str(hpe_configurator_html_file_path)).getroot()
        processor_module = Module('processor-change')
        processors_xpath = self.get_xpath_filter('root_to_processors_element')
        module_root_element = document_root.xpath(processors_xpath)[0]
        print('module_root_element :', module_root_element)
        num_cpus = 1
        for row_element in module_root_element.xpath(self.get_xpath_filter('module_to_options')):
            label_elements = row_element.xpath(self.get_xpath_filter('option_to_label'))
            if not label_elements:
                continue
            # collapse the runs of whitespace of the label before decoding it
            raw_label = label_elements[0].text_content()
            label = clean_string(re.sub('[\n\t ]+', ' ', raw_label))
            price_element = row_element.xpath(self.get_xpath_filter('option_to_price'))[0]
            # the commas are removed from the price text before its conversion
            price = float(price_element.text_content().replace(',', ''))
            print(label, price)
            cpu = Cpu(parse_cpu_label(label))
            processor_module.add_option(Option(cpu, price / num_cpus))
        return processor_module

    def parse_proc_options(self, hpe_configurator_html_file_path: Path):
        """Return the same options as parse_proc_change_options, in a module renamed 'processor'."""
        processor_module = self.parse_proc_change_options(hpe_configurator_html_file_path)
        processor_module.name = 'processor'
        return processor_module