276 lines
11 KiB
Python
276 lines
11 KiB
Python
from typing import List, Dict, Union, Any, Optional
|
|
from enum import Enum
|
|
import abc
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from .util import Singleton
|
|
import json
|
|
import re
|
|
|
|
# a version string, such as 4.9.3
PackageVersion = str

# a generic identifier of a package (eg libopenblas-pthread)
PackageId = str

# uniquely identifies an ITargetHost instance (eg fr.univ-rennes.ipr.cluster-node)
HostTypeId = str
|
|
|
|
|
|
class ITargetHost(abc.ABC):
    """the host that runs the benchmark"""

    @abc.abstractmethod
    def get_host_type_id(self) -> HostTypeId:
        """returns the unique identifier of this host type"""

    @abc.abstractmethod
    def get_package_default_version(self, package_id: PackageId) -> PackageVersion:
        """returns the latest installed version of the given package (eg '2021.1.2' for 'ifort')"""

    @abc.abstractmethod
    def get_package_activation_command(self, package_id: PackageId, package_version: PackageVersion) -> str:
        """returns the bash command to activate the given package

        eg for package_id=='ifort' and package_version=='2021.1.2' return 'module load compilers/ifort/2021.1.2'
        """
        raise NotImplementedError()

    def get_default_alternative(self, package_type: str) -> PackageId:
        """returns the id of the package that this host uses by default for the given package type

        this is what Package calls to resolve package ids of the form '<default-TYPE>'
        (eg package_type=='libblas' for '<default-libblas>').

        this method is deliberately not abstract, so that host classes written before it was
        declared here can still be instantiated; hosts that want to support '<default-TYPE>'
        package ids have to override it.

        raises NotImplementedError if the host doesn't override it
        """
        raise NotImplementedError(f'{type(self).__name__} does not define a default alternative for package type {package_type}')
|
|
|
|
|
|
class Package():
    """a software component required by a benchmark (eg ifort 2019.1.2 as fortran_compiler)

    the package id and the package version passed to the constructor may contain keywords,
    which are resolved at construction time using the target host:

    - a package id of the form '<default-TYPE>' (eg '<default-libblas>') is replaced with
      target_host.get_default_alternative('TYPE')
    - the package version '<default>' is replaced with
      target_host.get_package_default_version(resolved_package_id)
    """
    package_id: str  # the resolved package id, eg 'ifort'
    package_version: str  # the resolved package version, eg '2021.1.2'

    def __init__(self, package_id: str, package_version: str, target_host: ITargetHost):
        """
        package_id: a plain package id (eg 'ifort') or a keyword expression (eg '<default-libblas>')
        package_version: a plain version (eg '2021.1.2') or '<default>'
        target_host: the host used to resolve the keywords

        raises ValueError if package_id uses an unknown keyword, contains stray '<' or '>'
        characters, or resolves to an empty string
        """
        self.target_host = target_host

        # resolve the package id, in case it contains keywords
        match = re.match(r'^<(?P<keyword>[a-z_]+)-(?P<arg1>[^>]+)>$', package_id)
        if match:
            keyword = match['keyword']
            if keyword != 'default':
                raise ValueError(f'unknown keyword {keyword}')
            package_type = match['arg1']  # eg 'libblas'
            resolved_package_id = target_host.get_default_alternative(package_type)
        elif '<' in package_id or '>' in package_id:
            # angle brackets are reserved for keyword expressions
            raise ValueError(f'unexpected syntax for package id {package_id}')
        else:
            resolved_package_id = package_id

        # an explicit exception rather than an assert, so that the check survives python -O
        if resolved_package_id == '':
            raise ValueError(f'the package id {package_id!r} resolves to an empty package id')

        if package_version == '<default>':
            resolved_package_version = target_host.get_package_default_version(resolved_package_id)
        else:
            resolved_package_version = package_version

        self.package_id = resolved_package_id
        self.package_version = resolved_package_version

    def __repr__(self) -> str:
        """returns the package as '<package_id>:<package_version>' (eg 'ifort:2021.1.2')"""
        return f'{self.package_id}:{self.package_version}'
|
|
|
|
|
|
# a unique name for a benchmark, eg 'matmul1'
BenchmarkId = str

# a unique name for a results database, eg 'tsv-files'
ResultsDbId = str

# the name of a benchmark parameter
BenchParamId = str

# the types that the value of a benchmark parameter can take
BenchParamType = Union[int, str, float, datetime, Package]

# the values of the input parameters, eg { 'compiler_id': 'gfortran', 'matrix_size': 1024 }
BenchmarkConfig = Dict[BenchParamId, BenchParamType]

# the values of the output parameters, eg { 'matrix_multiplication_avg_duration': 3.14 }
BenchmarkMeasurements = Dict[BenchParamId, BenchParamType]

# parameter values indexed by parameter name
BenchmarkParamValues = Dict[BenchParamId, BenchParamType]
|
|
|
|
|
|
class BenchParam():
    '''the description of one parameter of a benchmark

    a parameter can be, for instance, the id of the compiler, the cpu id or the size of the matrix.
    '''

    class Type(Enum):
        '''the kind of value that a parameter holds'''
        PARAM_TYPE_STRING = 0
        PARAM_TYPE_INT = 1
        PARAM_TYPE_FLOAT = 2
        PARAM_TYPE_TIME = 3
        PARAM_TYPE_PACKAGE = 4

    # the name of the parameter, eg 'matrix_size'
    name: BenchParamId
    # the type of the parameter, eg 'PARAM_TYPE_INT'
    param_type: Type
    # the description of the parameter, eg 'the size n of the n*n matrix '
    description: str

    def __init__(self, name: str, param_type: Type, description: str):
        '''stores the name, the type and the description of the parameter'''
        self.description = description
        self.param_type = param_type
        self.name = name
|
|
|
|
|
|
# lists of parameter descriptions
BenchmarkAutoParams = List[BenchParam]
BenchmarkInputParams = List[BenchParam]  # used for the input parameters of a benchmark
BenchmarkOutputParams = List[BenchParam]  # used for the output parameters of a benchmark
|
|
|
|
|
|
class IBenchmark(abc.ABC):
    """a benchmark, described by its input parameters (bench_params and common_params) and its output parameters (out_params)"""

    bench_id: BenchmarkId  # a unique name for this benchmark, eg 'matmul1'
    common_params: List[BenchParam]  # input parameters that are validated and resolved along with bench_params
    bench_params: BenchmarkInputParams  # the input parameters of this benchmark
    out_params: BenchmarkOutputParams  # the output parameters of this benchmark

    def __init__(self, bench_id: str, bench_params: BenchmarkInputParams, out_params: BenchmarkOutputParams, common_params: List[BenchParam]):
        self.bench_id = bench_id
        self.common_params = common_params
        self.bench_params = bench_params
        self.out_params = out_params

    @abc.abstractmethod
    def get_ram_requirements(self, config: BenchmarkConfig) -> int:
        """returns the ram requirements for this benchmark, in bytes
        """

    @abc.abstractmethod
    def execute(self, config: BenchmarkConfig, benchmark_output_dir: Path, target_host: ITargetHost) -> BenchmarkMeasurements:
        """execute the benchmark for the given config
        """

    def load_config(self, config_as_json: str, target_host: ITargetHost) -> BenchmarkConfig:
        """parses, validates and resolves the given json encoded benchmark config

        config_as_json: a json object that maps parameter names to values
        target_host: the host used to resolve the package parameters

        raises AssertionError if the config doesn't set exactly the input parameters of this benchmark
        """
        benchmark_config = json.loads(config_as_json)
        IBenchmark.validate_config(benchmark_config, self.bench_params + self.common_params, self.bench_id)
        return self.resolve_config(benchmark_config, target_host)

    def get_bench_param(self, param_name: str) -> Optional[BenchParam]:
        """returns the input parameter (from bench_params, then common_params) that has the given name, or None if there is none"""
        input_params = self.bench_params + self.common_params
        return next((bench_param for bench_param in input_params if bench_param.name == param_name), None)

    def resolve_config(self, config: BenchmarkConfig, target_host: ITargetHost) -> BenchmarkConfig:
        """returns a copy of the given config in which the package parameters are turned into Package instances

        the value of a parameter of type PARAM_TYPE_PACKAGE is expected to be a string of the form
        '<package_id>:<package_version>'; the values of the other parameters are copied unchanged.

        raises ValueError if the config contains a parameter that is not an input parameter of this benchmark
        """
        resolved_benchmark_config = {}
        for param_name, param_value in config.items():
            bench_param = self.get_bench_param(param_name)
            if bench_param is None:
                # without this check, an unknown parameter would end up as an obscure AttributeError on None
                raise ValueError(f'parameter {param_name} doesn\'t exist for benchmark {self.bench_id}')
            if bench_param.param_type == BenchParam.Type.PARAM_TYPE_PACKAGE:
                package_id, package_version = param_value.split(':')
                resolved_value = Package(package_id, package_version, target_host)
            else:
                resolved_value = param_value
            resolved_benchmark_config[param_name] = resolved_value
        return resolved_benchmark_config

    @staticmethod
    def validate_config(config: BenchmarkConfig, params: List[BenchParam], benchid: BenchmarkId):
        """checks that the given config sets all the given parameters, and nothing else

        config: the benchmark config to check
        params: the parameters that the config is expected to set
        benchid: the id of the benchmark (only used in the error message)

        raises AssertionError if a parameter is missing from the config, or if the config contains an unknown parameter
        """
        # AssertionError is raised explicitly (instead of using the assert statement) so that
        # callers keep getting the same exception type, while the checks are no longer removed
        # when python runs with -O
        for bench_param in params:
            if bench_param.name not in config:
                raise AssertionError(f'failed to find the benchmark parameter {bench_param.name} in the benchmark config')

        # check that all parameters in benchmark config exist as parameters for this benchmark
        known_param_names = {bench_param.name for bench_param in params}
        for param_name in config:
            if param_name not in known_param_names:
                raise AssertionError(f'parameter {param_name} doesn\'t exist for benchmark {benchid}')
|
|
|
|
|
|
class IResultsTable(abc.ABC):
    """a table of a results database, that records the results of one benchmark"""

    # the results database this table is attached to
    results_db: 'IResultsDb'
    # the benchmark recorded by this table
    benchmark: IBenchmark

    def __init__(self, results_db: 'IResultsDb', benchmark: IBenchmark):
        self.benchmark = benchmark
        self.results_db = results_db

    @abc.abstractmethod
    def add_benchmark(self, benchmark_record: BenchmarkParamValues):
        """adds a benchmark record to this table

        a benchmark record is one row of values of a benchmark results table: it holds the
        benchmark's results, together with the configuration parameters and the
        BenchmarkAutoParams. For example { 'measurement_time': datetime(2024, 10, 24, 16, 34, 41), 'cpu': 'intel_xeon_6348r', 'matrix_size': 1024, 'duration': 0.522}
        """

    def add_results(self, benchmark_config: BenchmarkConfig, benchmark_measurements: BenchmarkMeasurements):
        """builds a benchmark record from the given config and measurements, and adds it to this table

        the record starts with the automatic values provided by the results database; when the same
        name appears more than once, the config overrides the automatic values and the measurements
        override both.
        """
        benchmark_record = dict(self.results_db.get_auto_param_values())
        benchmark_record.update(benchmark_config)
        benchmark_record.update(benchmark_measurements)
        self.add_benchmark(benchmark_record)

    def get_params(self) -> List[BenchParam]:
        """returns the ordered list of all columns in this table (a column is described by a parameter)

        the order is: the automatic parameters and the common parameters of the results database,
        then the input parameters and the output parameters of the benchmark
        """
        database = self.results_db
        auto_columns = [auto_param.bench_param for auto_param in database.auto_params]
        return [
            *auto_columns,
            *database.common_params,
            *self.benchmark.bench_params,
            *self.benchmark.out_params,
        ]
|
|
|
|
|
|
class IAutoParam(abc.ABC):
    """a parameter whose value is provided by get_value()"""

    # the description of this parameter
    bench_param: BenchParam

    def __init__(self, bench_param: BenchParam):
        self.bench_param = bench_param

    @abc.abstractmethod
    def get_value(self) -> BenchParamType:
        """returns the value of this parameter"""
|
|
|
|
|
|
class IResultsDb(abc.ABC):
    """the results database (contains IResultsTable instances)"""

    # parameters that are common to all benchmarks and that are filled automatically
    auto_params: List[IAutoParam]
    # parameters that are common to all benchmarks but are not filled automatically (they have to be filled by the user)
    common_params: List[BenchParam]

    def __init__(self):
        self.auto_params, self.common_params = [], []

    def add_auto_param(self, auto_param: IAutoParam):
        """appends the given automatic parameter to auto_params"""
        self.auto_params.append(auto_param)

    def add_common_param(self, param: BenchParam):
        """appends the given parameter to common_params"""
        self.common_params.append(param)

    def get_auto_param_values(self) -> BenchmarkParamValues:
        """returns the current value of each automatic parameter, indexed by parameter name"""
        return {auto_param.bench_param.name: auto_param.get_value() for auto_param in self.auto_params}

    @abc.abstractmethod
    def get_table(self, benchmark: IBenchmark) -> IResultsTable:
        """returns the results table of the given benchmark"""
|
|
|
|
|
|
# the configuration passed to IResultsDbCreator.create_resultsdb
ResultsDbParams = Dict[str, Any]
|
|
|
|
|
|
class IResultsDbCreator(abc.ABC):
    """creates IResultsDb instances; each creator is identified by its resultsdb_id"""

    # the identifier of this creator
    resultsdb_id: ResultsDbId

    def __init__(self, resultsdb_id: ResultsDbId):
        self.resultsdb_id = resultsdb_id

    def get_resultsdb_id(self) -> ResultsDbId:
        """returns the identifier given at construction"""
        return self.resultsdb_id

    @abc.abstractmethod
    def create_resultsdb(self, resultsdb_config: ResultsDbParams) -> IResultsDb:
        """creates a results database from the given configuration"""
|
|
|
|
|
|
class ResultsDbFactory(metaclass=Singleton):
    """a registry of IResultsDbCreator instances, used to create results databases from their id"""

    # the registered creators, indexed by their resultsdb id
    resultsdb_creators: Dict[ResultsDbId, IResultsDbCreator]

    def __init__(self):
        self.resultsdb_creators = {}

    def register_resultsdb_creator(self, resultsdb_creator: IResultsDbCreator):
        """registers the given creator under its resultsdb id (replaces the creator previously registered under this id, if any)"""
        creator_id = resultsdb_creator.get_resultsdb_id()
        self.resultsdb_creators[creator_id] = resultsdb_creator

    def create_resultsdb(self, resultsdb_id: ResultsDbId, params: ResultsDbParams) -> IResultsDb:
        """creates a results database using the creator registered under the given id

        raises KeyError if no creator is registered under resultsdb_id
        """
        creator = self.resultsdb_creators[resultsdb_id]
        return creator.create_resultsdb(params)
|