HWRF  trunk@4391
1 ## @namespace produtil.mpi_impl.lsf_cray_intel
2 # Adds support for LSF+aprun with the Intel OpenMP to produtil.run
3 #
4 # This module is part of the mpi_impl package -- see produtil.mpi_impl
5 # for details. This implements the bizarre combination of LSF, Cray
6 # aprun with Intel OpenMP.
8 import os, socket, logging
11 from .mpi_impl_base import MPIMixed,CMDFGen
##@var module_logger
# Default logging.Logger for this module, named 'lsf_cray_intel'.
# Functions here fall back to it when the caller passes no logger.
module_logger=logging.getLogger('lsf_cray_intel')

##@var aprun_path
# Path to the aprun program, or None if it isn't found.
aprun_path=produtil.fileop.find_exe('aprun',raise_missing=False)
class Syncer(object):
    """!Callable that runs /bin/sync, first directly and then through
    "aprun -B".  Instances are meant to be registered as a prerun
    action so the syncs happen before another program is aprun."""

    def __init__(self):
        """!Creates the Syncer.  It holds no state of its own."""
        super(Syncer,self).__init__()

    def __call__(self,runner=None,logger=None):
        """!Runs the two sync commands in order.
        @param runner Accepted but not used.
        @param logger a logging.Logger for messages; the module-level
          logger is used if this is None"""
        log=module_logger if logger is None else logger
        steps=(
            ("Running sync...",["/bin/sync"]),
            ('Running aprun -B sync...',[aprun_path,'-B','/bin/sync']),
        )
        for message,command in steps:
            log.info(message)
            produtil.pipeline.simple_run(command,logger=log)
def runsync(logger=None):
    """!Runs the sync commands immediately by creating a Syncer and
    calling it.
    @param logger a logging.Logger handed on to the Syncer call"""
    Syncer()(logger=logger)
def openmp(arg,threads):
    """!Adds OpenMP support to the provided object

    When threads is None, the count is taken from a positive integer
    $OMP_NUM_THREADS; failing that it is one less than
    $PRODUTIL_RUN_NODESIZE (default 24), but never below 1.

    @param arg A produtil.prog.Runner or
      produtil.mpiprog.MPIRanksBase object tree
    @param threads the number of threads, or threads per rank, an
      integer, or None to choose one from the environment
    @returns the object produced by arg.env(), with "-cc depth -d N"
      inserted at argument position 1 and its threads attribute set"""
    if threads is None:
        # First choice: a usable OMP_NUM_THREADS from the environment.
        try:
            from_env=int(os.environ.get('OMP_NUM_THREADS',''))
        except (KeyError,TypeError,ValueError):
            from_env=0
        if from_env>0:
            threads=from_env

    if threads is None:
        # Second choice: derive the count from the node size.
        node_size=int(os.environ.get('PRODUTIL_RUN_NODESIZE','24'))
        threads=max(1,node_size-1)

    assert(threads>0)
    threads=int(threads)
    module_logger.info('Threaded with %s threads so add -cc depth.'%(
            repr(threads),))

    # Each flag is inserted at index 1, so feed them back-to-front to
    # end up with: -cc depth -d <threads>
    for flag in (str(threads),'-d','depth','-cc'):
        arg=arg.argins(1,flag)

    arg=arg.env(KMP_AFFINITY='disabled',OMP_NUM_THREADS=threads)
    arg.threads=threads
    return arg
70 def aprun_ln_sf(source,target,content,logger=None):
72  aprun_path,"-N","1","-n","1","-j","1","/bin/ln","-sf",
73  str(source),str(target)], logger=logger)
74  content=os.readlink(target)
75  if content==source: return True
77  "FILESYSTEM FAILURE: Python and aprun ln -sf both cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
78  target,source,content),target)
def detect():
    """!Reports whether Cray aprun should be used to run MPI programs,
    based on whether the aprun program was found when this module
    was loaded.

    Side effect: when aprun was found, produtil.fileop.ln_sf is
    replaced with this module's aprun_ln_sf.
    @returns True if aprun_path is not None, False otherwise"""
    if aprun_path is None:
        return False
    produtil.fileop.ln_sf = aprun_ln_sf
    return True
89  """!Does this module represent an MPI implementation? Returns True."""
90  return True
def make_bigexe(exe,**kwargs):
    """!Returns a produtil.prog.Runner that will run the specified
    program through "aprun -N 1 -j 1 -n 1".

    The runner's environment gets INSIDE_APRUN set to one more than
    the current $INSIDE_APRUN (treated as 0 when unset), and a Syncer
    is registered as a prerun action.

    @note This function does NOT search $PATH.  That ensures the $PATH
      will be expanded on the compute node instead.  Use
      produtil.fileop.find_exe() if you want to explicitly search the
      PATH before execution
    @param exe The executable to run on compute nodes.
    @param kwargs Ignored.
    @returns the produtil.prog.Runner for the aprun command"""
    exe=str(exe)
    inside_aprun=int(os.environ.get('INSIDE_APRUN','0'))
    inside_aprun+=1
    r=produtil.prog.Runner(['aprun','-N','1','-j','1','-n','1',exe])\
        .env(INSIDE_APRUN=str(inside_aprun))
    # Hand back the runner: without this return the caller gets None
    # and has nothing to execute.  prerun()'s result is used the same
    # way mpirunner2 uses it when it chains and returns prerun calls.
    return r.prerun(Syncer())
def mpirunner(arg,allranks=False,logger=None,**kwargs):
    """!Logging wrapper around mpirunner2().
    @param arg,allranks,kwargs passed through to mpirunner2()
    @param logger a logging.Logger; the module-level logger is used
      if this is None
    @returns whatever mpirunner2() returns, after logging the
      conversion at INFO level"""
    log=module_logger if logger is None else logger
    result=mpirunner2(arg,allranks=allranks,logger=log,**kwargs)
    log.info('mpirunner: %s => %s'%(repr(arg),repr(result)))
    return result
def mpirunner2(arg,allranks=False,logger=None,**kwargs):
    """!Turns a produtil.mpiprog.MPIRanksBase tree into a produtil.prog.Runner

    The command is built on aprun.  Three environment variables are
    read: PRODUTIL_RUN_NODESIZE (default 24), PRODUTIL_RUN_HYPERTHREADS
    (default 1) and TOTAL_TASKS (required).  INSIDE_APRUN is set in the
    runner's environment to one more than its current value, and a
    Syncer is registered as a prerun action on every returned runner.

    @param arg a tree of produtil.mpiprog.MPIRanksBase objects
    @param allranks if True, and only one rank is requested by arg, then
      all MPI ranks will be used
    @param logger a logging.Logger for log messages
    @param kwargs passed to produtil.mpi_impl.mpi_impl_base.CMDFGen
      when mpiserial is in use.
    @returns a produtil.prog.Runner that will run the selected MPI program
    @throws MPIMixed if arg mixes serial and parallel ranks
    @throws MPIAllRanksError if allranks=True but arg has more than one rank
    @throws MPISerialMissing if serial ranks are requested and mpiserial
      is reported missing
    @throws KeyError if $TOTAL_TASKS is not set"""
    # These two exception classes are raised below but are not among
    # the names imported at the top of this module; import them here
    # so the raise statements do not fail with NameError.
    from .mpi_impl_base import MPIAllRanksError,MPISerialMissing

    if logger is None:
        logger=logging.getLogger('lsf_cray_intel')
    assert(isinstance(arg,produtil.mpiprog.MPIRanksBase))
    (serial,parallel)=arg.check_serial()
    if serial and parallel:
        raise MPIMixed(
            'Cannot mix serial and parallel MPI ranks in the same '
            'MPI program.')
    nodesize=os.environ.get('PRODUTIL_RUN_NODESIZE','24')
    nodesize=int(nodesize)
    hyperthreads=os.environ.get('PRODUTIL_RUN_HYPERTHREADS','1')
    hyperthreads=int(hyperthreads)
    maxtasks=os.environ['TOTAL_TASKS']
    maxtasks=int(maxtasks)

    # The returned runner object.  We'll add to this below:
    runner=produtil.prog.Runner([aprun_path]).env(KMP_AFFINITY='disabled')
    threads=arg.threads
    logger.info('Decide what to do with -cc option.')
    if threads is not None and threads>1:
        logger.info('Threaded with %s threads so add -cc depth.'%(
                repr(threads),))
        # Keep the result, as every other runner[...] in this function
        # does, instead of relying on in-place modification.
        runner=runner['-cc','depth','-d',str(threads)]
    else:
        logger.info('No threads (threads=%s).'%(repr(threads),))

    # Set up the INSIDE_APRUN variable so the executed MPI program
    # will be able to run serial programs.
    inside_aprun=int(os.environ.get('INSIDE_APRUN','0'))
    inside_aprun+=1
    runner=runner.env(INSIDE_APRUN=str(inside_aprun))

    if arg.nranks()==1 and allranks:
        # One rank duplicated across all $TOTAL_TASKS tasks.  The
        # function returns on the first (rank,count) pair.
        for rank,count in arg.expand_iter(expand=False):
            pernode=min(maxtasks,nodesize)
            runner=runner['-j','%d'%(hyperthreads),
                          '-N','%d'%(pernode),
                          '-n','%d'%(maxtasks)][rank.args()]
            runner=runner.prerun(Syncer())
            return runner
    elif allranks:
        raise MPIAllRanksError(
            "When using allranks=True, you must provide an mpi program "
            "specification with only one MPI rank (to be duplicated across "
            "all ranks).")
    elif serial:
        # Serial programs: one shell command line per rank, written to
        # a command file by CMDFGen and executed by mpiserial.
        lines=[a for a in arg.to_arglist(to_shell=True,expand=True)]
        # NOTE(review): the module-level aprun lookup has to pass
        # raise_missing=False to get None back; this call does not, so
        # confirm find_exe can actually return None here.
        if produtil.fileop.find_exe('mpiserial') is None:
            raise MPISerialMissing(
                'Attempting to run a serial program via aprun mpiserial but '
                'the mpiserial program is not in your $PATH.')
        pernode=min(arg.nranks(),nodesize)
        return runner['-n%s'%(arg.nranks()),
                      '-j','1','-N','%d'%pernode,'mpiserial']\
            .prerun(CMDFGen('serialcmdf',lines,**kwargs)) \
            .prerun(Syncer())
    else:
        # Parallel programs: one aprun segment per (rank,count) group,
        # with segments separated by ':'.
        first=True

        # From here on maxtasks is the largest per-group rank count,
        # not $TOTAL_TASKS.
        maxtasks=1
        for rank,count in arg.expand_iter(expand=False):
            maxtasks=max(count,maxtasks)

        for rank,count in arg.expand_iter(expand=False):
            pernode=min(count,maxtasks,nodesize)
            if not first: runner=runner[':']
            runner=runner['-j','%d'%(hyperthreads),
                          '-N','%d'%(pernode),
                          '-n','%d'%(count),
                          rank.to_shell()]
            first=False
        runner=runner.prerun(Syncer())
        return runner
