1
2
3
4
5 """
6 Running tasks in parallel is a simple problem, but in practice it is more complicated:
7 * dependencies discovered during the build (dynamic task creation)
8 * dependencies discovered after files are compiled
9 * the number of tasks and dependencies (graph size) can be huge
10
11 This is why the dependency management is split into three different levels:
12 1. groups of tasks that all run after another group of tasks
13 2. groups of tasks that can be run in parallel
14 3. tasks that can run in parallel, but with possible unknown ad-hoc dependencies
15
16 The point #1 represents a strict sequential order between groups of tasks, for example a compiler is produced
17 and used to compile the rest, whereas #2 and #3 represent partial order constraints where #2 applies to the kind of task
18 and #3 applies to the task instances.
19
20 #1 is held by the task manager: ordered list of TaskGroups (see bld.add_group)
21 #2 is held by the task groups and the task types: precedence after/before (topological sort),
22 and the constraints extracted from file extensions
23 #3 is held by the tasks individually (attribute run_after),
24 and the scheduler (Runner.py) uses Task::runnable_status to reorder the tasks
25
26 --
27
28 To try, use something like this in your code:
29 import Constants, Task
30 Task.algotype = Constants.MAXPARALLEL
31
32 --
33
34 There are two concepts with the tasks (individual units of change):
35 * dependency (if 1 is recompiled, recompile 2)
36 * order (run 2 after 1)
37
38 example 1: if t1 depends on t2 and t2 depends on t3 it is not necessary to make t1 depend on t3 (dependency is transitive)
39 example 2: if t1 depends on a node produced by t2, it is not immediately obvious that t1 must run after t2 (order is not obvious)
40
41 The role of the Task Manager is to give the tasks in order (groups of tasks that may be run in parallel, one after the other)
42
43 """
44
45 import os, types, shutil, sys, re, new, random, time
46 from Utils import md5
47 import Build, Runner, Utils, Node, Logs, Options
48 from Logs import debug, error, warn
49 from Constants import *
50
51 algotype = NORMAL
52 algotype = JOBCONTROL
53 algotype = MAXPARALLEL
54
55 """
56 Enable different kinds of dependency algorithms:
57 1 make groups: first compile all cpps and then compile all links (NORMAL)
58 2 parallelize all (each link task runs after its dependencies) (MAXPARALLEL)
59 3 like 1 but provide additional constraints for the parallelization (JOBCONTROL)
60
61 In theory 1. will be faster than 2 for waf, but might be slower for builds
62 The scheme 2 will not allow for running tasks one by one so it can cause disk thrashing on huge builds
63
64 """
65
67 """The manager is attached to the build object, it holds a list of TaskGroup"""
69 self.groups = []
70 self.tasks_done = []
71 self.current_group = 0
72
74 """return the next set of tasks to execute
75 the first parameter is the maximum amount of parallelization that may occur"""
76 ret = None
77 while not ret and self.current_group < len(self.groups):
78 ret = self.groups[self.current_group].get_next_set()
79 if ret: return ret
80 else: self.current_group += 1
81 return (None, None)
82
84 if self.groups and not self.groups[0].tasks:
85 warn('add_group: an empty group is already present')
86 self.groups.append(TaskGroup())
87
91
93 total = 0
94 if not self.groups: return 0
95 for group in self.groups:
96 total += len(group.tasks)
97 return total
98
100 self.tasks_done.append(tsk)
101 bld = Build.bld
102 if Options.is_install:
103 f = None
104 if 'install' in tsk.__dict__:
105 f = tsk.__dict__['install']
106 f(tsk)
107 else:
108 tsk.install()
109
111 "the compilation of one group does not begin until the previous group has finished (in the manager)"
113 self.tasks = []
114
115 self.cstr_groups = {}
116 self.cstr_order = {}
117 self.temp_tasks = []
118 self.ready = 0
119
121 "clears the state of the object (put back the tasks into self.tasks)"
122 for x in self.cstr_groups:
123 self.tasks += self.cstr_groups[x]
124 self.tasks = self.temp_tasks + self.tasks
125 self.temp_tasks = []
126 self.cstr_groups = []
127 self.cstr_order = {}
128 self.ready = 0
129
135
152
154 "unite the tasks that have similar constraints"
155 self.cstr_groups = {}
156 for x in self.tasks:
157 h = x.hash_constraints()
158 try: self.cstr_groups[h].append(x)
159 except KeyError: self.cstr_groups[h] = [x]
160
162 try: self.tasks.append(task)
163 except KeyError: self.tasks = [task]
164
166 try: self.cstr_order[a].add(b)
167 except KeyError: self.cstr_order[a] = set([b,])
168
170 "extension production"
171 x = "ext_in"
172 y = "ext_out"
173 in_ = t1.attr(x, ())
174 out_ = t2.attr(y, ())
175 for k in in_:
176 if k in out_:
177 return -1
178 in_ = t2.attr(x, ())
179 out_ = t1.attr(y, ())
180 for k in in_:
181 if k in out_:
182 return 1
183 return 0
184
186 "partial relations after/before"
187 m = "after"
188 n = "before"
189 name = t2.__class__.__name__
190 if name in t1.attr(m, ()): return -1
191 elif name in t1.attr(n, ()): return 1
192 name = t1.__class__.__name__
193 if name in t2.attr(m, ()): return 1
194 elif name in t2.attr(n, ()): return -1
195 return 0
196
198 "extract the parallelization constraints from the tasks with different constraints"
199 keys = self.cstr_groups.keys()
200 max = len(keys)
201
202
203 for i in xrange(max):
204 t1 = self.cstr_groups[keys[i]][0]
205 for j in xrange(i + 1, max):
206 t2 = self.cstr_groups[keys[j]][0]
207
208
209 val = (self.compare_exts(t1, t2)
210 or self.compare_partial(t1, t2)
211 )
212 if val > 0:
213 self.set_order(keys[i], keys[j])
214 elif val < 0:
215 self.set_order(keys[j], keys[i])
216
217
218
219
221 "(NORMAL) next list of tasks that may be executed in parallel"
222
223 if not self.ready: self.prepare()
224
225
226 keys = self.cstr_groups.keys()
227
228 unconnected = []
229 remainder = []
230
231 for u in keys:
232 for k in self.cstr_order.values():
233 if u in k:
234 remainder.append(u)
235 break
236 else:
237 unconnected.append(u)
238
239
240
241 toreturn = []
242 for y in unconnected:
243 toreturn.extend(self.cstr_groups[y])
244
245
246 for y in unconnected:
247 try: self.cstr_order.__delitem__(y)
248 except KeyError: pass
249 self.cstr_groups.__delitem__(y)
250
251 if not toreturn and remainder:
252 raise Utils.WafError("circular order constraint detected %r" % remainder)
253
254
255 return toreturn
256
258 "(JOBCONTROL) returns the tasks that can run in parallel with the max amount of jobs"
259 if not self.ready: self.prepare()
260 if not self.temp_tasks: self.temp_tasks = self.tasks_in_parallel()
261 if not self.temp_tasks: return (None, None)
262
263 maxjobs = sys.maxint
264 ret = []
265 remaining = []
266 for t in self.temp_tasks:
267 m = getattr(t, "maxjobs", getattr(self.__class__, "maxjobs", sys.maxint))
268 if m > maxjobs:
269 remaining.append(t)
270 elif m < maxjobs:
271 remaining += ret
272 ret = [t]
273 maxjobs = m
274 else:
275 ret.append(t)
276 self.temp_tasks = remaining
277 return (maxjobs, ret)
278
280 """(MAXPARALLEL) returns all tasks in this group, but add the constraints on each task instance
281 as an optimization, it might be desirable to discard the tasks which do not have to run"""
282 if not self.ready: self.prepare()
283
284 if getattr(self, "done", None): return None
285
286 for p in self.cstr_order:
287 for v in self.cstr_order[p]:
288 for m in self.cstr_groups[p]:
289 for n in self.cstr_groups[v]:
290 n.set_run_after(m)
291 self.cstr_order = {}
292 self.cstr_groups = {}
293 self.done = 1
294 return self.tasks[:]
295
297 "store the task types that have a name ending in _task into a map (remember the existing task types)"
305
307 """Base class for all Waf tasks
308
309 The most important methods are (by usual order of call):
310 1 runnable_status: ask the task if it should be run, skipped, or if we have to ask later
311 2 __str__: string to display to the user
312 3 run: execute the task
313 4 post_run: after the task is run, update the cache about the task
314
315 This class should be seen as an interface: it provides the bare minimum needed by the scheduler,
316 so it does not do much.
317
318 For illustration purposes, TaskBase instances try to execute self.fun (if provided)
319 """
320
321 __metaclass__ = store_task_type
322
323 color = "GREEN"
324 maxjobs = sys.maxint
325 classes = {}
326 stat = None
327
334
336 "used for debugging"
337 return '\n\t{task: %s %s}' % (self.__class__.__name__, str(getattr(self, "fun", "")))
338
340 "string to display to the user"
341 try: self.fun
342 except AttributeError: return self.__class__.__name__ + '\n'
343 else: return 'executing: %s\n' % self.fun.__name__
344
346 "RUN_ME SKIP_ME or ASK_LATER"
347 return RUN_ME
348
351
356