Source code for paralleltools

 Module to data-parallelize simulations and to write PBS and SLURM batch scripts.
 See the manual for examples of parallel usage.

 last revision 070313-MLW
import subprocess
#import os.path
import os
from random import randint
from shutil import copyfile
import numpy as np
#import tools - python3 conversion: never referenced
import itertools
from copy import deepcopy
#from .mpo import MPO
#from .tools import WriteHparams, WriteMainFile, WriteFiles, runMPS
#from .tools import MPSFortLibError
#from .dynamics import CheckOrDefault

from sys import version_info

# `myzip` is a lazy zip on both major Python versions: itertools.izip on
# Python 2, the builtin zip (already lazy) on Python 3.
if version_info[0] < 3:
    from itertools import izip as myzip
else:
    myzip = zip

def Writepbsscript(fileStub, nodes, ExecutableName='Execute_MPSParallelMain',
                   InputFileNames=None, MyPyFile=None, time='12:00:00',
                   computer='mio', myusername=None):
    """
    Write a pbs script for the given parallel job. Returns name of the pbs
    script written (``fileStub + '.pbs'`` in the current directory).

    **Arguments**

    fileStub : str
        the job id to identify the simulation
    nodes : int or str
        number of nodes on the cluster, inserted via ``str(nodes)`` into the
        ``#PBS -l nodes=`` line.
    ExecutableName : str
        name of the OpenMPS parallel executable
        default to `Execute_MPSParallelMain`
    InputFileNames : list or str, optional
        input files; one ``mpiexec`` line is written per entry. A single
        name is wrapped into a list. default to `None` (no ``mpiexec`` line)
    MyPyFile : str, optional
        corresponding python script copied to keep track of the input for
        the job. default to None
    time : str
        walltime for the simulation. default to `12:00:00`
    computer : str
        Choosing between HPC mio (`mio`) and ra (`Ra` or `ra`).
        default to `mio`
    myusername : str, optional
        used to email information about the job status to your email
        address. default to None

    **Raises**

    Exception
        if `computer` is not recognized or `MyPyFile` does not exist. Both
        are checked before the pbs script is created, so no partial script
        is left behind.
    """
    if InputFileNames is None:
        InputFileNames = []
    elif not isinstance(InputFileNames, list):
        InputFileNames = [InputFileNames]

    # Validate everything before touching the file system.
    if computer == 'mio':
        nodeline = '#PBS -l nodes=' + str(nodes) + ':ppn=8:lcarr\n'
    elif computer in ('Ra', 'ra'):
        nodeline = '#PBS -l nodes=' + str(nodes) + ':ppn=8\n'
    else:
        raise Exception("Computer " + str(computer)
                        + " not recognized in Writepbsscript!")
    if MyPyFile is not None and not os.path.isfile(MyPyFile):
        raise Exception("The python file " + str(MyPyFile)
                        + " to be copied in pbs script does not exist!")

    outputdir = '$PBS_JOBID'
    pbsName = fileStub + '.pbs'
    with open(pbsName, 'w') as pbsFile:
        pbsFile.write('#!/bin/csh\n')
        pbsFile.write(nodeline)
        pbsFile.write('#PBS -l walltime=' + time + '\n')
        pbsFile.write('#PBS -N ' + fileStub + '\n')
        pbsFile.write('#PBS -e ' + outputdir + '/' + fileStub + 'err\n')
        pbsFile.write('#PBS -V\n')
        if myusername is not None:
            pbsFile.write('#PBS -m abe\n')
            pbsFile.write('#PBS -M ' + str(myusername) + '\n')
        pbsFile.write('#------------------------\n')
        pbsFile.write('cd $PBS_O_WORKDIR\n')
        pbsFile.write('mkdir ' + outputdir + '\n')
        pbsFile.write('sort -u $PBS_NODEFILE >' + outputdir + '/mynodes\n')
        if MyPyFile is not None:
            # Copy the python script now under a random name. This keeps the
            # version at submission time available even if the script is
            # changed before the job starts; the job copies it into its
            # output directory as <fileStub>.py and removes the temporary copy.
            copiedfile = (os.path.splitext(str(MyPyFile))[0]
                          + str(randint(0, 10000)) + '.py')
            copyfile(str(MyPyFile), copiedfile)
            pbsFile.write('cp ' + copiedfile + ' ' + outputdir + '/'
                          + str(fileStub) + '.py\n')
            pbsFile.write('rm ' + copiedfile + '\n')
        for infile in InputFileNames:
            # NOTE(review): every line redirects with '>' into the same
            # output file, so with several inputs only the last output
            # survives -- confirm whether '>>' is intended.
            pbsFile.write('mpiexec ' + str(ExecutableName) + ' ' + str(infile)
                          + ' > ' + outputdir + '/' + fileStub + 'out\n')
    return pbsName
def WriteMPSParallelFiles(Parameters, Operators, HamiltonianMPO, comp_info,
                          PostProcess=False):
    """
    Write MPI files for fortran. Returns either name of the pbs/slurm script
    for ``PostProcess=False`` or the list of simulation dictionaries
    `Parameters` (being consistent with WriteMPSParallelTemplate).

    **Arguments**

    Parameters : list of dictionaries
        contains the settings for each simulation in dictionaries. The key
        ``hdf5`` is set to `False` in every entry (in place).
    Operators : dictionary
        containing all operators necessary for the simulation.
    HamiltonianMPO : instance of :py:class:`MPO.MPO`
        defines the Hamiltonian of the system.
    comp_info : dictionary
        information for setting up a simulation on the cluster. Key
        ``queueing`` decides between using pbs (`pbs`) or slurm/sbatch
        (`slurm`) format; default `pbs`. Keys for the pbs-script are
        ``computer`` (default `mio`), ``myusername`` (`None`, used for
        email), ``time`` (`12:00:00`), ``nodes`` (obligatory), and
        ``ThisFileName`` (`None`). Missing optional keys are filled in with
        these defaults (in place). For slurm/sbatch see the description of
        the dictionary `cinfo` in
        :py:func:`Paralleltools.write_sbatch_script`.
    PostProcess : Bool, optional
        flag if simulation should be run (``False``) or if only data is
        analyzed (``True``). default to False

    **Raises**

    ValueError
        if ``comp_info['queueing']`` is neither `pbs` nor `slurm`.
    """
    # Disable HDF5 for MPI simulations
    for param in Parameters:
        param['hdf5'] = False

    files = WriteFiles(Parameters, Operators, HamiltonianMPO,
                       PostProcess=PostProcess)
    if PostProcess:
        return Parameters

    job_ID = Parameters[0]['job_ID']
    stub = WriteParallelfiles(files, job_ID, Parameters[0]['Write_Directory'],
                              Parameters[0]['Output_Directory'])

    CheckOrDefault(comp_info, 'queueing', 'pbs')
    if comp_info['queueing'] == 'pbs':
        # Fill in the defaults documented above so missing optional keys do
        # not raise KeyError in the lookups below.
        CheckOrDefault(comp_info, 'computer', 'mio')
        CheckOrDefault(comp_info, 'time', '12:00:00')
        CheckOrDefault(comp_info, 'myusername', None)
        CheckOrDefault(comp_info, 'ThisFileName', None)
        return Writepbsscript(job_ID, comp_info['nodes'],
                              ExecutableName='Execute_MPSParallelMain',
                              InputFileNames=[stub],
                              MyPyFile=comp_info['ThisFileName'],
                              time=comp_info['time'],
                              computer=comp_info['computer'],
                              myusername=comp_info['myusername'])
    if comp_info['queueing'] == 'slurm':
        return write_sbatch_script(job_ID, comp_info, InputFileNames=[stub])
    raise ValueError('Unknown queueing system. Neither "pbs" ' + 'nor "slurm".')
def _write_template_queue_script(job_ID, comp_info, stub, copy_pyfile):
    """Write the pbs or slurm script that runs `stub`; return its file name."""
    if comp_info['queueing'] == 'pbs':
        # Fill in the documented defaults so missing optional keys do not
        # raise KeyError in the lookups below.
        CheckOrDefault(comp_info, 'computer', 'mio')
        CheckOrDefault(comp_info, 'time', '12:00:00')
        CheckOrDefault(comp_info, 'myusername', None)
        CheckOrDefault(comp_info, 'ThisFileName', None)
        MyPyFile = comp_info['ThisFileName'] if copy_pyfile else None
        return Writepbsscript(job_ID, comp_info['nodes'],
                              ExecutableName='Execute_MPSParallelMain',
                              InputFileNames=[stub], MyPyFile=MyPyFile,
                              time=comp_info['time'],
                              computer=comp_info['computer'],
                              myusername=comp_info['myusername'])
    if comp_info['queueing'] == 'slurm':
        return write_sbatch_script(job_ID, comp_info, InputFileNames=[stub])
    raise ValueError('Unknown queueing system. Neither "pbs" ' + 'nor "slurm".')


def WriteMPSParallelTemplate(ptemplate, Operators, H, comp_info,
                             staticParamsforIteration=None,
                             staticIterators=None,
                             itermethod='CartesianProduct',
                             PostProcess=False):
    """
    Write parallel files that the master can send to worker units. Returns
    the list of simulation settings for `PostProcess=True`, and otherwise the
    name of the pbs/slurm script.

    If the job pool and progress files of a previous run (``<Write_Directory>
    <job_ID>jobPool.dat`` and ``...progress.dat``) both exist and
    `PostProcess` is False, the unfinished jobs of that run are restarted
    instead of writing new simulation files.

    **Arguments**

    ptemplate : dictionary or list of dictionaries
        dictionary containing the simulation settings (a list when `H` is a
        list, one entry per Hamiltonian).
    Operators : dictionary
        containing all operators necessary for the simulation.
    H : instance of :py:class:`MPO.MPO` or list of them
        defines the Hamiltonian of the system.
    comp_info : dictionary
        information for setting up a simulation on the cluster. Key
        ``queueing`` decides between using pbs (`pbs`) or slurm/sbatch
        (`slurm`) format; default `pbs`. Keys for the pbs-script are
        ``computer`` (default `mio`), ``myusername`` (`None`, used for
        email), ``time`` (`12:00:00`), ``nodes`` (obligatory), and
        ``ThisFileName`` (`None`). Missing optional keys are filled in with
        these defaults (in place). For slurm/sbatch see the description of
        the dictionary `cinfo` in
        :py:func:`Paralleltools.write_sbatch_script`.
    staticParamsforIteration : list, optional
        parameters to be iterated over given through their identifier string.
        default to empty list
    staticIterators : list, optional
        corresponding values for the parameters to be iterated over. Order
        should correspond to the order in `staticParamsforIteration`.
        default to empty list
    itermethod : string
        Defining how to derive the simulation parameters from the list of
        iterators. Options are 1) `CartesianProduct`, e.g. building from
        parameters values [a1, a2] and [b1, b2] the four simulations
        (a1, b1), (a1, b2), (a2, b1) and (a2, b2). 2) `Linear`, e.g. building
        from the parameter values [a1, a2] and [b1, b2] the two simulations
        (a1, b1) and (a2, b2). default to `CartesianProduct`
    PostProcess : Bool, optional
        flag if simulation should be run (``False``) or if only data is
        analyzed (``True``). default to False

    **Raises**

    ValueError
        for an unknown `itermethod` or queueing system.
    Exception
        if `ptemplate` and `H` do not match in type or length.
    """
    CheckOrDefault(comp_info, 'queueing', 'pbs')
    if staticParamsforIteration is None:
        staticParamsforIteration = []

    # Form the actual list to be iterated over
    if staticIterators is None or len(staticIterators) == 0:
        longiterator = None
    elif itermethod == "CartesianProduct":
        longiterator = itertools.product(*staticIterators)
    elif itermethod == "Linear":
        # zip([a1, a2], [b1, b2]) -> (a1, b1), (a2, b2). The value lists must
        # be unpacked; zipping the outer list gives 1-tuples of whole lists.
        longiterator = zip(*staticIterators)
    else:
        raise ValueError("Unknown `itermethod`.")

    if not isinstance(H, list):
        H = [H]
        ptemplate = [ptemplate]
    if not isinstance(ptemplate, list):
        raise Exception("If H is a list in WriteMPSParallelTemplate then "
                        "ptemplate must be, too...")
    if len(ptemplate) != len(H):
        raise Exception("Lengths of ptemplate and H must agree in "
                        "WriteMPSParallelTemplate")

    job_ID = ptemplate[0]['job_ID']
    writedir = ptemplate[0].get('Write_Directory', '')
    outdir = ptemplate[0].get('Output_Directory', '')
    stub = writedir + job_ID

    if not PostProcess:
        # Restart only when both bookkeeping files of a previous run exist.
        # WriteParallelfiles reads both to drop finished jobs. An explicit
        # check replaces a bare `except: pass` that also hid errors raised
        # while writing the restart scripts.
        jobPoolfileName = stub + 'jobPool.dat'
        progressfileName = stub + 'progress.dat'
        if os.path.isfile(jobPoolfileName) and os.path.isfile(progressfileName):
            stub = WriteParallelfiles([''], job_ID, writedir, outdir)
            sn = _write_template_queue_script(job_ID, comp_info, stub,
                                              copy_pyfile=False)
            print('restarting from old values!')
            return sn

    # At the end, get a long list of [H] and [p]s for WriteFiles
    if longiterator is None:
        bigsp = ptemplate
        bigH = H
    else:
        bigsp = []
        bigH = []
        for values in longiterator:
            for p, littleH in zip(ptemplate, H):
                copytmp = deepcopy(p)
                myunique = p.get('unique_ID', '')
                for j, pname in enumerate(staticParamsforIteration):
                    copytmp[pname] = values[j]
                    # Prepended, so the last iterated parameter comes first.
                    myunique = pname + str(values[j]) + myunique
                copytmp['unique_ID'] = myunique
                bigsp.append(copytmp)
                bigH.append(littleH)

    files = WriteFiles(bigsp, Operators, bigH, PostProcess=PostProcess)
    if PostProcess:
        return bigsp

    stub = WriteParallelfiles(files, job_ID, writedir, outdir)
    return _write_template_queue_script(job_ID, comp_info, stub,
                                        copy_pyfile=True)
def _restart_job_pool(jobPoolfileName, progressfileName):
    """Drop the jobs listed in the progress file from the job pool."""
    with open(jobPoolfileName, "r") as jobPool, \
            open(progressfileName, "r") as progress:
        nfiles = int(jobPool.readline().replace('\n', ''))
        inwritedir = jobPool.readline().replace('\n', '')
        completed_ids = [int(line.replace('\n', '')) for line in progress]

        jobs = []
        for line in jobPool:
            vec = [elem for elem in line.replace('\n', '').split(' ')
                   if elem != '']
            job_id = int(vec[0])
            if job_id in completed_ids:
                # Consume the id so a repeated id in the pool is kept.
                completed_ids = [cid for cid in completed_ids if cid != job_id]
            else:
                jobs.append(vec[1:])

    # Rewrite both files: an empty progress file and a pool holding only the
    # unfinished jobs, renumbered from 0.
    os.remove(jobPoolfileName)
    os.remove(progressfileName)
    with open(progressfileName, "w"):
        pass
    with open(jobPoolfileName, "w") as jobPool:
        jobPool.write('%16i' % nfiles + '\n')
        jobPool.write('%132s' % inwritedir + '\n')
        for new_id, job in enumerate(jobs):
            # NOTE(review): all remaining columns are rewritten as %132s,
            # while a fresh pool writes the last column as %16i -- confirm
            # the fortran reader accepts both layouts.
            jobPool.write('%16i' % new_id
                          + ''.join(['%132s' % thing for thing in job]) + '\n')


def _write_fresh_job_pool(jobPoolfileName, progressfileName, commands,
                          writedir):
    """Create a job pool listing every command and an empty progress file."""
    with open(progressfileName, "w"):
        pass
    with open(jobPoolfileName, "w") as jobPool:
        jobPool.write('%16i' % 1 + '\n')
        jobPool.write('%132s' % writedir + '\n')
        for fresh_id, command in enumerate(commands):
            jobPool.write('%16i' % fresh_id
                          + '%132s%16i' % (command[0], int(command[1])) + '\n')


def WriteParallelfiles(commands, job_ID, writedir='', outdir=''):
    """
    Write the job pool (``<writedir><job_ID>jobPool.dat``) and progress file
    (``<writedir><job_ID>progress.dat``) for the parallel fortran executable.
    Returns the stub ``writedir + job_ID``, or None if `commands` is None.

    If both files already exist, the run is restarted: jobs whose id is
    listed in the progress file are removed from the pool, the remaining
    jobs are renumbered and the progress file is emptied. If either file is
    missing or cannot be parsed, a fresh pool is written from `commands`.

    **Arguments**

    commands : list
        containing the main setting files passed to fortran for each
        simulation, e.g. as returned by :py:func:`tools.WriteFiles`. Each
        entry is indexed as ``command[0]`` (file name) and ``command[1]``
        (converted with ``int``).
    job_ID : str
        job identifier of the parameter dictionary.
    writedir : str, optional
        could be the temporary directory of the MPS simulation specified in
        the parameter dictionary. default to empty string
    outdir : str, optional
        could be the output directory specified in the parameter dictionary.
        Currently unused. default to empty string
    """
    if commands is None:
        return None

    stub = writedir + job_ID
    jobPoolfileName = stub + 'jobPool.dat'
    progressfileName = stub + 'progress.dat'
    try:
        _restart_job_pool(jobPoolfileName, progressfileName)
    except Exception:
        # Missing, unreadable or corrupt bookkeeping files: start over.
        _write_fresh_job_pool(jobPoolfileName, progressfileName, commands,
                              writedir)
    return stub
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% # FUNCTIONS TO WRITE SLURM/SBATCH SCRIPTS # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
def write_sbatch_script(fileStub, cinfo, ExecutableName="Execute_MPSParallelMain",
                        InputFileNames=None):
    """
    Write a slurm/sbatch script for submitting a job on a cluster. Returns
    name of slurm script written (``fileStub + '.slurm'``).

    **Arguments**

    fileStub : str
        the job id to identify the simulation. If you run multiple simulation
        this one must already be unique at the current stage
    cinfo : dictionary
        containing all the setting for the cluster. Missing keys are filled
        in with their defaults (in place). The following keys can be defined:

        * ``myusername`` : used to email information about the job status to
          your email address. Default to `None`.
        * ``computer`` : used to deduce the number of cores. More about the
          number of tasks specified you can find in
          :py:func:`Paralleltools.get_ntasks`. Default to `mio`.
        * ``time`` : walltime for the simulation, default to `12:00:00`.
        * ``nodes`` : can be an integer with the number of nodes or a list
          with three-character string specifying the nodes. Default to 1.
        * ``ThisFileName`` : corresponding python script copied to keep track
          of the input for the job. Default to `None`.
        * ``partition`` : set partition flag of SBATCH, default to `None`.
        * ``mem`` : set memory for job (``#SBATCH --mem``), default to `None`.
        * ``exclusive`` : if `True`, nodes are blocked for any other jobs in
          case any remaining cores would be available. Default to `True`.
        * ``mpi`` : choose mpi executer, default to `srun` (alternative
          `mpiexec`)

    ExecutableName : str
        name of the OpenMPS parallel executable
        default to `Execute_MPSParallelMain`
    InputFileNames : list or str, optional
        input files; one mpi line is written per entry. A single name is
        wrapped into a list. default to `None` (no mpi line)

    **Raises**

    Exception
        if ``ThisFileName`` is given but does not exist. This is checked
        before the script is created.
    """
    # Take settings from cinfo or set default
    # -------------------------------------------
    CheckOrDefault(cinfo, 'myusername', None)
    CheckOrDefault(cinfo, 'computer', 'mio')
    CheckOrDefault(cinfo, 'time', '12:00:00')
    CheckOrDefault(cinfo, 'nodes', 1)
    CheckOrDefault(cinfo, 'ThisFileName', None)
    CheckOrDefault(cinfo, 'partition', None)
    CheckOrDefault(cinfo, 'mem', None)
    CheckOrDefault(cinfo, 'exclusive', True)
    CheckOrDefault(cinfo, 'mpi', 'srun')

    if InputFileNames is None:
        InputFileNames = []
    elif not isinstance(InputFileNames, list):
        # Iterating a bare string would emit one mpi line per character.
        InputFileNames = [InputFileNames]

    thisfile = cinfo['ThisFileName']
    if thisfile is not None and not os.path.isfile(thisfile):
        raise Exception("The python file " + str(thisfile)
                        + " to be copied in slurm script does not exist!")

    outputdir = '$SLURM_JOBID'
    sbatch_name = fileStub + '.slurm'
    with open(sbatch_name, 'w') as sbatch_file:
        # Write header of batch file
        # ==========================
        sbatch_file.write('#!/bin/bash -x \n')
        sbatch_file.write('#SBATCH --job-name=' + fileStub + '\n')

        # node setting
        # ------------
        if cinfo['partition'] is not None:
            sbatch_file.write('#SBATCH --partition=' + cinfo['partition'] + ' \n')
        if hasattr(cinfo['nodes'], '__len__'):
            # Assume a list of nodes is passed
            sbatch_file.write('#SBATCH --nodelist=' + get_nodelist(cinfo) + ' \n')
            sbatch_file.write('#SBATCH --nodes=' + str(len(cinfo['nodes'])) + '\n')
        else:
            # Assume it is an integer with the number of nodes
            sbatch_file.write('#SBATCH --nodes=' + str(cinfo['nodes']) + ' \n')
        sbatch_file.write('#SBATCH --ntasks-per-core=1 \n')
        sbatch_file.write('#SBATCH --ntasks=' + get_ntasks(cinfo) + ' \n')
        if cinfo['exclusive']:
            sbatch_file.write('#SBATCH --exclusive \n')
        if cinfo['mem'] is not None:
            # Passed verbatim to sbatch's --mem option.
            sbatch_file.write('#SBATCH --mem=' + str(cinfo['mem']) + ' \n')
        sbatch_file.write('#SBATCH --time=' + cinfo['time'] + ' \n')

        # Standard out/error and mailing
        sbatch_file.write('#SBATCH -o ' + fileStub + '.out \n')
        sbatch_file.write('#SBATCH -e ' + fileStub + '.err \n')
        if cinfo['myusername'] is not None:
            sbatch_file.write('#SBATCH --mail-type=ALL \n')
            sbatch_file.write('#SBATCH --mail-user=' + str(cinfo['myusername'])
                              + ' \n')

        # export environment variables
        sbatch_file.write('#SBATCH --export=ALL \n')

        # Write executing part of batch file
        # ==================================
        sbatch_file.write('\n')
        sbatch_file.write('\n')
        sbatch_file.write('date\n')
        sbatch_file.write('cd $SLURM_SUBMIT_DIR \n')
        sbatch_file.write('mkdir ' + outputdir + ' \n')

        if thisfile is not None:
            # Copy the python script now under a random name. This keeps the
            # version at submission time available even if the script is
            # changed before the job starts; the job copies it into its
            # output directory as <fileStub>.py and removes the temporary copy.
            copiedfile = (os.path.splitext(thisfile)[0]
                          + str(randint(0, 10000)) + '.py')
            copyfile(thisfile, copiedfile)
            sbatch_file.write('cp ' + str(copiedfile) + ' ' + outputdir + '/'
                              + str(fileStub) + '.py\n')
            sbatch_file.write('rm ' + str(copiedfile) + '\n')

        for infile in InputFileNames:
            sbatch_file.write(cinfo['mpi'] + ' ' + str(ExecutableName) + ' '
                              + str(infile) + ' > ' + outputdir + '/'
                              + fileStub + 'out\n')

        sbatch_file.write('date\n')
        sbatch_file.write('mv ' + fileStub + '.out ' + outputdir + '/. \n')
        sbatch_file.write('mv ' + fileStub + '.err ' + outputdir + '/. \n')
    return sbatch_name
def _build_mio_core_table():
    """Return a dict mapping 3-digit mio node names to their core count."""
    table = {}
    # (first node, last node + 1, cores per node)
    for start, stop, ncores in ((0, 32, 8), (32, 102, 12), (102, 126, 16),
                                (126, 132, 20), (132, 175, 24)):
        for ii in range(start, stop):
            table[str(ii).zfill(3)] = ncores
    # Nodes 048 and 053 are intentionally absent; they fall back to the
    # 8-core default (with a warning) in get_ntasks.
    del table['048']
    del table['053']
    return table


# Built once at import time instead of on every get_ntasks call.
_MIO_CORES_PER_NODE = _build_mio_core_table()


def get_ntasks(cinfo):
    """
    Return the number of tasks (cores) to request, as string, based on a
    nodelist or on the number of nodes. If no information is available,
    eight cores per node are assumed.

    **Arguments**

    cinfo : dictionary
        containing the setting of the job for a cluster, used within here are
        the dictionary keys ``computer`` and ``nodes``. ``nodes`` is either an
        integer (number of nodes) or a list of 3-digit node-name strings.
    """
    nodes = cinfo['nodes']

    if cinfo['computer'] != 'mio':
        print('Warning: no specific setting for your computer available. ' +
              'Using default setting with 8 cores per node.')
        if hasattr(nodes, '__len__'):
            return str(8 * len(nodes))
        return str(8 * nodes)

    if not hasattr(nodes, '__len__'):
        # Request for n nodes without specifying them; be safe, assume 8 cores
        return str(8 * nodes)

    ntasks = 0
    for node in nodes:
        try:
            ntasks += _MIO_CORES_PER_NODE[node]
        except KeyError:
            # Unknown node name: warn and assume the 8-core default
            print('Warning: Check that key "nodes" contains 3-digits ' +
                  'strings or add missing node to dictionary in ' +
                  'function get_ntasks')
            ntasks += 8
    return str(ntasks)
def get_nodelist(cinfo):
    """
    Return the slurm nodelist string ``compute[<n1>,<n2>,...]`` built from
    the node names in ``cinfo['nodes']``.

    **Arguments**

    cinfo : dictionary
        containing the setting of the job for a cluster; only the key
        ``nodes`` is used, which must be a list of strings here.
    """
    # Every entry gets a trailing comma; the last one is cut off below.
    entries = ''.join(node + ',' for node in cinfo['nodes'])
    return ('compute[' + entries)[:-1] + ']'
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#                          FUNCTIONS TO RUN MPI4PY
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#
# mpi4py is only enabled by the command line argument ``--mpi4py``: having
# mpi4py installed without intending to use it caused too many accidental
# errors. Documentation for sphinx always comes from the non-mpi section.
import sys

has_mpi = '--mpi4py' in sys.argv
if has_mpi:
    try:
        from mpi4py import MPI as mpi
        import threading
    except ImportError:
        # Requested but unavailable: fall back to the non-mpi definitions.
        has_mpi = False


if has_mpi:
    def WriteFiles_mpi(Parameters, Operators, HamiltonianMPO, PostProcess=False):
        """
        Write the simulation files on rank 0 only (via WriteFiles) and
        broadcast the resulting list to every process; every rank returns
        the same list.
        """
        comm = mpi.COMM_WORLD
        if comm.Get_rank() == 0:
            payload = encode_list(WriteFiles(Parameters, Operators,
                                             HamiltonianMPO,
                                             PostProcess=PostProcess))
        else:
            payload = None
        return decode_list(comm.bcast(payload, root=0))
else:
    def WriteFiles_mpi(Parameters, Operators, HamiltonianMPO, PostProcess=False):
        """
        This function has the same purpose as :py:func:`tools.WriteFiles` for
        MPI runs with `mpi4py` and the argument description should be read
        there. The files are only written once and broadcasted to every core.

        Without mpi4py (or without ``--mpi4py``) a message is printed and an
        empty list is returned.
        """
        print("Cannot run mpi: no module mpi4py or threading!")
        return []
def encode_list(mylist):
    """
    Put a list into a dictionary where the key is the position (as integer)
    of the entry. Used by :py:func:`Paralleltools.WriteFiles_mpi` before
    broadcasting the list of files.

    The function does not need mpi4py and is therefore always defined.

    **Arguments**

    mylist : list
        is converted to the described dictionary format.
    """
    return dict(enumerate(mylist))
def decode_list(mydic):
    """
    Decode a dictionary from :py:func:`Paralleltools.encode_list` back into
    a list.

    The function does not need mpi4py and is therefore always defined.

    **Arguments**

    mydic : dictionary
        dictionary in the corresponding form, keys are 0 to length - 1
        (as integer)
    """
    return [mydic[ii] for ii in range(len(mydic))]
if not has_mpi:
    def runMPS_mpi(infileList, RunDir=None, customAppl=None):
        """
        This function has the same purpose as :py:func:`tools.runMPS` for MPI
        runs with `mpi4py` and the arguments are restricted. You have to call
        the python script with the command line argument ``--mpi4py`` to load
        mpi4py modules. Without mpi4py a message is printed and nothing runs.

        Rank 0 acts as master (:py:func:`Paralleltools.master_mpi`), all other
        ranks as workers (:py:func:`Paralleltools.worker_mpi`); all ranks meet
        at a barrier before returning.

        **Arguments**

        infileList : list
            list of the main files passed to fortran executable as command
            line argument. Must contain at least as many entries as there are
            MPI processes, otherwise ValueError is raised on every rank.
        RunDir : str, optional
            Specifies path of the fortran-executable (local or global
            installation). This is available for default-simulation and custom
            application. (@Developers: Debugging with valgrind always works
            with a local installation.) `None` : if executable available
            within local folder, use local installation, otherwise assume
            global installation, default '' (empty string) : for global
            installation './' : local installation PATH+'/' : path to
            executable ending on slash `/`. Additional steps may be necessary
            when using this.
        customAppl : str, optional
            define custom executable. Global and local installation work as
            before. Custom applications cannot run with valgrind. default to
            `None` (running default executable)
        """
        print("Cannot run mpi: no module mpi4py or threading!")
        return
else:
    def runMPS_mpi(infileList, RunDir=None, customAppl=None):
        """
        Run the jobs in `infileList` distributed over all MPI processes:
        rank 0 is the master, all other ranks are workers. All ranks wait at
        a barrier before returning.

        Raises ValueError on every rank if there are fewer jobs than
        processes.
        """
        comm = mpi.COMM_WORLD
        rank = comm.Get_rank()
        ncores = comm.Get_size()
        # master_mpi rejects this case as well, but only on rank 0. The
        # workers would then block forever in their first recv, so check on
        # every rank and fail together.
        # NOTE(review): assumes infileList is identical on all ranks (as
        # produced by the broadcast in WriteFiles_mpi).
        if len(infileList) < ncores:
            raise ValueError("Don't waste resources!")
        if rank == 0:
            master_mpi(comm, ncores, rank, infileList, RunDir, customAppl)
        else:
            worker_mpi(comm, rank, infileList, RunDir, customAppl)
        comm.Barrier()
        return
if not has_mpi:
    def master_mpi(comm, ncores, rank, infileList, RunDir, customAppl):
        """
        This is the master for mpi runs taking care of the work distribution
        and calculating jobs via threading. Without mpi4py it does nothing.

        **Arguments**

        comm : instance of mpi4py.MPI.COMM_WORLD
            communicator for the MPI program
        ncores : int
            number of processes in the MPI-run
        rank : int
            rank of this process (should be 0)
        infileList : list
            list of the main files passed to fortran executable as command
            line argument.
        RunDir : str, optional
            Specifies path of the fortran-executable (local or global
            installation). This is available for default-simulation and custom
            application. (@Developers: Debugging with valgrind always works
            with a local installation.) `None` : if executable available
            within local folder, use local installation, otherwise assume
            global installation, default '' (empty string) : for global
            installation './' : local installation PATH+'/' : path to
            executable ending on slash `/`. Additional steps may be necessary
            when using this.
        customAppl : str, optional
            define custom executable. Global and local installation work as
            before. Custom applications cannot run with valgrind. default to
            `None` (running default executable)
        """
        pass
else:
    def master_mpi(comm, ncores, rank, infileList, RunDir, customAppl):
        """
        Distribute the jobs in `infileList`. Rank 0 runs jobs itself in a
        background thread and hands out job indices (``{'jj': index}``,
        tag ``1000 + rank``) to workers whenever they report back; ``-1``
        tells a worker to stop. Dispatch is logged to ``parallel.log``.

        Raises ValueError if there are fewer jobs than processes.
        """
        njobs = len(infileList)
        if njobs < ncores:
            raise ValueError("Don't waste resources!")

        debug_runmps = False
        with open('parallel.log', 'w+') as log:

            def start_master_job(jj):
                """Run job `jj` on the master in a new thread; return it."""
                log.write(str(jj) + ' send to master.\n')
                thread = threading.Thread(target=runMPS_wrapper,
                                          args=([infileList[jj]], RunDir,
                                                debug_runmps, customAppl))
                thread.start()
                return thread

            def send_job(jj, dest):
                """Send job index `jj` (-1 = stop) to worker `dest`."""
                comm.isend({'jj': jj}, dest=dest, tag=dest + 1000).wait()

            # Send out initial jobs: job 0 to the master thread, then one
            # job to each worker.
            master_thread = start_master_job(0)
            job_j = 1
            for ii in range(1, ncores):
                send_job(job_j, ii)
                log.write(str(job_j) + ' send out to core ' + str(ii) + '.\n')
                job_j += 1

            # Send out jobs subsequently: prefer an idle master thread,
            # otherwise wait for any worker to report a finished job.
            status = mpi.Status()
            for jj in range(job_j, njobs):
                if not master_thread.is_alive():
                    master_thread = start_master_job(jj)
                    continue
                comm.recv(source=mpi.ANY_SOURCE, tag=mpi.ANY_TAG, status=status)
                source = status.Get_source()
                send_job(jj, source)
                log.write(str(jj) + ' send out to core ' + str(source) + '.\n')

            # Receive the last report of every worker and tell it to stop.
            for _ in range(ncores - 1):
                comm.recv(source=mpi.ANY_SOURCE, tag=mpi.ANY_TAG, status=status)
                send_job(-1, status.Get_source())

            # Wait for master thread to join before closing the log.
            master_thread.join()
        return
if has_mpi:
    def worker_mpi(comm, rank, infileList, RunDir, customAppl):
        """
        Run the job indices received from rank 0 (``{'jj': index}``, tag
        ``1000 + rank``). After each job the message is sent back to rank 0
        (tag ``2000 + rank``) as a request for more work; a negative index
        ends the loop.
        """
        # The first job index arrives before any request is sent.
        job = comm.recv(source=0, tag=rank + 1000)
        while True:
            runMPS_wrapper([infileList[job['jj']]], RunDir=RunDir,
                           customAppl=customAppl)
            comm.isend(job, dest=0, tag=rank + 2000).wait()
            job = comm.recv(source=0, tag=rank + 1000)
            if job['jj'] < 0:
                break
        return
else:
    def worker_mpi(comm, rank, infileList, RunDir, customAppl):
        """
        This is a worker for mpi runs taking care of calculating jobs
        received from the master. Without mpi4py it does nothing.

        **Arguments**

        comm : instance of mpi4py.MPI.COMM_WORLD
            communicator for the MPI program
        rank : int
            rank of this process (non-zero; rank 0 is the master)
        infileList : list
            list of the main files passed to fortran executable as command
            line argument.
        RunDir : str, optional
            Specifies path of the fortran-executable (local or global
            installation). This is available for default-simulation and custom
            application. (@Developers: Debugging with valgrind always works
            with a local installation.) `None` : if executable available
            within local folder, use local installation, otherwise assume
            global installation, default '' (empty string) : for global
            installation './' : local installation PATH+'/' : path to
            executable ending on slash `/`. Additional steps may be necessary
            when using this.
        customAppl : str, optional
            define custom executable. Global and local installation work as
            before. Custom applications cannot run with valgrind. default to
            `None` (running default executable)
        """
        pass
if has_mpi:
    def runMPS_wrapper(infileList, RunDir=None, Debug=False, customAppl=None):
        """
        Call :py:func:`tools.runMPS` and report, instead of raising, an
        MPSFortLibError so the caller can continue with the next job.
        """
        try:
            runMPS(infileList, RunDir=RunDir, Debug=Debug, customAppl=customAppl)
        except MPSFortLibError:
            # Deliberately best-effort: report the failed job and move on.
            print("Job " + infileList[0] + " failed.")
        return None
else:
    def runMPS_wrapper(infileList, RunDir=None, Debug=False, customAppl=None):
        """
        Pass call to :py:func:`tools.runMPS` but handle errors from fortran
        library. Up to now we ignore those errors and continue with the next
        job. For arguments see in :py:func:`tools.runMPS`. Without mpi4py it
        does nothing.
        """
        return None