Package wafadmin :: Module Task
[hide private]
[frames] | no frames]

Source Code for Module wafadmin.Task

  1  #!/usr/bin/env python 
  2  # encoding: utf-8 
  3  # Thomas Nagy, 2005-2008 (ita) 
  4   
  5  """ 
  6  Running tasks in parallel is a simple problem, but in practice it is more complicated: 
  7  * dependencies discovered during the build (dynamic task creation) 
  8  * dependencies discovered after files are compiled 
  9  * the amount of tasks and dependencies (graph size) can be huge 
 10   
 11  This is why the dependency management is split into three different levels: 
 12  1. groups of tasks that run all after another group of tasks 
 13  2. groups of tasks that can be run in parallel 
 14  3. tasks that can run in parallel, but with possible unknown ad-hoc dependencies 
 15   
 16  The point #1 represents a strict sequential order between groups of tasks, for example a compiler is produced 
 17  and used to compile the rest, whereas #2 and #3 represent partial order constraints where #2 applies to the kind of task 
 18  and #3 applies to the task instances. 
 19   
 20  #1 is held by the task manager: ordered list of TaskGroups (see bld.add_group) 
 21  #2 is held by the task groups and the task types: precedence after/before (topological sort), 
 22     and the constraints extracted from file extensions 
 23  #3 is held by the tasks individually (attribute run_after), 
 24     and the scheduler (Runner.py) uses Task::runnable_status to reorder the tasks 
 25   
 26  -- 
 27   
 28  To try, use something like this in your code: 
 29  import Constants, Task 
 30  Task.algotype = Constants.MAXPARALLEL 
 31   
 32  -- 
 33   
 34  There are two concepts with the tasks (individual units of change): 
 35  * dependency (if 1 is recompiled, recompile 2) 
 36  * order (run 2 after 1) 
 37   
 38  example 1: if t1 depends on t2 and t2 depends on t3 it is not necessary to make t1 depend on t3 (dependency is transitive) 
 39  example 2: if t1 depends on a node produced by t2, it is not immediately obvious that t1 must run after t2 (order is not obvious) 
 40   
 41  The role of the Task Manager is to give the tasks in order (groups of tasks that may be run in parallel, one group after the other) 
 42   
 43  """ 
 44   
 45  import os, types, shutil, sys, re, new, random, time 
 46  from Utils import md5 
 47  import Build, Runner, Utils, Node, Logs, Options 
 48  from Logs import debug, error, warn 
 49  from Constants import * 
 50   
# Scheduling algorithm used by TaskGroup.get_next_set().
# The three assignments are executed in order, so only the last one is
# effective: the module default is MAXPARALLEL. The first two lines merely
# list the other accepted values.
# NOTE(review): NORMAL, JOBCONTROL and MAXPARALLEL are not defined in this
# file; presumably they come from `from Constants import *` -- confirm.
algotype = NORMAL
algotype = JOBCONTROL
algotype = MAXPARALLEL
 54   
 55  """ 
 56  Enable different kinds of dependency algorithms: 
 57  1 make groups: first compile all cpps and then compile all links (NORMAL) 
 58  2 parallelize all (each link task runs after its dependencies) (MAXPARALLEL) 
 59  3 like 1 but provide additional constraints for the parallelization (JOBCONTROL) 
 60   
 61  In theory, scheme 1 is cheaper than scheme 2 for waf itself, but the resulting build may be slower. 
 62  Scheme 2 does not allow for running tasks one by one, so it can cause disk thrashing on huge builds. 
 63   
 64  """ 
 65   
class TaskManager(object):
    """The manager is attached to the build object, it holds a list of TaskGroup.

    The groups are consumed strictly in order: tasks of the next group are
    only handed out once the current group has nothing left to return.
    """

    def __init__(self):
        self.groups = []        # TaskGroup objects, in execution order
        self.tasks_done = []    # tasks passed to add_finished()
        self.current_group = 0  # index of the group currently being consumed

    def get_next_set(self):
        """Return the next set of tasks to execute.

        The value is whatever the current group's get_next_set() returns
        (for TaskGroup a ``(maxjobs, tasks)`` pair, the first item being the
        maximum amount of parallelization that may occur). Groups returning
        an empty value are skipped for good. ``(None, None)`` is returned
        once all the groups are exhausted.
        """
        while self.current_group < len(self.groups):
            ret = self.groups[self.current_group].get_next_set()
            if ret:
                return ret
            # nothing left in this group: move on to the next one
            self.current_group += 1
        return (None, None)

    def add_group(self):
        """Append a new empty TaskGroup; it receives the tasks added afterwards.

        A warning is logged when any existing group is still empty. The
        original code only looked at the first group, so empty groups created
        later went unnoticed. The new group is appended in all cases.
        """
        for group in self.groups:
            if not group.tasks:
                warn('add_group: an empty group is already present')
                break
        self.groups.append(TaskGroup())

    def add_task(self, task):
        """Add `task` to the last group, creating a first group if needed."""
        if not self.groups:
            self.add_group()
        self.groups[-1].add_task(task)

    def total(self):
        """Return the number of tasks held by all the groups (0 if no group)."""
        return sum(len(group.tasks) for group in self.groups)

    def add_finished(self, tsk):
        """Record `tsk` in tasks_done and, if Options.is_install is set, install it.

        An 'install' entry stored in the task's own __dict__ takes precedence
        over the install() method found through the class.
        """
        self.tasks_done.append(tsk)
        if not Options.is_install:
            return
        if 'install' in tsk.__dict__:
            # a callable stored on the instance is not bound: pass the task explicitly
            tsk.__dict__['install'](tsk)
        else:
            tsk.install()
109
class TaskGroup(object):
    """A group of tasks scheduled together.

    The compilation of one group does not begin until the previous group has
    finished (that ordering is enforced by the manager).
    """

    def __init__(self):
        self.tasks = []        # tasks added through add_task()
        self.cstr_groups = {}  # constraint hash -> tasks having equivalent constraints
        self.cstr_order = {}   # constraint hash -> set of hashes that must run after it
        self.temp_tasks = []   # tasks put on hold (see tasks_by_max_jobs)
        self.ready = 0         # set to 1 once prepare() has been called

    def reset(self):
        """Clear the scheduling state (put back the tasks into self.tasks)."""
        # NOTE(review): make_cstr_groups() does not remove anything from
        # self.tasks, so the tasks still pending in cstr_groups end up in
        # self.tasks a second time here -- confirm this is intended.
        for x in self.cstr_groups:
            self.tasks += self.cstr_groups[x]
        self.tasks = self.temp_tasks + self.tasks
        self.temp_tasks = []
        # must be a dict like everywhere else (it used to be reset to a list)
        self.cstr_groups = {}
        self.cstr_order = {}
        self.ready = 0
        # NOTE(review): the 'done' flag set by tasks_with_inner_constraints()
        # is not cleared, so after a reset that method keeps returning None
        # -- confirm whether reset() should clear it too.

    def prepare(self):
        """Prepare the scheduling: group the tasks and compute the group order."""
        self.ready = 1
        self.make_cstr_groups()
        self.extract_constraints()

    def get_next_set(self):
        """Return the next tasks to execute as ``(maxjobs, task_list)``.

        The module-level `algotype` selects the algorithm. An empty tuple is
        returned when the selected algorithm has no task to give.

        Raises Utils.WafError if `algotype` is not a known value.
        """
        if algotype == NORMAL:
            tasks = self.tasks_in_parallel()
            maxj = sys.maxint
        elif algotype == JOBCONTROL:
            (maxj, tasks) = self.tasks_by_max_jobs()
        elif algotype == MAXPARALLEL:
            tasks = self.tasks_with_inner_constraints()
            maxj = sys.maxint
        else:
            raise Utils.WafError("unknown algorithm type %s" % (algotype))

        if not tasks:
            return ()
        return (maxj, tasks)

    def make_cstr_groups(self):
        """Unite the tasks that have similar constraints (same hash_constraints())."""
        self.cstr_groups = {}
        for x in self.tasks:
            self.cstr_groups.setdefault(x.hash_constraints(), []).append(x)

    def add_task(self, task):
        """Append `task` to the group."""
        # self.tasks is always a list: appending cannot raise the KeyError
        # the original code was guarding against
        self.tasks.append(task)

    def set_order(self, a, b):
        """Record that the constraint group `b` must run after the group `a`."""
        self.cstr_order.setdefault(a, set()).add(b)

    def compare_exts(self, t1, t2):
        """Order two tasks by extension production.

        Returns -1 if t1 consumes (ext_in) an extension produced (ext_out) by
        t2, 1 if t2 consumes an extension produced by t1, and 0 otherwise.
        """
        x = "ext_in"
        y = "ext_out"
        in_ = t1.attr(x, ())
        out_ = t2.attr(y, ())
        for k in in_:
            if k in out_:
                return -1
        in_ = t2.attr(x, ())
        out_ = t1.attr(y, ())
        for k in in_:
            if k in out_:
                return 1
        return 0

    def compare_partial(self, t1, t2):
        """Order two tasks by the 'after'/'before' relations on class names.

        Returns -1 if t1 must run after t2, 1 if t1 must run before t2, and 0
        when neither task declares a relation to the other's class name.
        """
        m = "after"
        n = "before"
        name = t2.__class__.__name__
        if name in t1.attr(m, ()):
            return -1
        elif name in t1.attr(n, ()):
            return 1
        name = t1.__class__.__name__
        if name in t2.attr(m, ()):
            return 1
        elif name in t2.attr(n, ()):
            return -1
        return 0

    def extract_constraints(self):
        """Extract the parallelization constraints between the constraint groups.

        The first task of each group is used as the representative of the
        group; every pair of groups is compared exactly once.
        """
        keys = list(self.cstr_groups.keys())
        # hopefully the length of this list is short
        for i, key1 in enumerate(keys):
            t1 = self.cstr_groups[key1][0]
            for key2 in keys[i + 1:]:
                t2 = self.cstr_groups[key2][0]

                # add the constraints based on the comparisons
                val = (self.compare_exts(t1, t2)
                    or self.compare_partial(t1, t2))
                if val > 0:
                    self.set_order(key1, key2)
                elif val < 0:
                    self.set_order(key2, key1)
        # TODO extract constraints by file extensions on the actions

    def tasks_in_parallel(self):
        """(NORMAL) Return the next list of tasks that may be executed in parallel.

        The constraint groups that no other group must precede are returned
        and removed, which releases their followers for the next call.

        Raises Utils.WafError when groups remain but all of them wait for
        another group (circular order constraint).
        """
        if not self.ready:
            self.prepare()

        unconnected = []
        remainder = []
        for key in list(self.cstr_groups):
            for followers in self.cstr_order.values():
                if key in followers:
                    # another group must run before this one
                    remainder.append(key)
                    break
            else:
                unconnected.append(key)

        toreturn = []
        for key in unconnected:
            toreturn.extend(self.cstr_groups[key])

        # remove stuff only after all the tasks have been collected
        for key in unconnected:
            self.cstr_order.pop(key, None)
            del self.cstr_groups[key]

        if not toreturn and remainder:
            raise Utils.WafError("circular order constraint detected %r" % remainder)

        return toreturn

    def tasks_by_max_jobs(self):
        """(JOBCONTROL) Return the tasks that can run in parallel with the max amount of jobs.

        Among the tasks on hold, those with the smallest 'maxjobs' value are
        returned as ``(maxjobs, tasks)``; the others stay on hold for the next
        call. ``(None, None)`` is returned when there is nothing left.
        """
        if not self.ready:
            self.prepare()
        if not self.temp_tasks:
            self.temp_tasks = self.tasks_in_parallel()
        if not self.temp_tasks:
            return (None, None)

        maxjobs = sys.maxint
        ret = []
        remaining = []
        for t in self.temp_tasks:
            # NOTE(review): the fallback reads 'maxjobs' on TaskGroup itself
            # (self.__class__), not on the task class -- confirm the intent
            m = getattr(t, "maxjobs", getattr(self.__class__, "maxjobs", sys.maxint))
            if m > maxjobs:
                remaining.append(t)
            elif m < maxjobs:
                # stricter limit found: the tasks selected so far must wait
                remaining += ret
                ret = [t]
                maxjobs = m
            else:
                ret.append(t)
        self.temp_tasks = remaining
        return (maxjobs, ret)

    def tasks_with_inner_constraints(self):
        """(MAXPARALLEL) Return all tasks in this group, but add the constraints on each task instance.

        Every task of a group that must run after another group gets
        set_run_after() called with each task of that other group. The tasks
        are returned only once: later calls return None.

        As an optimization, it might be desirable to discard the tasks which
        do not have to run.
        """
        if not self.ready:
            self.prepare()

        if getattr(self, "done", None):
            return None

        for p in self.cstr_order:
            for v in self.cstr_order[p]:
                for m in self.cstr_groups[p]:
                    for n in self.cstr_groups[v]:
                        n.set_run_after(m)
        self.cstr_order = {}
        self.cstr_groups = {}
        self.done = 1
        return self.tasks[:]  # make a copy
295
class store_task_type(type):
    """Metaclass remembering the task types that have a name ending in '_task'.

    Such classes are stored in the map TaskBase.classes, keyed by the class
    name without its '_task' suffix. Other classes are not registered.
    """

    def __init__(cls, name, bases, dict):
        super(store_task_type, cls).__init__(name, bases, dict)
        name = cls.__name__

        suffix = '_task'
        if name.endswith(suffix):
            # strip the trailing suffix only: str.replace() would also remove
            # any other '_task' occurrence inside the name
            TaskBase.classes[name[:-len(suffix)]] = cls
305
class TaskBase(object):
    """Base class for all Waf tasks

    The most important methods are (by usual order of call):
    1 runnable_status: ask the task if it should be run, skipped, or if we have to ask later
    2 __str__: string to display to the user
    3 run: execute the task
    4 post_run: after the task is run, update the cache about the task

    This class should be seen as an interface, it provides the very minimum necessary for the scheduler
    so it does not do much.

    For illustration purposes, TaskBase instances try to execute self.fun (if provided)
    """

    __metaclass__ = store_task_type

    color = "GREEN"
    maxjobs = sys.maxint
    classes = {}  # filled by the store_task_type metaclass
    stat = None

    def __init__(self, normal=1):
        """Mark the task as not run; when `normal` is true, add it to the build's task manager."""
        self.hasrun = NOT_RUN

        # the manager is looked up even when the task is not registered
        task_manager = Build.bld.task_manager
        if normal:
            task_manager.add_task(self)

    def __repr__(self):
        "used for debugging"
        class_name = self.__class__.__name__
        fun_text = str(getattr(self, "fun", ""))
        return '\n\t{task: %s %s}' % (class_name, fun_text)

    def __str__(self):
        "string to display to the user"
        try:
            fun = self.fun
        except AttributeError:
            # no function attached: fall back to the class name
            return self.__class__.__name__ + '\n'
        return 'executing: %s\n' % fun.__name__

    def runnable_status(self):
        "RUN_ME SKIP_ME or ASK_LATER; the base class always answers RUN_ME"
        return RUN_ME

    def can_retrieve_cache(self):
        "the base class never retrieves anything from a cache"
        return False

    def call_run(self):
        "return 0 when can_retrieve_cache() succeeds, else the result of run()"
        if not self.can_retrieve_cache():
            return self.run()
        return 0
356
357