HWRF  trunk@4391
lsf_cray_intel.py
1 ## @namespace produtil.mpi_impl.lsf_cray_intel
2 # Adds support for LSF+aprun with Intel OpenMP to produtil.run
3 #
4 # This module is part of the mpi_impl package -- see produtil.mpi_impl
5 # for details. This implements the unusual combination of LSF, Cray
6 # aprun, and Intel OpenMP.
7 
8 import os, socket, logging
10 
11 from .mpi_impl_base import MPIMixed,CMDFGen
12 
##@var module_logger
# Logger used by the functions in this module when no logger is passed.
module_logger=logging.getLogger('lsf_cray_intel')

##@var aprun_path
# Path to the Cray aprun program, or None if it isn't found in $PATH.
# detect() uses this to decide whether this MPI implementation applies.
aprun_path=produtil.fileop.find_exe('aprun',raise_missing=False)
18 
class Syncer(object):
    """!Calls sync and aprun -B sync.  This is meant to be run before
    aprunning another program.

    Calling an instance first runs /bin/sync directly, then runs
    /bin/sync again through "aprun -B", logging a message before each
    step."""

    def __init__(self):
        super(Syncer,self).__init__()

    def __call__(self,runner=None,logger=None):
        """!Runs /bin/sync, then aprun -B /bin/sync.
        @param runner Ignored.  Presumably accepted so the object can be
          used as a prerun hook -- TODO confirm against produtil.prog.
        @param logger a logging.Logger; the module logger is used if None"""
        log=module_logger if logger is None else logger
        steps=(
            ("Running sync...",["/bin/sync"]),
            ('Running aprun -B sync...',[aprun_path,'-B','/bin/sync']),
        )
        for message,command in steps:
            log.info(message)
            produtil.pipeline.simple_run(command,logger=log)
30 
def runsync(logger=None):
    """!Runs /bin/sync followed by aprun -B /bin/sync, using a Syncer.
    @param logger a logging.Logger for log messages, or None to use the
      module logger
    @returns None"""
    Syncer()(logger=logger)
35 
def openmp(arg,threads,logger=None):
    """!Adds OpenMP support to the provided object

    Inserts "-cc depth -d <threads>" at argument position 1 of arg (via
    arg.argins), sets KMP_AFFINITY=disabled and OMP_NUM_THREADS=<threads>
    via arg.env, and records the count in the threads attribute.

    @param arg An produtil.prog.Runner or
      produtil.mpiprog.MPIRanksBase object tree
    @param threads the number of threads, or threads per rank, an
      integer (or anything int() accepts).  If None, $OMP_NUM_THREADS is
      used when it is a positive integer; otherwise
      $PRODUTIL_RUN_NODESIZE-1 (PRODUTIL_RUN_NODESIZE defaults to 24),
      but never less than 1.
    @param logger a logging.Logger for log messages; the module logger
      is used if None
    @returns the object returned by arg.env(), with threads set
    @raises ValueError if threads is not a positive integer"""
    if logger is None:
        logger=module_logger

    if threads is None:
        # A missing, non-integer or non-positive $OMP_NUM_THREADS is
        # ignored so that the node-size fallback below applies.
        ont=os.environ.get('OMP_NUM_THREADS','')
        try:
            ont=int(ont)
        except ValueError:
            ont=0
        if ont>0:
            threads=ont

    if threads is None:
        nodesize=int(os.environ.get('PRODUTIL_RUN_NODESIZE','24'))
        threads=max(1,nodesize-1)

    # Convert before validating so numeric strings are accepted, and use
    # an explicit check rather than an assert (asserts vanish under -O).
    threads=int(threads)
    if threads<1:
        raise ValueError('openmp: threads must be a positive integer, '
                         'not %s'%(repr(threads),))
    logger.info('Threaded with %s threads so add -cc depth.'%(
        repr(threads),))
    # Inserting at position 1 in reverse order leaves the options in
    # their listed order starting at that position.
    for a in reversed(['-cc','depth','-d',str(threads)]):
        arg=arg.argins(1,a)

    arg=arg.env(KMP_AFFINITY='disabled',OMP_NUM_THREADS=threads)
    arg.threads=threads

    return arg
68  return arg
69 
## Cray replacement for produtil.fileop.ln_sf; detect() installs it in
# produtil.fileop when aprun_path is not None.
#
# Runs "/bin/ln -sf source target" through aprun_path as a single task
# (-N 1 -n 1 -j 1), then reads the link back with os.readlink(target)
# to verify where it points.
# @param source the path the symlink should point to
# @param target the path of the symlink to create
# @param content ignored on input: it is overwritten by
#   os.readlink(target) before it is ever read
# @param logger a logging.Logger, passed as logger= to the command call
# @returns True if os.readlink(target) equals source
# @note If the link points elsewhere, an exception is presumably raised
#   with a "FILESYSTEM FAILURE" message and target as a second argument.
# NOTE(review): this listing omits original lines 71 and 76 --
#   presumably the opening "produtil.pipeline.simple_run([" and the
#   "raise <exception>(" line; confirm the exception type in the full
#   source.
# NOTE(review): the check compares against source itself while the
#   command uses str(source), so a non-str source may fail the check
#   even when the link is correct.
70 def aprun_ln_sf(source,target,content,logger=None):
72  aprun_path,"-N","1","-n","1","-j","1","/bin/ln","-sf",
73  str(source),str(target)], logger=logger)
74  content=os.readlink(target)
75  if content==source: return True
77  "FILESYSTEM FAILURE: Python and aprun ln -sf both cannot symlink \"%s\" -> \"%s\". Instead, the symlink is to \"%s\"."%(
78  target,source,content),target)
79 
def detect():
    """!Determines if Cray aprun should be used to run MPI programs by
    looking for the aprun program in $PATH.

    As a side effect, when aprun was found, produtil.fileop.ln_sf is
    replaced by aprun_ln_sf.
    @returns True if aprun was found, False otherwise"""
    if aprun_path is None:
        return False
    produtil.fileop.ln_sf = aprun_ln_sf
    return True
87 
def can_run_mpi():
    """!Does this module represent an MPI implementation? Returns True.
    @returns True, always"""
    return True
91 
def make_bigexe(exe,**kwargs):
    """!Returns a runner that will run the specified program as a
    single aprun task.

    The command is "aprun -N 1 -j 1 -n 1 <exe>".  $INSIDE_APRUN is set
    to one more than its current value (treated as 0 if unset), and a
    Syncer is registered as a prerun hook.
    @returns the runner built from produtil.prog.Runner as described above
    @note This function does NOT search $PATH. That ensures the $PATH
    will be expanded on the compute node instead. Use
    produtil.fileop.find_exe() if you want to explicitly search the
    PATH before execution
    @param exe The executable to run on compute nodes.
    @param kwargs Ignored."""
    exe=str(exe)
    inside_aprun=int(os.environ.get('INSIDE_APRUN','0'))+1
    # NOTE(review): this uses the bare name 'aprun', whereas Syncer and
    # mpirunner2 use the absolute aprun_path; confirm which is intended.
    r=produtil.prog.Runner(['aprun','-N','1','-j','1','-n','1',exe])\
        .env(INSIDE_APRUN=str(inside_aprun))
    r.prerun(Syncer())
    # Without this the function returned None despite its documentation.
    return r
108 
def mpirunner(arg,allranks=False,logger=None,**kwargs):
    """!Converts an MPI program tree to a runner using mpirunner2() and
    logs the conversion.
    @param arg a tree of produtil.mpiprog.MPIRanksBase objects
    @param allranks passed through to mpirunner2()
    @param logger a logging.Logger; the module logger is used if None
    @param kwargs passed through to mpirunner2()
    @returns whatever mpirunner2() returns"""
    log=logger if logger is not None else module_logger
    result=mpirunner2(arg,allranks=allranks,logger=log,**kwargs)
    log.info('mpirunner: %s => %s'%(repr(arg),repr(result)))
    return result
115 
def mpirunner2(arg,allranks=False,logger=None,**kwargs):
    """!Turns a produtil.mpiprog.MPIRanksBase tree into a produtil.prog.Runner
    @param arg a tree of produtil.mpiprog.MPIRanksBase objects
    @param allranks if True, and only one rank is requested by arg, then
      all MPI ranks will be used; $TOTAL_TASKS must then hold the total
      MPI task count
    @param logger a logging.Logger for log messages
    @param kwargs passed to produtil.mpi_impl.mpi_impl_base.CMDFGen
      when mpiserial is in use.
    @returns a produtil.prog.Runner that will run the selected MPI program
    @raises MPIMixed if arg mixes serial and parallel ranks
    @raises MPIAllRanksError if allranks is True but arg has more than
      one rank
    @raises MPISerialMissing if serial ranks are requested but mpiserial
      is not in $PATH
    @raises KeyError if allranks applies and $TOTAL_TASKS is unset"""
    if logger is None:
        logger=module_logger
    assert(isinstance(arg,produtil.mpiprog.MPIRanksBase))
    (serial,parallel)=arg.check_serial()
    if serial and parallel:
        raise MPIMixed(
            'Cannot mix serial and parallel MPI ranks in the same '
            'MPI program.')
    nodesize=int(os.environ.get('PRODUTIL_RUN_NODESIZE','24'))
    hyperthreads=int(os.environ.get('PRODUTIL_RUN_HYPERTHREADS','1'))

    # The returned runner object. We'll add to this below:
    runner=produtil.prog.Runner([aprun_path]).env(KMP_AFFINITY='disabled')
    threads=arg.threads
    logger.info('Decide what to do with -cc option.')
    if threads is not None and threads>1:
        logger.info('Threaded with %s threads so add -cc depth.'%(
            repr(threads),))
        # Keep the result, as every other argument append below does.
        runner=runner['-cc','depth','-d',str(threads)]
    else:
        logger.info('No threads (threads=%s).'%(repr(threads),))

    # Set up the INSIDE_APRUN variable so the executed MPI program
    # will be able to run serial programs.
    inside_aprun=int(os.environ.get('INSIDE_APRUN','0'))+1
    runner=runner.env(INSIDE_APRUN=str(inside_aprun))

    if arg.nranks()==1 and allranks:
        # $TOTAL_TASKS is only needed here.  It used to be read
        # unconditionally, which made every other kind of run fail
        # when it was unset.
        maxtasks=int(os.environ['TOTAL_TASKS'])
        pernode=min(maxtasks,nodesize)
        for rank,count in arg.expand_iter(expand=False):
            runner=runner['-j','%d'%(hyperthreads),
                          '-N','%d'%(pernode),
                          '-n','%d'%(maxtasks)][rank.args()]
        runner.prerun(Syncer())
        return runner
    elif allranks:
        # The visible module-level import only provides MPIMixed and
        # CMDFGen, so bind this name here, where it is actually needed.
        from .mpi_impl_base import MPIAllRanksError
        raise MPIAllRanksError(
            "When using allranks=True, you must provide an mpi program "
            "specification with only one MPI rank (to be duplicated across "
            "all ranks).")
    elif serial:
        lines=list(arg.to_arglist(to_shell=True,expand=True))
        if produtil.fileop.find_exe('mpiserial') is None:
            from .mpi_impl_base import MPISerialMissing
            raise MPISerialMissing(
                'Attempting to run a serial program via aprun mpiserial but '
                'the mpiserial program is not in your $PATH.')
        pernode=min(arg.nranks(),nodesize)
        return runner['-n%s'%(arg.nranks()),
                      '-j','1','-N','%d'%pernode,'mpiserial']\
            .prerun(CMDFGen('serialcmdf',lines,**kwargs)) \
            .prerun(Syncer())
    else:
        # MPMD: one "-j/-N/-n <program>" segment per rank group, with
        # segments separated by ":".  The former maxtasks pre-pass was
        # the maximum count, so min(count,maxtasks) was always count.
        for index,(rank,count) in enumerate(arg.expand_iter(expand=False)):
            pernode=min(count,nodesize)
            if index: runner=runner[':']
            runner=runner['-j','%d'%(hyperthreads),
                          '-N','%d'%(pernode),
                          '-n','%d'%(count),
                          rank.to_shell()]
        runner.prerun(Syncer())
        return runner
198 
This module provides a set of utility functions to do filesystem operations.
Definition: fileop.py:1
def runsync
Runs the "sync" command as an exe().
def detect()
Determines if Cray aprun should be used to run MPI programs by looking for the aprun program in $PATH.
def make_bigexe(exe, kwargs)
Returns an ImmutableRunner that will run the specified program.
Implements the produtil.run: provides the object tree for representing shell commands.
Definition: prog.py:1
Calls sync and aprun -B sync.
def can_run_mpi()
Does this module represent an MPI implementation? Returns True.
def mpirunner2(arg, allranks=False, logger=None, kwargs)
Turns a produtil.mpiprog.MPIRanksBase tree into a produtil.prog.Runner.
Object structure for describing MPI programs.
Definition: mpiprog.py:1
def openmp(arg, threads)
Adds OpenMP support to the provided object.
This is the abstract superclass of all classes that represent one or more MPI ranks, including MPI ranks that are actually serial programs.
Definition: mpiprog.py:68
Represents a single stage of a pipeline to execute.
Definition: prog.py:299
def find_exe
Searches the $PATH or a specified iterable of directory names to find an executable file with the giv...
Definition: fileop.py:573
A copy-on-write version of Runner.
Definition: prog.py:884