"""Paralleltools.py
Module to data-parallelize simulations and write pbs and slurm submission scripts.
See the manual for examples of parallel usage.
last revision 070313-MLW
"""
import subprocess
#import os.path
import os
from random import randint
from shutil import copyfile
import numpy as np
#import tools - python3 conversion: never referenced
import itertools
from copy import deepcopy
#from .mpo import MPO
# imports required by the functions below
from .tools import WriteFiles, runMPS
from .tools import MPSFortLibError
from .dynamics import CheckOrDefault
from sys import version_info
if(version_info[0] < 3):
from itertools import izip as myzip
else:
myzip = zip
def Writepbsscript(fileStub, nodes, ExecutableName='Execute_MPSParallelMain',
InputFileNames=[], MyPyFile=None, time='12:00:00',
computer='mio', myusername=None):
"""
Write a pbs script for the given parallel job. Returns name of the pbs
script written.
**Arguments**
fileStub : str
the job id to identify the simulation
    nodes : int or str
        the number of nodes requested on the cluster (converted with
        ``str()``, so an integer or a string works).
    ExecutableName : str
        name of the OpenMPS parallel executable.
        default to `Execute_MPSParallelMain`
    InputFileNames : list
        list of input files to execute.
        default to empty list
MyPyFile : str
corresponding python script copied to keep
track of the input for the job.
default to None
time : str
walltime for the simulation.
default to `12:00:00`
    computer : str
        Choosing between the HPC clusters `mio` and `Ra`.
        default to `mio`
    myusername : str
        used to email information about the job status to
        your @mines.edu email address.
default to None
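
    **Example**

    A minimal sketch of writing a pbs script for one pool of parallel jobs.
    The job name, node count, walltime, and the `files` list (assumed to be
    returned by :py:func:`tools.WriteFiles`) are placeholders::

        stub = WriteParallelfiles(files, 'myjob')
        pbsname = Writepbsscript('myjob', 2, InputFileNames=[stub],
                                 time='01:00:00', computer='mio')
        # submit by hand, e.g. with `qsub myjob.pbs`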
"""
if not isinstance(InputFileNames,list):
InputFileNames=[InputFileNames]
outputdir='$PBS_JOBID'
pbsName=fileStub+'.pbs'
pbsFile=open(pbsName,'w')
pbsFile.write('#!/bin/csh\n')
if computer=='mio':
pbsFile.write('#PBS -l nodes='+str(nodes)+':ppn=8:lcarr'+'\n')
elif computer=='Ra':
pbsFile.write('#PBS -l nodes='+str(nodes)+':ppn=8'+'\n')
else:
raise Exception("Computer "+computer+" not recognized in Writepbsscript!")
pbsFile.write('#PBS -l walltime='+time+'\n')
pbsFile.write('#PBS -N '+fileStub+'\n')
pbsFile.write('#PBS -e '+outputdir+'/'+fileStub+'err'+'\n')
pbsFile.write('#PBS -V'+'\n')
if myusername is not None:
pbsFile.write('#PBS -m abe'+'\n')
pbsFile.write('#PBS -M '+str(myusername)+'@mines.edu'+'\n')
pbsFile.write('#------------------------'+'\n')
pbsFile.write('cd $PBS_O_WORKDIR'+'\n')
pbsFile.write('mkdir '+outputdir+'\n')
pbsFile.write('sort -u $PBS_NODEFILE >'+outputdir+'/mynodes'+'\n')
if MyPyFile is not None:
#generate a random string, and copy our python file to there
# this ensures that this file exists at runtime even if we have changed its parameters in between
#once the parallel job starts, this random string is overwritten by the PBS_JOBID
random_string = str(randint(0, 10000))
if not os.path.isfile(MyPyFile):
raise Exception("The python file "+str(MyPyFile)+" to be copied in pbs script does not exist!")
copiedfile=str(MyPyFile)[0:-3] #remove .py
copiedfile=copiedfile+random_string+'.py'
copyfile(str(MyPyFile),copiedfile)
pbsFile.write('cp '+str(copiedfile)+' '+outputdir+'/'+str(fileStub)+'.py\n')
pbsFile.write('rm '+str(copiedfile)+'\n')
for file in InputFileNames:
pbsFile.write('mpiexec '+str(ExecutableName)+' '+str(file)+' > '+outputdir+'/'+fileStub+'out\n')
pbsFile.close()
return pbsName
def WriteMPSParallelFiles(Parameters, Operators, HamiltonianMPO,
comp_info, PostProcess=False):
"""
    Write the MPI input files for the fortran executable. Returns either the
    name of the pbs/slurm script for ``PostProcess=False`` or the list of
    simulation dictionaries (consistent with
    :py:func:`Paralleltools.WriteMPSParallelTemplate`).
**Arguments**
    Parameters : list of dictionaries
        contains the settings for each simulation as a dictionary.
Operators : dictionary
containing all operators necessary for the simulation.
HamiltonianMPO : instance of :py:class:`MPO.MPO`
defines the Hamiltonian of the system.
    comp_info : dictionary
        information for setting up a simulation on the cluster. The key
        ``queueing`` decides between the pbs (`pbs`) and slurm/sbatch
        (`slurm`) formats; if the key is not given, the default `pbs` is set
        automatically. The keys for the pbs script are ``computer`` (default
        `mio`), ``myusername`` (`None`, used for email), ``time``
        (`12:00:00`), ``nodes`` (obligatory), and ``ThisFileName`` (the
        calling python script, copied to keep track of the input; default
        `None`). For slurm/sbatch please see the description of the
        dictionary `cinfo` in :py:func:`Paralleltools.write_sbatch_script`.
PostProcess : Bool, optional
flag if simulation should be run (``False``) or if only data is
analyzed (``True``).
default to False
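
    **Example**

    A minimal sketch, assuming `Parameters` is a list of parameter
    dictionaries and `Operators` and `H` were built as for a serial run;
    the script name `myscript.py` and the node count are placeholders::

        comp_info = {'queueing' : 'pbs', 'computer' : 'mio', 'nodes' : 2,
                     'time' : '12:00:00', 'myusername' : None,
                     'ThisFileName' : 'myscript.py'}
        scriptname = WriteMPSParallelFiles(Parameters, Operators, H,
                                           comp_info, PostProcess=False)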
"""
# Disable HDF5 for MPI simulations
for param in Parameters:
param['hdf5'] = False
files = WriteFiles(Parameters, Operators, HamiltonianMPO,
PostProcess=PostProcess)
if(not PostProcess):
stub = WriteParallelfiles(files, Parameters[0]['job_ID'],
Parameters[0]['Write_Directory'],
Parameters[0]['Output_Directory'])
CheckOrDefault(comp_info, 'queueing', 'pbs')
if(comp_info['queueing'] == 'pbs'):
CheckOrDefault(comp_info, 'computer', 'mio')
sn = Writepbsscript(Parameters[0]['job_ID'], comp_info['nodes'],
ExecutableName='Execute_MPSParallelMain',
InputFileNames=[stub],
MyPyFile=comp_info['ThisFileName'],
time=comp_info['time'],
computer=comp_info['computer'],
myusername=comp_info['myusername'])
elif(comp_info['queueing'] == 'slurm'):
sn = write_sbatch_script(Parameters[0]['job_ID'], comp_info,
InputFileNames=[stub])
else:
raise ValueError('Unknown queueing system. Neither "pbs" ' +
'nor "slurm".')
return sn
else:
return Parameters
def WriteMPSParallelTemplate(ptemplate, Operators, H, comp_info,
staticParamsforIteration=[], staticIterators=[],
itermethod='CartesianProduct', PostProcess=False):
"""
    Write parallel files that the master can send to worker units. Returns
    the list of simulation dictionaries for ``PostProcess=True``, and
    otherwise the name of the pbs/slurm script.
**Arguments**
ptemplate : dictionary
dictionary containing the simulation settings
Operators : dictionary
containing all operators necessary for the simulation.
H : instance of :py:class:`MPO.MPO`
defines the Hamiltonian of the system.
    comp_info : dictionary
        information for setting up a simulation on the cluster. The key
        ``queueing`` decides between the pbs (`pbs`) and slurm/sbatch
        (`slurm`) formats; if the key is not given, the default `pbs` is set
        automatically. The keys for the pbs script are ``computer`` (default
        `mio`), ``myusername`` (`None`, used for email), ``time``
        (`12:00:00`), ``nodes`` (obligatory), and ``ThisFileName`` (the
        calling python script, copied to keep track of the input; default
        `None`). For slurm/sbatch please see the description of the
        dictionary `cinfo` in :py:func:`Paralleltools.write_sbatch_script`.
staticParamsforIteration : list, optional
parameters to be iterated over given through their identifier string.
default to empty list
staticIterators : list, optional
corresponding values for the parameters to be iterated over. Order
should correspond to the order in `staticParamsforIteration`.
default to empty list
itermethod : string
Defining how to derive the simulation parameters from the list of
        iterators. Options are 1) `CartesianProduct`, e.g. building from
        the parameter values [a1, a2] and [b1, b2] the four simulations
(a1, b1), (a1, b2), (a2, b1) and (a2, b2). 2) `Linear`, e.g.
building from the parameter values [a1, a2] and [b1, b2] the two
simulations (a1, b1) and (a2, b2).
default to `CartesianProduct`
PostProcess : Bool, optional
flag if simulation should be run (``False``) or if only data is
analyzed (``True``).
default to False
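
    **Example**

    A minimal sketch iterating a template over two Hamiltonian parameters.
    The template dictionary `p`, the parameter names 'U' and 'mu', and their
    values are placeholders for illustration::

        comp_info = {'queueing' : 'slurm', 'nodes' : 1}
        scriptname = WriteMPSParallelTemplate(
            p, Operators, H, comp_info,
            staticParamsforIteration=['U', 'mu'],
            staticIterators=[[1.0, 2.0], [0.0, 0.5]],
            itermethod='CartesianProduct')
        # `CartesianProduct` yields 2 x 2 = 4 simulations here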
"""
CheckOrDefault(comp_info, 'queueing', 'pbs')
#at the end, get a long list of [H] and [p]s, and then pass that guy into writefiles
bigH=[]
bigsp=[]
# Form the actual list to be iterated over
if staticIterators==[]:
longiterator = None
elif(itermethod == "CartesianProduct"):
longiterator = itertools.product(*staticIterators)
elif(itermethod == "Linear"):
        # pair the iterator values element-wise, e.g. (a1, b1), (a2, b2)
        longiterator = myzip(*staticIterators)
else:
raise ValueError("Unknown `itermethod`.")
if not isinstance(H,list):
H=[H]
ptemplate=[ptemplate]
if not isinstance(ptemplate,list):
        raise Exception("If H is a list in WriteMPSParallelTemplate then ptemplate must be, too...")
if len(ptemplate)!=len(H):
        raise Exception("Lengths of ptemplate and H must agree in WriteMPSParallelTemplate")
if 'Write_Directory' in ptemplate[0]:
writedir=ptemplate[0]['Write_Directory']
else:
writedir=''
if 'Output_Directory' in ptemplate[0]:
outdir=ptemplate[0]['Output_Directory']
else:
outdir=''
stub=writedir+ptemplate[0]['job_ID']
jobPoolfileName=stub+'jobPool.dat'
if not PostProcess:
try:
#restart simulation
jobPool=open(jobPoolfileName,"r")
jobPool.close()
files=['']
stub=WriteParallelfiles(files,ptemplate[0]['job_ID'],writedir,outdir)
if(comp_info['queueing'] == 'pbs'):
CheckOrDefault(comp_info, 'computer', 'mio')
sn = Writepbsscript(ptemplate[0]['job_ID'], comp_info['nodes'],
ExecutableName='Execute_MPSParallelMain',
InputFileNames=[stub],
time=comp_info['time'],
computer=comp_info['computer'],
myusername=comp_info['myusername'])
elif(comp_info['queueing'] == 'slurm'):
sn = write_sbatch_script(ptemplate[0]['job_ID'], comp_info,
InputFileNames=[stub])
else:
raise ValueError('Unknown queueing system. Neither "pbs" ' +
'nor "slurm".')
print('restarting from old values!')
return sn
        except IOError:
            # no previous job pool file exists: this is a fresh start
            pass
if longiterator is None:
bigsp=ptemplate
bigH=H
else:
for i in longiterator:
for k in range(len(H)):
littleH=H[k]
p=ptemplate[k]
if 'unique_ID' in p:
baseUnique=p['unique_ID']
else:
baseUnique=''
myunique=deepcopy(baseUnique)
copytmp=deepcopy(p)
for j in range(len(staticParamsforIteration)):
copytmp[staticParamsforIteration[j]]=i[j]
myunique=staticParamsforIteration[j]+str(i[j])+myunique
copytmp['unique_ID']=myunique
bigsp.append(copytmp)
bigH.append(littleH)
files=WriteFiles(bigsp,Operators,bigH,PostProcess=PostProcess)
if not PostProcess:
stub=WriteParallelfiles(files,ptemplate[0]['job_ID'],writedir,outdir)
        if(comp_info['queueing'] == 'pbs'):
            CheckOrDefault(comp_info, 'computer', 'mio')
sn = Writepbsscript(ptemplate[0]['job_ID'], comp_info['nodes'],
ExecutableName='Execute_MPSParallelMain',
InputFileNames=[stub],
MyPyFile=comp_info['ThisFileName'],
time=comp_info['time'],
computer=comp_info['computer'],
myusername=comp_info['myusername'])
elif(comp_info['queueing'] == 'slurm'):
sn = write_sbatch_script(ptemplate[0]['job_ID'], comp_info,
InputFileNames=[stub])
else:
raise ValueError('Unknown queueing system. Neither "pbs" ' +
'nor "slurm".')
return sn
else:
return bigsp
def WriteParallelfiles(commands,job_ID,writedir='',outdir=''):
"""
    Write the job pool and progress files for a set of parallel simulations.
**Arguments**
commands : list
containing the main setting files passed to fortran for each
simulation, e.g. as returned by :py:func:`tools.WriteFiles`.
job_ID : str
job identifier of the parameter dictionary.
    writedir : str, optional
        could be the temporary directory of the MPS simulation specified in
        the parameter dictionary.
        default to empty string
outdir : str, optional
could be the output directory specified in the parameter dictionary.
default to empty string
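
    **Example**

    A minimal sketch, assuming `files` was returned by
    :py:func:`tools.WriteFiles`; the job name and directory are placeholders::

        stub = WriteParallelfiles(files, 'myjob', writedir='TMP/')
        # writes TMP/myjobjobPool.dat and TMP/myjobprogress.dat,
        # returns 'TMP/myjob'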
"""
#empty return
if commands is None:
return None
stub=writedir+job_ID
jobPoolfileName=stub+'jobPool.dat'
progressfileName=stub+'progress.dat'
try:
jobPool=open(jobPoolfileName,"r")
progress=open(progressfileName,"r")
nfiles=int(jobPool.readline().replace('\n',''))
inwritedir=jobPool.readline().replace('\n','')
#if both exist, take jobs from the completed file and remove them from the jobPool
completed_ids=[]
for line in progress:
completed_ids.append(int(line.replace('\n','')))
jobs=[]
fresh_id=0
ids=[]
for line in jobPool:
vec=line.replace('\n','').split(' ')
vec=[elem for elem in vec if elem!='']
if int(vec[0]) not in completed_ids:
ids.append(fresh_id)
jobs.append(vec[1:])
fresh_id+=1
else:
#remove id from progress
completed_ids=[thing for thing in completed_ids if thing!=int(vec[0])]
#remove job's files?
#Delete files and re-write
jobPool.close()
progress.close()
os.remove(jobPoolfileName)
os.remove(progressfileName)
progress=open(progressfileName,"w")
progress.close()
jobPool=open(jobPoolfileName,"w")
jobPool.write('%16i'%(nfiles)+'\n')
jobPool.write('%132s'%(inwritedir)+'\n')
i=0
for job in jobs:
rest=job[0:]
jobPool.write('%16i'%(ids[i])+''.join(['%132s'%(thing) for thing in rest])+'\n')
i+=1
jobPool.close()
    except:
        # no valid job pool/progress files exist yet: write them from scratch
jobPool=open(jobPoolfileName,"w")
progress=open(progressfileName,"w")
try:
jobPool.write('%16i'%(1)+'\n')
except:
jobPool.write('%16i'%(0)+'\n')
jobPool.write('%132s'%(writedir)+'\n')
fresh_id=0
for command in commands:
vec='%132s%16i'%(command[0], int(command[1]))
jobPool.write('%16i'%(fresh_id)+vec+'\n')
fresh_id+=1
jobPool.close()
return stub
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# FUNCTIONS TO WRITE SLURM/SBATCH SCRIPTS
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
def write_sbatch_script(fileStub, cinfo,
ExecutableName="Execute_MPSParallelMain",
InputFileNames=[]):
"""
Write a slurm/sbatch script for submitting a job on a cluster. Returns
name of slurm script written.
**Arguments**
fileStub : str
        the job id to identify the simulation. If you run multiple
        simulations, this id must already be unique at this stage.
cinfo : dictionary
        containing all the settings for the cluster. The following keys can
        be defined:
* ``myusername`` : used to email information about the job status to
          your @mines.edu email address. Default to `None`.
        * ``computer`` : used to deduce the number of cores. More about the
          number of tasks specified can be found in
:py:func:`Paralleltools.get_ntasks`. Default to `mio`.
* ``time`` : walltime for the simulation, default to `12:00:00`.
* ``nodes`` : can be an integer with the number of nodes or a list
          with three-character strings specifying the nodes. Default to 1.
* ``ThisFileName`` : corresponding python script copied to keep
track of the input for the job. Default to `None`.
* ``partition`` : set partition flag of SBATCH, default to `None`.
* ``mem`` : set memory for job, default to `None`.
* ``exclusive`` : if `True`, nodes are blocked for any other jobs
in case any remaining cores would be available. Default to `True`.
        * ``mpi`` : choose the mpi launcher, default to `srun` (alternative
`mpiexec`)
ExecutableName : str
        name of the OpenMPS parallel executable.
        default to `Execute_MPSParallelMain`
InputFileNames : list
        list of input files to execute.
default to empty list
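
    **Example**

    A minimal sketch, assuming `stub` was returned by
    :py:func:`Paralleltools.WriteParallelfiles`; the job name and node list
    are placeholders::

        cinfo = {'nodes' : ['001', '002'], 'time' : '02:00:00'}
        slurmname = write_sbatch_script('myjob', cinfo, InputFileNames=[stub])
        # submit by hand, e.g. with `sbatch myjob.slurm`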
"""
# Take settings from cinfo or set default
# -------------------------------------------
CheckOrDefault(cinfo, 'myusername', None)
CheckOrDefault(cinfo, 'computer', 'mio')
CheckOrDefault(cinfo, 'time', '12:00:00')
CheckOrDefault(cinfo, 'nodes', 1)
CheckOrDefault(cinfo, 'ThisFileName', None)
CheckOrDefault(cinfo, 'partition', None)
CheckOrDefault(cinfo, 'mem', None)
CheckOrDefault(cinfo, 'exclusive', True)
CheckOrDefault(cinfo, 'mpi', 'srun')
outputdir = '$SLURM_JOBID'
# Prepare file and directories
sbatch_name = fileStub + '.slurm'
sbatch_file = open(sbatch_name, 'w')
# Write header of batch file
# ==========================
sbatch_file.write('#!/bin/bash -x \n')
sbatch_file.write('#SBATCH --job-name=' + fileStub + '\n')
# node setting
# ------------
if(not (cinfo['partition'] is None)):
sbatch_file.write('#SBATCH --partition=' + cinfo['partition'] + ' \n')
if(hasattr(cinfo['nodes'], '__len__')):
# Assume a list of nodes is passed
sbatch_file.write('#SBATCH --nodelist=' + get_nodelist(cinfo) + ' \n')
sbatch_file.write('#SBATCH --nodes=' + str(len(cinfo['nodes'])) + '\n')
else:
# Assume it is an integer with the number of nodes
sbatch_file.write('#SBATCH --nodes=' + str(cinfo['nodes']) + ' \n')
sbatch_file.write('#SBATCH --ntasks-per-core=1 \n')
sbatch_file.write('#SBATCH --ntasks=' + get_ntasks(cinfo) + ' \n')
if(cinfo['exclusive']): sbatch_file.write('#SBATCH --exclusive \n')
sbatch_file.write('#SBATCH --time=' + cinfo['time'] + ' \n')
# Standard out/error and mailing
sbatch_file.write('#SBATCH -o ' + fileStub + '.out \n')
sbatch_file.write('#SBATCH -e ' + fileStub + '.err \n')
if(not (cinfo['myusername'] is None)):
sbatch_file.write('#SBATCH --mail-type=ALL \n')
sbatch_file.write('#SBATCH --mail-user=' + cinfo['myusername'] +
'@mines.edu \n')
# export environment variables
sbatch_file.write('#SBATCH --export=ALL \n')
# Write executing part of batch file
# ==================================
sbatch_file.write('\n')
sbatch_file.write('\n')
sbatch_file.write('date\n')
sbatch_file.write('cd $SLURM_SUBMIT_DIR \n')
sbatch_file.write('mkdir ' + outputdir + ' \n')
    # The pbs script writes a sorted node list; this is left disabled for slurm
    #sbatch_file.write('sort -u $SBATCH_NODEFILE > ' + outputdir + '/mynodes \n')
if(cinfo['ThisFileName'] is not None):
        # Generate a random string and copy our python file there. This
        # ensures the file exists at runtime even if we have changed its
        # parameters in between. Once the parallel job starts, the copy is
        # placed into the $SLURM_JOBID output directory as <fileStub>.py.
if(not os.path.isfile(cinfo['ThisFileName'])):
            raise Exception("The python file " + str(cinfo['ThisFileName']) +
                            " to be copied in sbatch script does not exist!")
# Remove appendix `.py` and construct random file name
copiedfile = cinfo['ThisFileName'][:-3] + str(randint(0, 10000)) + '.py'
copyfile(cinfo['ThisFileName'], copiedfile)
sbatch_file.write('cp '+ str(copiedfile) + ' ' + outputdir + '/' +
str(fileStub) + '.py\n')
sbatch_file.write('rm ' + str(copiedfile) + '\n')
for file in InputFileNames:
sbatch_file.write(cinfo['mpi'] + ' ' + str(ExecutableName) + ' ' +
str(file) + ' > ' + outputdir + '/' + fileStub +
'out\n')
sbatch_file.write('date\n')
sbatch_file.write('mv ' + fileStub + '.out $SLURM_JOBID/. \n')
sbatch_file.write('mv ' + fileStub + '.err $SLURM_JOBID/. \n')
sbatch_file.close()
return sbatch_name
def get_ntasks(cinfo):
"""
    Return the number of tasks (cores) as a string, based either on a
    nodelist or on the number of nodes. If no information is available,
    eight cores per node are assumed.
**Arguments**
cinfo : dictionary
containing the setting of the job for a cluster, used
within here are the dictionary keys ``computer`` and
``nodes``
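
    **Example**

    A short sketch of the two supported forms of ``nodes`` (the node names
    below are placeholders)::

        get_ntasks({'computer' : 'mio', 'nodes' : 2})               # '16'
        get_ntasks({'computer' : 'mio', 'nodes' : ['001', '033']})  # '20'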
"""
if(cinfo['computer'] == 'mio'):
if(not hasattr(cinfo['nodes'], '__len__')):
            # Request for n nodes without naming them; be safe and assume 8 cores per node
return str(8 * cinfo['nodes'])
        # Look up the number of cores for each node in a dictionary
cores = {}
# 8-core nodes
for ii in range(32):
cores[str(ii).zfill(3)] = 8
# 12-core nodes
for ii in range(32, 102):
if((ii == 48) or (ii == 53)): continue
cores[str(ii).zfill(3)] = 12
# 16-core nodes
for ii in range(102, 126):
cores[str(ii).zfill(3)] = 16
# 20-core nodes
for ii in range(126, 132):
cores[str(ii).zfill(3)] = 20
# 24-core nodes
for ii in range(132, 175):
cores[str(ii).zfill(3)] = 24
ntasks = 0
for node in cinfo['nodes']:
try:
ntasks += cores[node]
except KeyError:
# Avoid cases where node was not found
                print('Warning: Check that key "nodes" contains 3-digit ' +
                      'strings or add the missing node to the dictionary in ' +
                      'Paralleltools.py, function get_ntasks')
ntasks += 8
return str(ntasks)
else:
print('Warning: no specific setting for your computer available. '+
'Using default setting with 8 cores per node.')
if(hasattr(cinfo['nodes'], '__len__')):
return str(8 * len(cinfo['nodes']))
else:
return str(8 * cinfo['nodes'])
def get_nodelist(cinfo):
"""
    Return the nodelist as a string, e.g. ``compute[001,002]``.
**Arguments**
cinfo : dictionary
containing the setting of the job for a cluster, used within here
is the dictionary key ``nodes`` which should be a list when calling
this function.
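
    **Example**

    A short sketch (node names are placeholders)::

        get_nodelist({'nodes' : ['001', '002']})   # 'compute[001,002]'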
"""
nodelist = "compute["
for node in cinfo['nodes']:
nodelist += node + ','
return nodelist[:-1] + ']'
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# FUNCTIONS TO RUN MPI4PY
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#
# Enable mpi4py by command line argument - too many accidental errors when
# mpi4py present, but no intention to use it
# Documentation for sphinx always in non-mpi section
import sys
has_mpi = '--mpi4py' in sys.argv
try:
if(has_mpi):
from mpi4py import MPI as mpi
import threading
except ImportError:
has_mpi = False
if(not has_mpi):
def WriteFiles_mpi(Parameters, Operators, HamiltonianMPO, PostProcess=False):
"""
        This function has the same purpose as :py:func:`tools.WriteFiles` for
        MPI runs with `mpi4py`; the argument description should be read there.
        The files are only written once and broadcast to every core.
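
        **Example**

        A minimal sketch, assuming the script is launched under MPI with the
        ``--mpi4py`` command line flag; the script name and core count are
        placeholders::

            # mpiexec -n 4 python myscript.py --mpi4py
            files = WriteFiles_mpi(Parameters, Operators, H)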
"""
print("Cannot run mpi: no module mpi4py or threading!")
return []
else:
    def WriteFiles_mpi(Parameters, Operators, HamiltonianMPO, PostProcess=False):
comm = mpi.COMM_WORLD
rank = comm.Get_rank()
if(rank == 0):
filelist = WriteFiles(Parameters, Operators, HamiltonianMPO,
PostProcess=PostProcess)
data = encode_list(filelist)
else:
data = None
data = comm.bcast(data, root=0)
return decode_list(data)
if(not has_mpi):
def encode_list(mylist):
"""
        Put a list into a dictionary where the key is the position (as
        integer) of the entry.
**Arguments**
mylist : list
is converted to the described dictionary format.
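
        **Example**

        A short sketch (the list entries are placeholders)::

            encode_list(['a.nml', 'b.nml'])   # {0 : 'a.nml', 1 : 'b.nml'}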
"""
pass
else:
    def encode_list(mylist):
dic = {}
for ii in range(len(mylist)):
dic[ii] = mylist[ii]
return dic
if(not has_mpi):
def decode_list(mydic):
"""
Decode a dictionary from :py:func:`Paralleltools.encode_list` back
into a list.
**Arguments**
mydic : dictionary
            dictionary in the corresponding form, keys are 0 to length - 1
            (as integers)
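
        **Example**

        A short sketch, inverting :py:func:`Paralleltools.encode_list`::

            decode_list({0 : 'a.nml', 1 : 'b.nml'})   # ['a.nml', 'b.nml']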
"""
pass
else:
    def decode_list(mydic):
mylist = []
for ii in range(len(mydic)):
mylist.append(mydic[ii])
return mylist
if(not has_mpi):
def runMPS_mpi(infileList, RunDir=None, customAppl=None):
"""
        This function has the same purpose as :py:func:`tools.runMPS` for MPI runs with
`mpi4py` and the arguments are restricted. You have to call the python
script with the command line argument ``--mpi4py`` to load mpi4py modules.
**Arguments**
infileList : list
            list of the main files passed to the fortran executable as
command line argument.
RunDir : str, optional
Specifies path of the fortran-executable (local or global
installation). This is available for default-simulation and
custom application. (@Developers: Debugging with valgrind always
works with a local installation.)
`None` : if executable available within local folder, use local
installation, otherwise assume global installation, default
'' (empty string) : for global installation
'./' : local installation
PATH+'/' : path to executable ending on slash `/`. Additional steps
may be necessary when using this.
customAppl : str, optional
define custom executable. Global and local installation work
as before. Custom applications cannot run with valgrind.
default to `None` (running default executable)
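
        **Example**

        A minimal sketch of a complete MPI run, assuming the calling script
        is named `myscript.py` and launched with the ``--mpi4py`` flag; the
        name and core count are placeholders::

            # mpiexec -n 4 python myscript.py --mpi4py
            files = WriteFiles_mpi(Parameters, Operators, H)
            runMPS_mpi(files)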
"""
print("Cannot run mpi: no module mpi4py or threading!")
return
else:
    def runMPS_mpi(infileList, RunDir=None, customAppl=None):
comm = mpi.COMM_WORLD
rank = comm.Get_rank()
ncores = comm.Get_size()
if(rank == 0):
master_mpi(comm, ncores, rank, infileList, RunDir, customAppl)
else:
worker_mpi(comm, rank, infileList, RunDir, customAppl)
comm.Barrier()
return
if(not has_mpi):
def master_mpi(comm, ncores, rank, infileList, RunDir, customAppl):
"""
This is the master for mpi runs taking care of the work distribution and
calculating jobs via threading.
**Arguments**
comm : instance of mpi4py.MPI.COMM_WORLD
communicator for the MPI program
ncores : int
number of processes in the MPI-run
rank : int
rank of this process (should be 0)
infileList : list
            list of the main files passed to the fortran executable as
command line argument.
RunDir : str, optional
Specifies path of the fortran-executable (local or global
installation). This is available for default-simulation and
custom application. (@Developers: Debugging with valgrind always
works with a local installation.)
`None` : if executable available within local folder, use local
installation, otherwise assume global installation, default
'' (empty string) : for global installation
'./' : local installation
PATH+'/' : path to executable ending on slash `/`. Additional steps
may be necessary when using this.
customAppl : str, optional
define custom executable. Global and local installation work
as before. Custom applications cannot run with valgrind.
default to `None` (running default executable)
"""
pass
else:
    def master_mpi(comm, ncores, rank, infileList, RunDir, customAppl):
# Open a log file
log = open('parallel.log', 'w+')
# Get number of jobs
njobs = len(infileList)
if(njobs < ncores): raise ValueError("Don't waste resources!")
# jobcounters
job_j = 0
# Send out initial jobs
# ---------------------
newlist = [infileList[job_j]]
debug_runmps = False
log.write(str(job_j) + ' send to master.\n')
t1 = threading.Thread(target=runMPS_wrapper, args=(newlist, RunDir,
debug_runmps, customAppl))
t1.start()
job_j += 1
for ii in range(1, ncores):
data = {'jj' : job_j}
req = comm.isend(data, dest=ii, tag=1000+ii)
req.wait()
log.write(str(job_j) + ' send out to core ' + str(ii) + '.\n')
job_j += 1
# Send out jobs subsequently
# --------------------------
status = mpi.Status()
for jj in range(job_j, njobs):
            if(not t1.is_alive()):
newlist = [infileList[jj]]
debug_runmps = False
log.write(str(jj) + ' send to master.\n')
t1 = threading.Thread(target=runMPS_wrapper, args=(newlist, RunDir,
debug_runmps, customAppl))
t1.start()
continue
# Receive rank of finished job
req = comm.recv(source=mpi.ANY_SOURCE, tag=mpi.ANY_TAG,
status=status)
source = status.Get_source()
tag = status.Get_tag()
# Send next job there
data = {'jj' : jj}
req = comm.isend(data, dest=source, tag=source+1000)
req.wait()
log.write(str(jj) + ' send out to core ' + str(source) + '.\n')
# Receive the last jobs
# ---------------------
for ii in range(ncores - 1):
# Receive rank of finished job
data = comm.recv(source=mpi.ANY_SOURCE, tag=mpi.ANY_TAG,
status=status)
source = status.Get_source()
tag = status.Get_tag()
# Send signal that there are no more jobs
data = {'jj' : -1}
req = comm.isend(data, dest=source, tag=source+1000)
req.wait()
# Wait for master thread to join
t1.join()
# Close log-file
log.close()
return
if(not has_mpi):
def worker_mpi(comm, rank, infileList, RunDir, customAppl):
"""
This is a worker for mpi runs taking care of calculating jobs
received from the master.
**Arguments**
comm : instance of mpi4py.MPI.COMM_WORLD
communicator for the MPI program
        rank : int
            rank of this process (greater than 0 for workers)
infileList : list
            list of the main files passed to the fortran executable as
command line argument.
RunDir : str, optional
Specifies path of the fortran-executable (local or global
installation). This is available for default-simulation and
custom application. (@Developers: Debugging with valgrind always
works with a local installation.)
`None` : if executable available within local folder, use local
installation, otherwise assume global installation, default
'' (empty string) : for global installation
'./' : local installation
PATH+'/' : path to executable ending on slash `/`. Additional steps
may be necessary when using this.
customAppl : str, optional
define custom executable. Global and local installation work
as before. Custom applications cannot run with valgrind.
default to `None` (running default executable)
"""
pass
else:
    def worker_mpi(comm, rank, infileList, RunDir, customAppl):
# Run initial job
# ---------------
data = comm.recv(source=0, tag=rank+1000)
jj = data['jj']
newlist = [infileList[jj]]
runMPS_wrapper(newlist, RunDir=RunDir, customAppl=customAppl)
# Ask for additional jobs
# -----------------------
while(True):
req = comm.isend(data, dest=0, tag=rank+2000)
req.wait()
data = comm.recv(source=0, tag=rank+1000)
jj = data['jj']
if(jj < 0):
break
else:
newlist = [infileList[jj]]
runMPS_wrapper(newlist, RunDir=RunDir, customAppl=customAppl)
return
if(not has_mpi):
def runMPS_wrapper(infileList, RunDir=None, Debug=False, customAppl=None):
"""
Pass call to :py:func:`tools.runMPS` but handle errors from fortran
library. Up to now we ignore those errors and continue with the next
job.
For arguments see in :py:func:`tools.runMPS`.
"""
pass
else:
    def runMPS_wrapper(infileList, RunDir=None, Debug=False, customAppl=None):
try:
runMPS(infileList, RunDir=RunDir, Debug=Debug,
customAppl=customAppl)
except MPSFortLibError:
print("Job " + infileList[0] + " failed.")
return