# This code is part of KQCircuits
# Copyright (C) 2022 IQM Finland Oy
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not, see
# https://www.gnu.org/licenses/gpl-3.0.html.
#
# The software distribution should follow IQM trademark policy for open-source software
# (meetiqm.com/developers/osstmpolicy). IQM welcomes contributions to the code. Please see our contribution agreements
# for individuals (meetiqm.com/developers/clas/individual) and organizations (meetiqm.com/developers/clas/organization).
import os
import stat
import logging
import json
import argparse
from pathlib import Path
from typing import Sequence
from kqcircuits.simulations.export.simulation_export import copy_content_into_directory, get_post_process_command_lines
from kqcircuits.simulations.export.util import export_layers
from kqcircuits.util.export_helper import write_commit_reference_file
from kqcircuits.defaults import ELMER_SCRIPT_PATHS, KQC_REMOTE_ACCOUNT
from kqcircuits.simulations.simulation import Simulation
from kqcircuits.simulations.cross_section_simulation import CrossSectionSimulation
from kqcircuits.util.geometry_json_encoder import GeometryJsonEncoder
import numpy as np
[docs]def export_elmer_json(
simulation,
path: Path,
tool="capacitance",
linear_system_method="bicgstab",
p_element_order=1,
frequency=5,
frequency_batch=3,
sweep_type="explicit",
max_delta_s=0.01,
mesh_size=None,
boundary_conditions=None,
workflow=None,
percent_error=0.005,
max_error_scale=2,
max_outlier_fraction=1e-3,
maximum_passes=1,
minimum_passes=1,
integrate_energies=False,
is_axisymmetric=False,
):
"""
Export Elmer simulation into json and gds files.
Args:
simulation: The simulation to be exported.
path: Location where to write json.
tool(str): Available: "capacitance", "wave_equation" and "cross-section" (Default: capacitance)
linear_system_method(str): Available: 'bicgstab', 'mg' (Default: bicgstab)
p_element_order(int): polynomial order of p-elements (Default: 1)
frequency: Units are in GHz. To set up multifrequency analysis, use list of numbers.
mesh_size(dict): Parameters to determine mesh element sizes
boundary_conditions(dict): Parameters to determine boundary conditions
workflow(dict): Parameters for simulation workflow
percent_error(float): Stopping criterion in adaptive meshing.
max_error_scale(float): Maximum element error, relative to percent_error, allowed in individual elements.
max_outlier_fraction(float): Maximum fraction of outliers from the total number of elements
maximum_passes(int): Maximum number of adaptive meshing iterations.
minimum_passes(int): Minimum number of adaptive meshing iterations.
integrate_energies: Calculate energy integrals over each object
is_axisymmetric(bool): Simulate with Axi Symmetric coordinates along :math:`y\\Big|_{x=0}` (Default: False)
Returns:
Path to exported json file.
"""
is_cross_section = isinstance(simulation, CrossSectionSimulation)
if simulation is None or not isinstance(simulation, (Simulation, CrossSectionSimulation)):
raise ValueError("Cannot export without simulation")
# collect data for .json file
if is_cross_section:
layers = simulation.layer_dict
sim_data = simulation.get_simulation_data()
simname = sim_data["parameters"]["name"]
if is_cross_section:
if sim_data.get("london_penetration_depth", 0.0) > 0:
sif_names = [f"{simname}_C", f"{simname}_L"]
else:
sif_names = [f"{simname}_C", f"{simname}_C0"]
elif tool == "wave_equation":
if sweep_type == "interpolating":
sif_names = []
else:
sif_names = [simname + "_f" + str(f).replace(".", "_") for f in frequency]
else:
sif_names = [simname]
json_data = {
"tool": tool,
**sim_data,
**({"layers": {k: (v.layer, v.datatype) for k, v in layers.items()}} if is_cross_section else {}),
"mesh_size": {} if mesh_size is None else mesh_size,
"boundary conditions": boundary_conditions,
"workflow": workflow,
"percent_error": percent_error,
"max_error_scale": max_error_scale,
"max_outlier_fraction": max_outlier_fraction,
"maximum_passes": maximum_passes,
"minimum_passes": minimum_passes,
"frequency": frequency,
"frequency_batch": frequency_batch,
"sweep_type": sweep_type,
"max_delta_s": max_delta_s,
"integrate_energies": integrate_energies,
"linear_system_method": linear_system_method,
"p_element_order": p_element_order,
"is_axisymmetric": is_axisymmetric,
"sif_names": sif_names,
}
# write .json file
json_filename = str(path.joinpath(simulation.name + ".json"))
with open(json_filename, "w") as fp:
json.dump(json_data, fp, cls=GeometryJsonEncoder, indent=4)
# write .gds file
gds_filename = str(path.joinpath(simulation.name + ".gds"))
export_layers(
gds_filename,
simulation.layout,
[simulation.cell],
output_format="GDS2",
layers=layers.values() if is_cross_section else simulation.get_layers(),
)
return json_filename
[docs]def export_elmer_script(
json_filenames,
path: Path,
workflow=None,
file_prefix="simulation",
execution_script="scripts/run.py",
post_process=None,
n_simulations=1,
):
"""
Create script files for running one or more simulations.
Create also a main script to launch all the simulations at once.
Args:
json_filenames: List of paths to json files to be included into the script.
path: Location where to write the script file.
workflow(dict): Parameters for simulation workflow
file_prefix: Name of the script file to be created.
execution_script: The script file to be executed.
n_simulations: Total number of simulations
post_process: List of PostProcess objects, a single PostProcess object, or None to be executed after simulations
Returns:
Path of exported main script file
"""
if workflow is None:
workflow = dict()
sbatch = "sbatch_parameters" in workflow
python_executable = workflow.get("python_executable", "python")
main_script_filename = str(path.joinpath(file_prefix + ".sh"))
path.joinpath("log_files").mkdir(parents=True, exist_ok=True)
if sbatch:
def _multiply_time(time_str, multiplier):
"""
Helper function to multiply a time of format "HH:MM:SS" with a constant. In
this case, we multiply timeout per simulation by the number of simulations
Args:
time_str (str): Time in format "HH:MM:SS"
multiplier (float): multiplier
Returns:
New time string in format "HH:MM:SS"
"""
time_str = time_str.strip()
if len(time_str) != 8:
raise ValueError('Invalid sbatch/slurm time formatting! Format has to be "HH:MM:SS"')
hours = int(int(time_str[0:2]) * multiplier)
minutes = int(int(time_str[3:5]) * multiplier)
seconds = int(int(time_str[6:8]) * multiplier)
if seconds > 60:
minutes = minutes + seconds // 60
seconds = seconds % 60
if minutes > 60:
hours = hours + minutes // 60
minutes = minutes % 60
return "{:02d}:{:02d}:{:02d}".format(hours, minutes, seconds)
def _multiply_mem(mem_str, multiplier):
"""
Helper function to multiply a memory specification with a constant. If multiplier < 1,
will convert to using smaller units if possible
Args:
mem_str (str): Amount of memory in format "1234X" where "X" specifies the unit
and anything before is a number specifying the amount
multiplier (float): multiplier
Returns:
New memory string
"""
original_memstr = mem_str.strip()
unit = original_memstr[-1]
if unit in ("K", "M", "G", "T"):
original_mem_int = int(original_memstr.partition(unit)[0])
else:
original_mem_int = int(original_memstr)
unit = "M"
downconversion = {"M": "K", "G": "M", "T": "G"}
if multiplier < 1.0 and unit != "K":
elmer_mem_per_f = int(original_mem_int * 1024 * multiplier)
elmer_mem_per_f = str(elmer_mem_per_f) + downconversion[unit]
else:
elmer_mem_per_f = int(original_mem_int * multiplier)
elmer_mem_per_f = str(elmer_mem_per_f) + unit
return elmer_mem_per_f
def _divup(a, b):
return -(a // -b)
sbatch_parameters = workflow["sbatch_parameters"]
parallelization_level = sbatch_parameters.pop("_parallelization_level", "none")
_n_workers = int(sbatch_parameters.pop("n_workers", 1))
if parallelization_level == "full_simulation":
n_workers_full = _n_workers
n_workers_elmer_only = 1
n_simulations_gmsh = n_simulations
elif parallelization_level == "elmer":
n_workers_full = 1
n_workers_elmer_only = _n_workers
n_simulations_gmsh = 1
elif parallelization_level == "none":
n_workers_full = 1
n_workers_elmer_only = 1
n_simulations_gmsh = 1
else:
logging.warning(f"Unknown parallelization level {parallelization_level}")
if sbatch_parameters.get("--account", "project_0") == "project_0":
sbatch_parameters["--account"] = KQC_REMOTE_ACCOUNT
common_keys = [k for k in sbatch_parameters.keys() if k.startswith("--")]
sbatch_settings_elmer = {k: sbatch_parameters.pop(k) for k in common_keys}
sbatch_settings_meshes = sbatch_settings_elmer.copy()
max_cpus_per_node = int(sbatch_parameters.pop("max_threads_per_node", 40))
elmer_tasks_per_worker = int(sbatch_parameters.pop("elmer_n_processes", 10))
elmer_cpus_per_task = int(sbatch_parameters.pop("elmer_n_threads", 1))
if elmer_cpus_per_task > 1 and elmer_tasks_per_worker > 1:
logging.warning(
"Using both process and thread level parallelization" " with Elmer might result in poor performance"
)
elmer_cpus_per_worker = elmer_tasks_per_worker * elmer_cpus_per_task
elmer_mem_per_worker = sbatch_parameters.pop("elmer_mem", "64G")
if elmer_cpus_per_worker > max_cpus_per_node:
elmer_nodes_per_worker = _divup(elmer_cpus_per_worker, max_cpus_per_node)
elmer_total_nodes = elmer_nodes_per_worker * _n_workers
elmer_tasks_per_node = min(elmer_tasks_per_worker, max_cpus_per_node)
elmer_mem_per_node = _multiply_mem(elmer_mem_per_worker, _n_workers / elmer_total_nodes)
else:
elmer_nodes_per_worker = 1
elmer_max_workers_per_node = max_cpus_per_node // elmer_cpus_per_worker
elmer_total_nodes = _divup(_n_workers, elmer_max_workers_per_node)
elmer_workers_per_node = min(_n_workers, elmer_max_workers_per_node)
elmer_tasks_per_node = elmer_workers_per_node * elmer_tasks_per_worker
elmer_mem_per_node = _multiply_mem(elmer_mem_per_worker, elmer_workers_per_node)
gmsh_cpus_per_worker = int(sbatch_parameters.pop("gmsh_n_threads", 10))
if gmsh_cpus_per_worker > max_cpus_per_node:
raise RuntimeError(
"Requested more gmsh threads per worker {gmsh_cpus_per_worker}"
" than the limit per node {max_cpus_per_node}"
)
gmsh_max_workers_per_node = max_cpus_per_node // gmsh_cpus_per_worker
gmsh_n_nodes = _divup(n_workers_full, gmsh_max_workers_per_node)
gmsh_mem_per_worker = sbatch_parameters.pop("gmsh_mem", "64G")
gmsh_workers_per_node = min(n_workers_full, gmsh_max_workers_per_node)
gmsh_cpus_per_node = gmsh_cpus_per_worker * gmsh_workers_per_node
gmsh_mem_per_node = _multiply_mem(gmsh_mem_per_worker, gmsh_workers_per_node)
sbatch_settings_elmer["--time"] = _multiply_time(
sbatch_parameters.pop("elmer_time", "00:10:00"), _divup(n_simulations, _n_workers)
)
sbatch_settings_elmer["--nodes"] = elmer_total_nodes
sbatch_settings_elmer["--ntasks-per-node"] = elmer_tasks_per_node
sbatch_settings_elmer["--cpus-per-task"] = elmer_cpus_per_task
sbatch_settings_elmer["--mem"] = elmer_mem_per_node
sbatch_settings_meshes["--time"] = _multiply_time(
sbatch_parameters.pop("gmsh_time", "00:10:00"), _divup(n_simulations_gmsh, n_workers_full)
)
sbatch_settings_meshes["--nodes"] = gmsh_n_nodes
sbatch_settings_meshes["--ntasks-per-node"] = 1
sbatch_settings_meshes["--cpus-per-task"] = gmsh_cpus_per_node
sbatch_settings_meshes["--mem"] = gmsh_mem_per_node
if len(sbatch_parameters) > 0:
logging.warning("Unused sbatch parameters: ")
for k, v in sbatch_parameters.items():
logging.warning(f"{k} : {v}")
main_script_filename_meshes = str(path.joinpath(file_prefix + "_meshes.sh"))
with open(main_script_filename_meshes, "w") as main_file:
main_file.write("#!/bin/bash\n")
for s_key, s_value in sbatch_settings_meshes.items():
main_file.write(f"#SBATCH {s_key}={s_value}\n")
main_file.write("\n")
main_file.write("# set the number of threads based on --cpus-per-task\n")
main_file.write("export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK\n")
main_file.write("\n")
for i, json_filename in enumerate(json_filenames):
with open(json_filename) as f:
json_data = json.load(f)
simulation_name = json_data["parameters"]["name"]
script_filename_meshes = str(path.joinpath(simulation_name + "_meshes.sh"))
with open(script_filename_meshes, "w") as file:
file.write('echo "Simulation {}/{} Gmsh"\n'.format(i + 1, len(json_filenames)))
file.write(
'srun -N 1 -n 1 -c {3} {2} "{0}" "{1}" --only-gmsh -q 2>&1 >> '
'"log_files/{4}.Gmsh.log"\n'.format(
execution_script,
Path(json_filename).relative_to(path),
python_executable,
str(gmsh_cpus_per_worker),
simulation_name,
)
)
file.write('echo "Simulation {}/{} ElmerGrid"\n'.format(i + 1, len(json_filenames)))
file.write(
'srun -N 1 -n 1 -c {2} ElmerGrid 14 2 "{0}.msh" 2>&1 >> '
'"log_files/{0}.ElmerGrid.log"\n'.format(
simulation_name,
Path(json_filename).relative_to(path),
str(gmsh_cpus_per_worker),
)
)
if int(elmer_tasks_per_worker) > 1:
file.write(
'srun -N 1 -n 1 -c {3} ElmerGrid 2 2 "{0}" 2>&1 -metis {1} 4'
' --partdual --removeunused >> "log_files/{0}.ElmerGrid_part.log"\n'.format(
simulation_name,
str(elmer_tasks_per_worker),
Path(json_filename).relative_to(path),
str(gmsh_cpus_per_worker),
)
)
file.write('echo "Simulation {}/{} Write Elmer sif files"\n'.format(i + 1, len(json_filenames)))
file.write(
'srun -N 1 -n 1 -c {3} {2} "{0}" "{1}" --only-elmer-sifs '
'2>&1 >> "log_files/{4}.Elmer_sifs.log"\n'.format(
execution_script,
Path(json_filename).relative_to(path),
python_executable,
str(gmsh_cpus_per_worker),
simulation_name,
)
)
os.chmod(script_filename_meshes, os.stat(script_filename_meshes).st_mode | stat.S_IEXEC)
main_file.write('echo "Submitting gmsh and ElmerGrid part {}/{}"\n'.format(i + 1, len(json_filenames)))
main_file.write('echo "--------------------------------------------"\n')
main_file.write('source "{}" &\n'.format(Path(script_filename_meshes).relative_to(path)))
if (i + 1) % n_workers_full == 0:
main_file.write("wait\n")
# change permission
os.chmod(main_script_filename_meshes, os.stat(main_script_filename_meshes).st_mode | stat.S_IEXEC)
with open(main_script_filename, "w") as main_file:
main_file.write("#!/bin/bash\n")
for s_key, s_value in sbatch_settings_elmer.items():
main_file.write(f"#SBATCH {s_key}={s_value}\n")
main_file.write("\n")
main_file.write("# set the number of threads based on --cpus-per-task\n")
main_file.write("export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK\n")
main_file.write("\n")
for i, json_filename in enumerate(json_filenames):
with open(json_filename) as f:
json_data = json.load(f)
simulation_name = json_data["parameters"]["name"]
sif_names = json_data["sif_names"]
script_filename = str(path.joinpath(simulation_name + ".sh"))
with open(script_filename, "w") as file:
file.write('echo "Simulation {}/{} Elmer"\n'.format(i + 1, len(json_filenames)))
n_sifs = len(sif_names)
sifs_split = [
sif_names[i : min(i + n_workers_elmer_only, n_sifs)]
for i in range(0, n_sifs, n_workers_elmer_only)
]
for sif_list in sifs_split:
for sif in sif_list:
file.write(
"srun --cpu-bind none --exact --mem={4} -N {5} -n {0} -c {3} "
'ElmerSolver_mpi "{1}/{2}.sif" 2>&1 >> "log_files/{2}.Elmer.log" & \n'.format(
elmer_tasks_per_worker,
simulation_name,
sif,
elmer_cpus_per_task,
elmer_mem_per_worker,
elmer_nodes_per_worker,
)
)
file.write("wait\n")
file.write('echo "Simulation {}/{} write results json"\n'.format(i + 1, len(json_filenames)))
file.write(
'srun -N 1 -n 1 -c {3} {2} "{0}" "{1}" --write-project-results 2>&1 >> '
"log_files/{4}.write_project_results.log\n".format(
execution_script,
Path(json_filename).relative_to(path),
python_executable,
1,
simulation_name,
)
)
file.write('echo "Simulation {}/{} write versions json"\n'.format(i + 1, len(json_filenames)))
file.write(
'srun -N 1 -n 1 -c {3} {2} "{0}" "{1}" --write-versions-file 2>&1 >> '
"log_files/{4}.write_versions_file.log\n".format(
execution_script,
Path(json_filename).relative_to(path),
python_executable,
1,
simulation_name,
)
)
os.chmod(script_filename, os.stat(script_filename).st_mode | stat.S_IEXEC)
main_file.write('echo "Submitting ElmerSolver part{}/{}"\n'.format(i + 1, len(json_filenames)))
main_file.write('echo "--------------------------------------------"\n')
main_file.write('source "{}" &\n'.format(Path(script_filename).relative_to(path)))
if (i + 1) % n_workers_full == 0:
main_file.write("wait\n")
main_file.write(get_post_process_command_lines(post_process, path, json_filenames))
else:
n_workers = workflow.get("n_workers", 1)
parallelization_level = workflow.get("_parallelization_level")
parallelize_workload = parallelization_level == "full_simulation" and n_workers > 1
with open(main_script_filename, "w") as main_file:
if parallelize_workload:
main_file.write("export OMP_NUM_THREADS={}\n".format(workflow["elmer_n_threads"]))
main_file.write("{} scripts/simple_workload_manager.py {}".format(python_executable, n_workers))
for i, json_filename in enumerate(json_filenames):
with open(json_filename) as f:
json_data = json.load(f)
simulation_name = json_data["parameters"]["name"]
sif_names = json_data["sif_names"]
script_filename = str(path.joinpath(simulation_name + ".sh"))
with open(script_filename, "w") as file:
file.write("#!/bin/bash\n")
file.write('echo "Simulation {}/{} Gmsh"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --only-gmsh 2>&1 >> "log_files/{3}.Gmsh.log"\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable, simulation_name
)
)
file.write('echo "Simulation {}/{} ElmerGrid"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --only-elmergrid 2>&1 >> "log_files/{3}.ElmerGrid.log"\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable, simulation_name
)
)
file.write('echo "Simulation {}/{} Write Elmer sif files"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --only-elmer-sifs 2>&1 >> "log_files/{3}.Elmer_sifs.log"\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable, simulation_name
)
)
file.write('echo "Simulation {}/{} Elmer"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --only-elmer\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable
)
)
file.write('echo "Simulation {}/{} Paraview"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --only-paraview\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable
)
)
file.write('echo "Simulation {}/{} write results json"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --write-project-results 2>&1 >> '
'"log_files/{3}_write_project_results.log"\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable, simulation_name
)
)
file.write('echo "Simulation {}/{} write versions file"\n'.format(i + 1, len(json_filenames)))
file.write(
'{2} "{0}" "{1}" --write-versions-file\n'.format(
execution_script, Path(json_filename).relative_to(path), python_executable
)
)
# change permission
os.chmod(script_filename, os.stat(script_filename).st_mode | stat.S_IEXEC)
if parallelize_workload:
main_file.write(' "./{}"'.format(Path(script_filename).relative_to(path)))
else:
main_file.write(
'echo "Submitting the main script of simulation {}/{}"\n'.format(i + 1, len(json_filenames))
)
main_file.write('echo "--------------------------------------------"\n')
main_file.write('"./{}"\n'.format(Path(script_filename).relative_to(path)))
main_file.write(get_post_process_command_lines(post_process, path, json_filenames))
# change permission
os.chmod(main_script_filename, os.stat(main_script_filename).st_mode | stat.S_IEXEC)
return main_script_filename
[docs]def export_elmer(
simulations: Sequence[Simulation],
path: Path,
tool="capacitance",
script_folder="scripts",
linear_system_method="bicgstab",
p_element_order=3,
frequency=5,
frequency_batch=3,
sweep_type="explicit",
max_delta_s=0.01,
file_prefix="simulation",
script_file="run.py",
mesh_size=None,
boundary_conditions=None,
workflow=None,
percent_error=0.005,
max_error_scale=2,
max_outlier_fraction=1e-3,
maximum_passes=1,
minimum_passes=1,
integrate_energies=False,
is_axisymmetric=False,
skip_errors=False,
post_process=None,
):
"""
Exports an elmer simulation model to the simulation path.
Args:
simulations(list(Simulation)): list of all the simulations
path(Path): Location where to output the simulation model
tool(str): Available: "capacitance", "wave_equation" and "cross-section"
script_folder: Path to the Elmer-scripts folder.
linear_system_method(str): Available: 'bicgstab', 'mg'
p_element_order(int): polynomial order of p-elements
frequency: Units are in GHz. To set up multifrequency analysis, use list of numbers.
file_prefix: File prefix of the script file to be created.
script_file: Name of the script file to run.
mesh_size(dict): Parameters to determine mesh element sizes
boundary_conditions(dict): Parameters to determine boundary conditions
workflow(dict): Parameters for simulation workflow
percent_error(float): Stopping criterion in adaptive meshing.
max_error_scale(float): Maximum element error, relative to percent_error, allowed in individual elements.
max_outlier_fraction(float): Maximum fraction of outliers from the total number of elements
maximum_passes(int): Maximum number of adaptive meshing iterations.
minimum_passes(int): Minimum number of adaptive meshing iterations.
integrate_energies: Calculate energy integrals over each object
is_axisymmetric(bool): Simulate with Axi Symmetric coordinates along :math:`y\\Big|_{x=0}` (Default: False)
skip_errors(bool): Skip simulations that cause errors. (Default: False)
.. warning::
**Use this carefully**, some of your simulations might not make sense physically and
you might end up wasting time on bad simulations.
post_process: List of PostProcess objects, a single PostProcess object, or None to be executed after simulations
Returns:
Path to exported script file.
"""
parser = argparse.ArgumentParser()
parser.add_argument("-q", "--quiet", action="store_true")
args, _ = parser.parse_known_args()
if args.quiet:
workflow.update(
{
"run_gmsh_gui": False, # For GMSH: if true, the mesh is shown after it is done
# (for large meshes this can take a long time)
"run_paraview": False, # this is visual view of the results
}
)
if isinstance(frequency, np.ndarray):
frequency = frequency.tolist()
elif not isinstance(frequency, list):
frequency = [frequency]
if workflow is not None:
num_freqs = len(frequency)
num_sims = len(simulations)
parallelization_level = "none"
n_worker_lim = 1
if num_freqs > 1:
if num_sims > 1:
raise NotImplementedError(
"Simultaneous sweep of frequency and other" "simulation parameters is not supported"
)
parallelization_level = "elmer"
n_worker_lim = num_freqs
if tool != "wave_equation":
raise NotImplementedError(
"Elmer level parallelization currently" "supported only with wave-equation tool"
)
elif num_sims > 1:
parallelization_level = "full_simulation"
n_worker_lim = num_sims
if "sbatch_parameters" in workflow:
n_workers = workflow["sbatch_parameters"].get("n_workers", 1.0)
workflow["sbatch_parameters"]["n_workers"] = min(int(n_workers), n_worker_lim)
workflow["sbatch_parameters"]["_parallelization_level"] = parallelization_level
workflow.pop("elmer_n_processes", "")
workflow.pop("elmer_n_threads", "")
workflow.pop("n_workers", "")
workflow.pop("gmsh_n_threads", "")
else:
n_workers = workflow.get("n_workers", 1)
n_processes = workflow.get("elmer_n_processes", 1)
n_threads = workflow.get("elmer_n_threads", 1)
if n_processes > 1 and n_threads > 1:
logging.warning(
"Using both process and thread level parallelization" " with Elmer might result in poor performance"
)
# for the moment avoid psutil.cpu_count(logical=False)
max_cpus = int(os.cpu_count() / 2 + 0.5)
workflow["local_machine_cpu_count"] = max_cpus
if n_workers == -1:
n_processes = 1 if n_processes == -1 else n_processes
n_threads = 1 if n_threads == -1 else n_threads
n_workers = max(max_cpus // (n_threads * n_processes), 1)
n_workers = min(n_workers, n_worker_lim)
elif n_processes == -1:
n_workers = min(n_workers, n_worker_lim)
n_threads = 1 if n_threads == -1 else n_threads
n_processes = max(max_cpus // (n_threads * n_workers), 1)
elif n_threads == -1:
n_workers = min(n_workers, n_worker_lim)
n_threads = max(max_cpus // (n_processes * n_workers), 1)
requested_cpus = n_workers * n_processes * n_threads
if requested_cpus > max_cpus:
logging.warning(f"Requested more CPUs ({requested_cpus}) than available ({max_cpus})")
workflow["n_workers"] = n_workers
workflow["elmer_n_processes"] = n_processes
workflow["elmer_n_threads"] = n_threads
workflow["_parallelization_level"] = parallelization_level
gmsh_n_threads = workflow.get("gmsh_n_threads", 1)
if gmsh_n_threads == -1:
if parallelization_level == "full_simulation":
workflow["gmsh_n_threads"] = max(max_cpus // n_workers, 1)
else:
workflow["gmsh_n_threads"] = max_cpus
else:
workflow = {}
write_commit_reference_file(path)
copy_content_into_directory(ELMER_SCRIPT_PATHS, path, script_folder)
json_filenames = []
for simulation in simulations:
try:
json_filenames.append(
export_elmer_json(
simulation=simulation,
path=path,
tool=tool,
linear_system_method=linear_system_method,
p_element_order=p_element_order,
frequency=frequency,
frequency_batch=frequency_batch,
sweep_type=sweep_type,
max_delta_s=max_delta_s,
mesh_size=mesh_size,
boundary_conditions=boundary_conditions,
workflow=workflow,
percent_error=percent_error,
max_error_scale=max_error_scale,
max_outlier_fraction=max_outlier_fraction,
maximum_passes=maximum_passes,
minimum_passes=minimum_passes,
integrate_energies=integrate_energies,
is_axisymmetric=is_axisymmetric,
)
)
except (IndexError, ValueError, Exception) as e: # pylint: disable=broad-except
if skip_errors:
logging.warning(
f"Simulation {simulation.name} skipped due to {e.args}. "
"Some of your other simulations might not make sense geometrically. "
"Disable `skip_errors` to see the full traceback."
)
else:
raise UserWarning(
"Generating simulation failed. You can discard the errors using `skip_errors` in `export_elmer`. "
"Moreover, `skip_errors` enables visual inspection of failed and successful simulation "
"geometry files."
) from e
return export_elmer_script(
json_filenames,
path,
workflow,
file_prefix=file_prefix,
execution_script=Path(script_folder).joinpath(script_file),
n_simulations=n_worker_lim,
post_process=post_process,
)