# This code is part of KQCircuits
# Copyright (C) 2022 IQM Finland Oy
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not, see
# https://www.gnu.org/licenses/gpl-3.0.html.
#
# The software distribution should follow IQM trademark policy for open-source software
# (meetiqm.com/iqm-open-source-trademark-policy). IQM welcomes contributions to the code.
# Please see our contribution agreements for individuals (meetiqm.com/iqm-individual-contributor-license-agreement)
# and organizations (meetiqm.com/iqm-organization-contributor-license-agreement).
import os
import stat
import logging
import json
import argparse
import platform
import copy
from dataclasses import replace
from pathlib import Path
from typing import Sequence, Union, Tuple, Dict, Optional
from kqcircuits.simulations.export.simulation_export import (
copy_content_into_directory,
get_post_process_command_lines,
get_combined_parameters,
export_simulation_json,
)
from kqcircuits.simulations.export.simulation_validate import validate_simulation
from kqcircuits.simulations.export.util import export_layers
from kqcircuits.util.export_helper import write_commit_reference_file
from kqcircuits.defaults import ELMER_SCRIPT_PATHS, KQC_REMOTE_ACCOUNT, SIM_SCRIPT_PATH
from kqcircuits.simulations.simulation import Simulation
from kqcircuits.simulations.cross_section_simulation import CrossSectionSimulation
from kqcircuits.simulations.export.elmer.elmer_solution import ElmerEPR3DSolution, ElmerSolution, get_elmer_solution
from kqcircuits.simulations.post_process import PostProcess
[docs]
def export_elmer_json(
simulation: Union[Simulation, CrossSectionSimulation], solution: ElmerSolution, path: Path, workflow: dict
):
"""
Export Elmer simulation into json and gds files.
Args:
simulation: The simulation to be exported.
solution: The solution to be exported.
path: Location where to write json and gds files.
workflow: Parameters for simulation workflow
Returns:
Path to exported json file.
"""
is_cross_section = isinstance(simulation, CrossSectionSimulation)
if simulation is None or not isinstance(simulation, (Simulation, CrossSectionSimulation)):
raise ValueError("Cannot export without simulation")
# write .gds file
gds_file = simulation.name + ".gds"
gds_file_path = str(path.joinpath(gds_file))
if not Path(gds_file_path).exists():
export_layers(
gds_file_path,
simulation.layout,
[simulation.cell],
output_format="GDS2",
layers=simulation.get_layers(),
)
sim_data = simulation.get_simulation_data()
sol_data = solution.get_solution_data()
full_name = simulation.name + solution.name
if is_cross_section:
sif_names = [f"{full_name}_C"]
if sol_data["run_inductance_sim"]:
if any((london > 0 for london in sim_data["london_penetration_depth"].values())):
sif_names += [f"{full_name}_L"]
else:
sif_names += [f"{full_name}_C0"]
elif solution.tool == "wave_equation":
if solution.sweep_type == "interpolating":
sif_names = []
else:
sif_names = [full_name + "_f" + str(f).replace(".", "_") for f in sol_data["frequency"]]
else:
sif_names = [full_name]
json_data = {
"name": full_name,
"workflow": workflow,
**sim_data,
**sol_data,
"sif_names": sif_names,
"gds_file": gds_file,
"parameters": get_combined_parameters(simulation, solution),
}
# write .json file
json_file_path = str(path.joinpath(full_name + ".json"))
export_simulation_json(json_data, json_file_path)
return json_file_path
[docs]
def export_elmer_script(
json_filenames,
path: Path,
workflow=None,
script_folder: str = "scripts",
file_prefix: str = "simulation",
script_file: str = "run.py",
post_process=None,
compile_elmer_modules=False,
):
"""
Create script files for running one or more simulations.
Create also a main script to launch all the simulations at once.
Args:
json_filenames: List of paths to json files to be included into the script.
path: Location where to write the script file.
workflow: Parameters for simulation workflow
script_folder: Path to the Elmer-scripts folder.
file_prefix: File prefix of the script file to be created.
script_file: Name of the script file to run.
post_process: List of PostProcess objects, a single PostProcess object, or None to be executed after simulations
compile_elmer_modules: Compile custom Elmer energy integration module at runtime. Not supported on Windows.
Returns:
Path of exported main script file
"""
if workflow is None:
workflow = {}
sbatch = "sbatch_parameters" in workflow
python_executable = workflow.get("python_executable", "python")
main_script_filename = str(path.joinpath(file_prefix + ".sh"))
execution_script = Path(script_folder).joinpath(script_file)
n_jsons = len(json_filenames)
elmer_compile_str = (
'echo "Compiling Elmer modules"\n'
f"elmerf90 -fcheck=all {script_folder}/SaveBoundaryEnergy.F90 -o SaveBoundaryEnergy > /dev/null\n"
)
path.joinpath("log_files").mkdir(parents=True, exist_ok=True)
def _write_script(filename, lines, interp_line="#!/bin/bash\n"):
"""Writes a script and makes it executable.
`lines` is a string or list of strings that will be written into `filename`"""
with open(filename, "w", encoding="utf-8") as file:
if isinstance(lines, list):
lines = "".join(lines)
file.write(interp_line + lines)
# change permissions
os.chmod(filename, os.stat(filename).st_mode | stat.S_IEXEC)
def _get_from_json(json_filename, keys):
with open(json_filename, encoding="utf-8") as f:
json_data = json.load(f)
# return as a tuple
return (*[json_data[k] for k in keys],)
def _sim_part_echo(i, part):
return f'echo "Simulation {i + 1}/{n_jsons} {part}"\n'
if sbatch:
def _multiply_time(time_str, multiplier):
"""
Helper function to multiply a time of format "HH:MM:SS" with a constant. In
this case, we multiply timeout per simulation by the number of simulations
Args:
time_str (str): Time in format "HH:MM:SS"
multiplier (float): multiplier
Returns:
New time string in format "HH:MM:SS"
"""
time_str = time_str.strip()
if len(time_str) != 8:
raise ValueError('Invalid sbatch/slurm time formatting! Format has to be "HH:MM:SS"')
hours = int(int(time_str[0:2]) * multiplier)
minutes = int(int(time_str[3:5]) * multiplier)
seconds = int(int(time_str[6:8]) * multiplier)
if seconds > 60:
minutes = minutes + seconds // 60
seconds = seconds % 60
if minutes > 60:
hours = hours + minutes // 60
minutes = minutes % 60
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _multiply_mem(mem_str, multiplier):
"""
Helper function to multiply a memory specification with a constant. If multiplier < 1,
will convert to using smaller units if possible
Args:
mem_str (str): Amount of memory in format "1234X" where "X" specifies the unit
and anything before is a number specifying the amount
multiplier (float): multiplier
Returns:
New memory string
"""
original_memstr = mem_str.strip()
unit = original_memstr[-1]
if unit in ("K", "M", "G", "T"):
original_mem_int = int(original_memstr.partition(unit)[0])
else:
original_mem_int = int(original_memstr)
unit = "M"
downconversion = {"M": "K", "G": "M", "T": "G"}
if multiplier < 1.0 and unit != "K":
elmer_mem_per_f = int(original_mem_int * 1024 * multiplier)
elmer_mem_per_f = str(elmer_mem_per_f) + downconversion[unit]
else:
elmer_mem_per_f = int(original_mem_int * multiplier)
elmer_mem_per_f = str(elmer_mem_per_f) + unit
return elmer_mem_per_f
def _divup(a, b):
return -(a // -b)
def _get_srun_command(nodes, tasks, cores, memory):
return f"srun -N {nodes} -n {tasks} -c {cores} --cpu-bind none --exact --mem={memory}"
def _get_sbatch_lines(sbatch_settings):
lines = []
for s_key, s_value in sbatch_settings.items():
lines.append(f"#SBATCH {s_key}={s_value}\n")
lines += [
"\n# set the number of threads based on --cpus-per-task\n",
"export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK\n\n",
]
return lines
sbatch_parameters = workflow["sbatch_parameters"]
parallelization_level = workflow["_parallelization_level"]
n_simulations = workflow["_n_simulations"]
_n_workers = int(sbatch_parameters.pop("n_workers", 1))
if parallelization_level == "full_simulation":
n_workers_full = _n_workers
n_workers_elmer_only = 1
n_simulations_gmsh = n_simulations
elif parallelization_level == "elmer":
n_workers_full = 1
n_workers_elmer_only = _n_workers
n_simulations_gmsh = 1
elif parallelization_level == "none":
n_workers_full = 1
n_workers_elmer_only = 1
n_simulations_gmsh = 1
else:
logging.warning(f"Unknown parallelization level {parallelization_level}")
if sbatch_parameters.get("--account", "project_0") == "project_0":
sbatch_parameters["--account"] = KQC_REMOTE_ACCOUNT
common_keys = [k for k in sbatch_parameters.keys() if k.startswith("--")]
sbatch_settings_elmer = {k: sbatch_parameters.pop(k) for k in common_keys}
sbatch_settings_meshes = sbatch_settings_elmer.copy()
max_cpus_per_node = int(sbatch_parameters.pop("max_threads_per_node", 40))
elmer_tasks_per_worker = int(sbatch_parameters.pop("elmer_n_processes", 10))
elmer_cpus_per_task = int(sbatch_parameters.pop("elmer_n_threads", 1))
if elmer_cpus_per_task > 1 and elmer_tasks_per_worker > 1:
logging.warning(
"Using both process and thread level parallelization with Elmer might result in poor performance"
)
elmer_cpus_per_worker = elmer_tasks_per_worker * elmer_cpus_per_task
elmer_mem_per_worker = sbatch_parameters.pop("elmer_mem", "64G")
if elmer_cpus_per_worker > max_cpus_per_node:
elmer_nodes_per_worker = _divup(elmer_cpus_per_worker, max_cpus_per_node)
elmer_total_nodes = elmer_nodes_per_worker * _n_workers
elmer_tasks_per_node = min(elmer_tasks_per_worker, max_cpus_per_node)
elmer_mem_per_node = _multiply_mem(elmer_mem_per_worker, _n_workers / elmer_total_nodes)
else:
elmer_nodes_per_worker = 1
elmer_max_workers_per_node = max_cpus_per_node // elmer_cpus_per_worker
elmer_total_nodes = _divup(_n_workers, elmer_max_workers_per_node)
elmer_workers_per_node = min(_n_workers, elmer_max_workers_per_node)
elmer_tasks_per_node = elmer_workers_per_node * elmer_tasks_per_worker
elmer_mem_per_node = _multiply_mem(elmer_mem_per_worker, elmer_workers_per_node)
gmsh_cpus_per_worker = int(sbatch_parameters.pop("gmsh_n_threads", 10))
if gmsh_cpus_per_worker > max_cpus_per_node:
raise RuntimeError(
f"Requested more gmsh threads per worker {gmsh_cpus_per_worker}"
f" than the limit per node {max_cpus_per_node}"
)
gmsh_max_workers_per_node = max_cpus_per_node // gmsh_cpus_per_worker
gmsh_n_nodes = _divup(n_workers_full, gmsh_max_workers_per_node)
gmsh_mem_per_worker = sbatch_parameters.pop("gmsh_mem", "64G")
gmsh_workers_per_node = min(n_workers_full, gmsh_max_workers_per_node)
gmsh_mem_per_node = _multiply_mem(gmsh_mem_per_worker, gmsh_workers_per_node)
sbatch_settings_elmer["--time"] = _multiply_time(
sbatch_parameters.pop("elmer_time", "00:10:00"), _divup(n_simulations, _n_workers)
)
sbatch_settings_elmer["--nodes"] = elmer_total_nodes
sbatch_settings_elmer["--ntasks-per-node"] = elmer_tasks_per_node
sbatch_settings_elmer["--cpus-per-task"] = elmer_cpus_per_task
sbatch_settings_elmer["--mem"] = elmer_mem_per_node
sbatch_settings_meshes["--time"] = _multiply_time(
sbatch_parameters.pop("gmsh_time", "00:10:00"), _divup(n_simulations_gmsh, n_workers_full)
)
sbatch_settings_meshes["--nodes"] = gmsh_n_nodes
sbatch_settings_meshes["--ntasks-per-node"] = gmsh_workers_per_node
sbatch_settings_meshes["--cpus-per-task"] = gmsh_cpus_per_worker
sbatch_settings_meshes["--mem"] = gmsh_mem_per_node
srun_cmd_gmsh = _get_srun_command(1, 1, gmsh_cpus_per_worker, gmsh_mem_per_worker)
srun_cmd_script = _get_srun_command(1, 1, 1, elmer_mem_per_worker)
srun_cmd_elmer = _get_srun_command(
elmer_nodes_per_worker, elmer_tasks_per_worker, elmer_cpus_per_task, elmer_mem_per_worker
)
if len(sbatch_parameters) > 0:
logging.warning("Unused sbatch parameters: ")
for k, v in sbatch_parameters.items():
logging.warning(f"{k} : {v}")
meshes_script_lines = _get_sbatch_lines(sbatch_settings_meshes)
if compile_elmer_modules:
meshes_script_lines.append(elmer_compile_str)
for i, json_filename in enumerate(json_filenames):
(simulation_name,) = _get_from_json(json_filename, ["name"])
python_run_cmd = f'{python_executable} -u "{execution_script}" "{Path(json_filename).relative_to(path)}"'
def get_log_cmd(logfile_suffix, filename=simulation_name):
return f'2>&1 >> "log_files/{filename}.{logfile_suffix}.log"\n'
script_lines = [
"set -e\n",
_sim_part_echo(i, "Gmsh"),
f'{srun_cmd_gmsh} {python_run_cmd} --only-gmsh -q {get_log_cmd("Gmsh")}',
_sim_part_echo(i, "ElmerGrid"),
f'{srun_cmd_gmsh} ElmerGrid 14 2 "{simulation_name}.msh" {get_log_cmd("ElmerGrid")}',
]
if int(elmer_tasks_per_worker) > 1:
script_lines.append(
f'{srun_cmd_gmsh} ElmerGrid 2 2 "{simulation_name}" -metis {elmer_tasks_per_worker}'
f' 4 --partdual --removeunused {get_log_cmd("ElmerGrid")}'
)
script_lines += [
_sim_part_echo(i, "Write Elmer sif files"),
f'{srun_cmd_gmsh} {python_run_cmd} --only-elmer-sifs {get_log_cmd("Elmer_sifs")}',
]
script_filename_meshes = str(path.joinpath(simulation_name + "_meshes.sh"))
_write_script(script_filename_meshes, script_lines)
meshes_script_lines += [
f'echo "Submitting gmsh and ElmerGrid part {i + 1}/{n_jsons}"\n',
'echo "--------------------------------------------"\n',
f'source "{Path(script_filename_meshes).relative_to(path)}" &\n',
]
if (i + 1) % n_workers_full == 0 or (i + 1) == n_jsons:
meshes_script_lines.append("wait\n")
_write_script(str(path.joinpath(file_prefix + "_meshes.sh")), meshes_script_lines)
main_script_lines = _get_sbatch_lines(sbatch_settings_elmer)
for i, json_filename in enumerate(json_filenames):
simulation_name, sif_names = _get_from_json(json_filename, ["name", "sif_names"])
sifs_split = [
sif_names[i : min(i + n_workers_elmer_only, len(sif_names))]
for i in range(0, len(sif_names), n_workers_elmer_only)
]
python_run_cmd = f'{python_executable} -u "{execution_script}" "{Path(json_filename).relative_to(path)}"'
def get_log_cmd(logfile_suffix, filename=simulation_name): # pylint: disable=function-redefined
return f'2>&1 >> "log_files/{filename}.{logfile_suffix}.log"\n'
script_lines = ["set -e\n", _sim_part_echo(i, "Elmer")]
for sif_list in sifs_split:
for sif in sif_list:
sif_path = f"{simulation_name}/{sif}.sif"
script_lines.append(
f'{srun_cmd_elmer} ElmerSolver_mpi "{sif_path}" 2>&1 >> "log_files/{sif}.Elmer.log" & \n'
)
script_lines.append("wait\n")
script_lines += [
_sim_part_echo(i, "write results json"),
f'{srun_cmd_script} {python_run_cmd} --write-project-results {get_log_cmd("write_project_results")}',
]
script_filename = str(path.joinpath(simulation_name + ".sh"))
_write_script(script_filename, script_lines)
main_script_lines += [
f'echo "Submitting ElmerSolver part {i + 1}/{n_jsons}"\n',
'echo "--------------------------------------------"\n',
f'source "{Path(script_filename).relative_to(path)}" &\n',
]
if (i + 1) % n_workers_full == 0 or (i + 1) == n_jsons:
main_script_lines.append("wait\n")
main_script_lines += [
'echo "--------------------------------------------"\n',
'echo "Write versions file"\n',
f"{srun_cmd_script} {python_run_cmd} --write-versions-file\n",
]
else: # local workflow
n_workers = workflow.get("n_workers", 1)
parallelization_level = workflow["_parallelization_level"]
parallelize_workload = parallelization_level == "full_simulation" and n_workers > 1
main_script_lines = []
if compile_elmer_modules:
main_script_lines.append(elmer_compile_str)
if parallelize_workload:
main_script_lines.append(f"export OMP_NUM_THREADS={workflow['elmer_n_threads']}\n")
main_script_lines.append(f"{python_executable} {script_folder}/simple_workload_manager.py {n_workers}")
for i, json_filename in enumerate(json_filenames):
(simulation_name,) = _get_from_json(json_filename, ["name"])
python_run_cmd = f'{python_executable} "{execution_script}" "{Path(json_filename).relative_to(path)}"'
def get_log_cmd(logfile_suffix, filename=simulation_name):
return f'2>&1 >> "log_files/{filename}.{logfile_suffix}.log"\n'
script_filename = str(path.joinpath(simulation_name + ".sh"))
script_lines = [
"set -e\n",
_sim_part_echo(i, "Gmsh"),
f'{python_run_cmd} --only-gmsh {get_log_cmd("Gmsh")}',
_sim_part_echo(i, "ElmerGrid"),
f'{python_run_cmd} --only-elmergrid {get_log_cmd("ElmerGrid")}',
_sim_part_echo(i, "Write Elmer sif files"),
f'{python_run_cmd} --only-elmer-sifs {get_log_cmd("Elmer_sifs")}',
_sim_part_echo(i, "Elmer"),
f"{python_run_cmd} --only-elmer\n",
_sim_part_echo(i, "Paraview"),
f"{python_run_cmd} --only-paraview\n",
_sim_part_echo(i, "Write results json"),
f'{python_run_cmd} --write-project-results {get_log_cmd("write_project_results")}',
]
_write_script(script_filename, script_lines)
if parallelize_workload:
main_script_lines.append(f' "./{Path(script_filename).relative_to(path)}"')
else:
main_script_lines += [
f'echo "Submitting the main script of simulation {i + 1}/{n_jsons}"\n',
'echo "--------------------------------------------"\n',
f'"./{Path(script_filename).relative_to(path)}"\n',
]
main_script_lines += [
'echo "--------------------------------------------"\n',
'echo "Write versions file"\n',
f"{python_run_cmd} --write-versions-file\n",
]
main_script_lines.append("\n" + get_post_process_command_lines(post_process, path, json_filenames))
_write_script(main_script_filename, main_script_lines)
return main_script_filename
[docs]
def export_elmer(
simulations: Sequence[
Union[
Simulation,
Tuple[Simulation, ElmerSolution],
CrossSectionSimulation,
Tuple[CrossSectionSimulation, ElmerSolution],
]
],
path: Path,
script_folder: str = "scripts",
file_prefix: str = "simulation",
script_file: str = "run.py",
workflow: Optional[Dict] = None,
skip_errors: bool = False,
post_process: Optional[Union[PostProcess, Sequence[PostProcess]]] = None,
**solution_params,
) -> Path:
"""
Exports an elmer simulation model to the simulation path.
Args:
simulations: List of Simulation objects or tuples containing Simulation and Solution objects.
path: Location where to output the simulation model
script_folder: Path to the Elmer-scripts folder.
file_prefix: File prefix of the script file to be created.
script_file: Name of the script file to run.
workflow: Parameters for simulation workflow
skip_errors: Skip simulations that cause errors. (Default: False)
.. warning::
**Use this carefully**, some of your simulations might not make sense physically and
you might end up wasting time on bad simulations.
post_process: List of PostProcess objects, a single PostProcess object, or None to be executed after simulations
solution_params: ElmerSolution parameters if simulations is a list of Simulation objects.
Returns:
Path to exported script file.
"""
common_sol = None if all(isinstance(s, Sequence) for s in simulations) else get_elmer_solution(**solution_params)
workflow = _update_elmer_workflow(simulations, common_sol, workflow)
# If doing 3D epr simulations the custom Elmer energy integration module is compiled at runtime
epr_sim = _is_epr_sim(simulations, common_sol)
script_paths = ELMER_SCRIPT_PATHS + [SIM_SCRIPT_PATH / "elmer_modules"] if epr_sim else ELMER_SCRIPT_PATHS
write_commit_reference_file(path)
copy_content_into_directory(script_paths, path, script_folder)
def make_names_elmer_compatible(sim, sol):
"""Replace dots with dashes and make lowercase"""
sim.name = sim.name.replace(".", "-").lower()
sol_name = sol.name.replace(".", "-").lower()
if 2 * (len(sim.name) + len(sol_name)) + len("_20np01_t0001.vtu") >= 128:
logging.warning(
"Simulation and solution names might be too long for Elmer."
"Try to shorten them or create the sweep manually"
)
return (sim, replace(sol, name=sol_name))
json_filenames = []
for sim_sol in simulations:
simulation, solution = sim_sol if isinstance(sim_sol, Sequence) else (sim_sol, common_sol)
simulation, solution = make_names_elmer_compatible(simulation, solution)
validate_simulation(simulation, solution)
try:
json_filenames.append(export_elmer_json(simulation, solution, path, workflow))
except (IndexError, ValueError, Exception) as e: # pylint: disable=broad-except
if skip_errors:
logging.warning(
f"Simulation {simulation.name} skipped due to {e.args}. "
"Some of your other simulations might not make sense geometrically. "
"Disable `skip_errors` to see the full traceback."
)
else:
raise UserWarning(
"Generating simulation failed. You can discard the errors using `skip_errors` in `export_elmer`. "
"Moreover, `skip_errors` enables visual inspection of failed and successful simulation "
"geometry files."
) from e
return export_elmer_script(
json_filenames,
path,
workflow,
script_folder=script_folder,
file_prefix=file_prefix,
script_file=script_file,
post_process=post_process,
compile_elmer_modules=epr_sim,
)
def _is_epr_sim(simulations, common_sol):
"""Helper to check if doing 3D epr simulation"""
epr_sim = False
if common_sol is None:
if any(isinstance(simsol[1], ElmerEPR3DSolution) for simsol in simulations):
epr_sim = True
elif isinstance(common_sol, ElmerEPR3DSolution):
epr_sim = True
if epr_sim and platform.system() == "Windows":
logging.warning("Elmer 3D EPR Simulations are not supported on Windows")
return epr_sim
def _update_elmer_workflow(simulations, common_solution, workflow):
"""
Modify workflow based on number of simulations and available computing resources
Args:
simulations: List of Simulation objects or tuples containing Simulation and Solution objects.
common_solution: Solution object if not contained in `simulations`
workflow: workflow to be updated
Returns:
Updated workflow
"""
if workflow is None:
workflow = {}
workflow = copy.deepcopy(workflow)
parallelization_level = "none"
n_worker_lim = 1
num_sims = len(simulations)
if num_sims == 1:
sol_obj = simulations[0][1] if common_solution is None else common_solution
if sol_obj.tool == "wave_equation" and len(sol_obj.frequency) > 1:
parallelization_level = "elmer"
n_worker_lim = len(sol_obj.frequency)
elif num_sims > 1:
# TODO enable Elmer level parallelism with solution sweep
n_worker_lim = num_sims
parallelization_level = "full_simulation"
workflow["_parallelization_level"] = parallelization_level
workflow["_n_simulations"] = n_worker_lim
if "sbatch_parameters" in workflow:
n_workers = workflow["sbatch_parameters"].get("n_workers", 1.0)
workflow["sbatch_parameters"]["n_workers"] = min(int(n_workers), n_worker_lim)
workflow.pop("elmer_n_processes", "")
workflow.pop("elmer_n_threads", "")
workflow.pop("n_workers", "")
workflow.pop("gmsh_n_threads", "")
else:
n_workers = workflow.get("n_workers", 1)
n_processes = workflow.get("elmer_n_processes", 1)
n_threads = workflow.get("elmer_n_threads", 1)
if n_processes > 1 and n_threads > 1:
logging.warning(
"Using both process and thread level parallelization with Elmer might result in poor performance"
)
# for the moment avoid psutil.cpu_count(logical=False)
max_cpus = int(os.cpu_count() / 2 + 0.5)
workflow["local_machine_cpu_count"] = max_cpus
if n_workers == -1:
n_processes = 1 if n_processes == -1 else n_processes
n_threads = 1 if n_threads == -1 else n_threads
n_workers = max(max_cpus // (n_threads * n_processes), 1)
n_workers = min(n_workers, n_worker_lim)
elif n_processes == -1:
n_workers = min(n_workers, n_worker_lim)
n_threads = 1 if n_threads == -1 else n_threads
n_processes = max(max_cpus // (n_threads * n_workers), 1)
elif n_threads == -1:
n_workers = min(n_workers, n_worker_lim)
n_threads = max(max_cpus // (n_processes * n_workers), 1)
requested_cpus = n_workers * n_processes * n_threads
if requested_cpus > max_cpus:
logging.warning(f"Requested more CPUs ({requested_cpus}) than available ({max_cpus})")
gmsh_n_threads = workflow.get("gmsh_n_threads", 1)
if gmsh_n_threads == -1:
if parallelization_level == "full_simulation":
gmsh_n_threads = max(max_cpus // n_workers, 1)
else:
gmsh_n_threads = max_cpus
workflow["n_workers"] = n_workers
workflow["elmer_n_processes"] = n_processes
workflow["elmer_n_threads"] = n_threads
workflow["gmsh_n_threads"] = gmsh_n_threads
parser = argparse.ArgumentParser()
parser.add_argument("-q", "--quiet", action="store_true")
args, _ = parser.parse_known_args()
if args.quiet:
workflow.update(
{
"run_gmsh_gui": False, # For GMSH: if true, the mesh is shown after it is done
# (for large meshes this can take a long time)
"run_paraview": False, # this is visual view of the results
}
)
return workflow