blob: f6e9788d7cf6c03a7bc184c77c2f9ec1fa085bc8 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import contextlib
import errno
import os
import re
import shutil
import StringIO
import sys
import tempfile
import unittest
import common_util
import task_manager
_GOLDEN_GRAPHVIZ = """digraph graphname {
n0 [label="0: b", color=black, shape=ellipse];
n1 [label="1: a", color=black, shape=ellipse];
n2 [label="2: c", color=black, shape=ellipse];
n0 -> n2;
n1 -> n2;
n3 [label="3: d", color=black, shape=ellipse];
n2 -> n3;
n4 [label="4: f", color=black, shape=box];
n3 -> n4;
n5 [label="e", color=blue, shape=ellipse];
n5 -> n4;
}\n"""
@contextlib.contextmanager
def EatStdoutAndStderr():
"""Overrides sys.std{out,err} to intercept write calls."""
sys.stdout.flush()
sys.stderr.flush()
original_stdout = sys.stdout
original_stderr = sys.stderr
try:
sys.stdout = StringIO.StringIO()
sys.stderr = StringIO.StringIO()
yield
finally:
sys.stdout = original_stdout
sys.stderr = original_stderr
class TestException(Exception):
pass
class TaskManagerTestCase(unittest.TestCase):
def setUp(self):
self.output_directory = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.output_directory)
def OutputPath(self, file_path):
return os.path.join(self.output_directory, file_path)
def TouchOutputFile(self, file_path):
with open(self.OutputPath(file_path), 'w') as output:
output.write(file_path + '\n')
class TaskTest(TaskManagerTestCase):
def testTaskExecution(self):
def Recipe():
Recipe.counter += 1
Recipe.counter = 0
task = task_manager.Task('hello.json', 'what/ever/hello.json', [], Recipe)
self.assertFalse(task._is_done)
self.assertEqual(0, Recipe.counter)
task.Execute()
self.assertEqual(1, Recipe.counter)
task.Execute()
self.assertEqual(1, Recipe.counter)
def testTaskExecutionWithUnexecutedDeps(self):
def RecipeA():
self.fail()
def RecipeB():
RecipeB.counter += 1
RecipeB.counter = 0
a = task_manager.Task('hello.json', 'out/hello.json', [], RecipeA)
b = task_manager.Task('hello.json', 'out/hello.json', [a], RecipeB)
self.assertEqual(0, RecipeB.counter)
b.Execute()
self.assertEqual(1, RecipeB.counter)
class BuilderTest(TaskManagerTestCase):
def testRegisterTask(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('hello.txt')
def TaskA():
TaskA.executed = True
TaskA.executed = False
self.assertEqual(os.path.join(self.output_directory, 'hello.txt'),
TaskA.path)
self.assertFalse(TaskA.executed)
TaskA.Execute()
self.assertTrue(TaskA.executed)
def testRegisterDuplicateTask(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('hello.txt')
def TaskA():
pass
del TaskA # unused
with self.assertRaises(task_manager.TaskError):
@builder.RegisterTask('hello.txt')
def TaskB():
pass
del TaskB # unused
def testTaskMerging(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('hello.txt')
def TaskA():
pass
@builder.RegisterTask('hello.txt', merge=True)
def TaskB():
pass
self.assertEqual(TaskA, TaskB)
def testOutputSubdirectory(self):
builder = task_manager.Builder(self.output_directory, 'subdir')
@builder.RegisterTask('world.txt')
def TaskA():
pass
del TaskA # unused
self.assertIn('subdir/world.txt', builder._tasks)
self.assertNotIn('subdir/subdir/world.txt', builder._tasks)
self.assertNotIn('world.txt', builder._tasks)
@builder.RegisterTask('subdir/world.txt')
def TaskB():
pass
del TaskB # unused
self.assertIn('subdir/subdir/world.txt', builder._tasks)
self.assertNotIn('world.txt', builder._tasks)
class GenerateScenarioTest(TaskManagerTestCase):
def testParents(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
pass
@builder.RegisterTask('b', dependencies=[TaskA])
def TaskB():
pass
@builder.RegisterTask('c', dependencies=[TaskB])
def TaskC():
pass
scenario = task_manager.GenerateScenario([TaskA, TaskB, TaskC], set())
self.assertListEqual([TaskA, TaskB, TaskC], scenario)
scenario = task_manager.GenerateScenario([TaskB], set())
self.assertListEqual([TaskA, TaskB], scenario)
scenario = task_manager.GenerateScenario([TaskC], set())
self.assertListEqual([TaskA, TaskB, TaskC], scenario)
scenario = task_manager.GenerateScenario([TaskC, TaskB], set())
self.assertListEqual([TaskA, TaskB, TaskC], scenario)
def testFreezing(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
pass
@builder.RegisterTask('b', dependencies=[TaskA])
def TaskB():
pass
@builder.RegisterTask('c')
def TaskC():
pass
@builder.RegisterTask('d', dependencies=[TaskB, TaskC])
def TaskD():
pass
# assert no exception raised.
task_manager.GenerateScenario([TaskB], set([TaskC]))
with self.assertRaises(task_manager.TaskError):
task_manager.GenerateScenario([TaskD], set([TaskA]))
self.TouchOutputFile('a')
scenario = task_manager.GenerateScenario([TaskD], set([TaskA]))
self.assertListEqual([TaskB, TaskC, TaskD], scenario)
self.TouchOutputFile('b')
scenario = task_manager.GenerateScenario([TaskD], set([TaskB]))
self.assertListEqual([TaskC, TaskD], scenario)
def testCycleError(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
pass
@builder.RegisterTask('b', dependencies=[TaskA])
def TaskB():
pass
@builder.RegisterTask('c', dependencies=[TaskB])
def TaskC():
pass
@builder.RegisterTask('d', dependencies=[TaskC])
def TaskD():
pass
TaskA._dependencies.append(TaskC)
with self.assertRaises(task_manager.TaskError):
task_manager.GenerateScenario([TaskD], set())
def testCollisionError(self):
builder_a = task_manager.Builder(self.output_directory, None)
builder_b = task_manager.Builder(self.output_directory, None)
@builder_a.RegisterTask('a')
def TaskA():
pass
@builder_b.RegisterTask('a')
def TaskB():
pass
with self.assertRaises(task_manager.TaskError):
task_manager.GenerateScenario([TaskA, TaskB], set())
def testGenerateDependentSetPerTask(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
pass
@builder.RegisterTask('b')
def TaskB():
pass
@builder.RegisterTask('c', dependencies=[TaskA, TaskB])
def TaskC():
pass
@builder.RegisterTask('d', dependencies=[TaskA])
def TaskD():
pass
def RunSubTest(expected, scenario, task):
self.assertEqual(
expected, task_manager.GenerateDependentSetPerTask(scenario)[task])
RunSubTest(set([]), [TaskA], TaskA)
RunSubTest(set([]), [TaskA, TaskB], TaskA)
RunSubTest(set([TaskC]), [TaskA, TaskB, TaskC], TaskA)
RunSubTest(set([TaskC, TaskD]), [TaskA, TaskB, TaskC, TaskD], TaskA)
RunSubTest(set([]), [TaskA, TaskD], TaskD)
def testGraphVizOutput(self):
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
pass
@builder.RegisterTask('b')
def TaskB():
pass
@builder.RegisterTask('c', dependencies=[TaskB, TaskA])
def TaskC():
pass
@builder.RegisterTask('d', dependencies=[TaskC])
def TaskD():
pass
@builder.RegisterTask('e')
def TaskE():
pass
@builder.RegisterTask('f', dependencies=[TaskD, TaskE])
def TaskF():
pass
self.TouchOutputFile('e')
scenario = task_manager.GenerateScenario([TaskF], set([TaskE]))
output = StringIO.StringIO()
task_manager.OutputGraphViz(scenario, [TaskF], output)
self.assertEqual(_GOLDEN_GRAPHVIZ, output.getvalue())
def testListResumingTasksToFreeze(self):
TaskManagerTestCase.setUp(self)
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
pass
@builder.RegisterTask('b')
def TaskB():
pass
@builder.RegisterTask('c', dependencies=[TaskA, TaskB])
def TaskC():
pass
@builder.RegisterTask('d', dependencies=[TaskA])
def TaskD():
pass
@builder.RegisterTask('e', dependencies=[TaskC])
def TaskE():
pass
@builder.RegisterTask('f', dependencies=[TaskC])
def TaskF():
pass
for k in 'abcdef':
self.TouchOutputFile(k)
def RunSubTest(
final_tasks, initial_frozen_tasks, skipped_tasks, reference):
scenario = task_manager.GenerateScenario(
final_tasks, initial_frozen_tasks)
resume_frozen_tasks = task_manager.ListResumingTasksToFreeze(
scenario, final_tasks, skipped_tasks)
self.assertEqual(reference, resume_frozen_tasks)
new_scenario = \
task_manager.GenerateScenario(final_tasks, resume_frozen_tasks)
self.assertEqual(skipped_tasks, set(new_scenario))
RunSubTest([TaskA], set([]), set([TaskA]), [])
RunSubTest([TaskD], set([]), set([TaskA, TaskD]), [])
RunSubTest([TaskD], set([]), set([TaskD]), [TaskA])
RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskB, TaskC, TaskE, TaskF]),
[TaskA])
RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskC, TaskE, TaskF]),
[TaskA, TaskB])
RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskE, TaskF]), [TaskC])
RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskF]), [TaskE, TaskC])
RunSubTest([TaskD, TaskE, TaskF], set([]), set([TaskD, TaskF]),
[TaskA, TaskE, TaskC])
class CommandLineControlledExecutionTest(TaskManagerTestCase):
def setUp(self):
TaskManagerTestCase.setUp(self)
self.with_raise_exception_tasks = False
self.task_execution_history = None
def Execute(self, command_line_args):
self.task_execution_history = []
builder = task_manager.Builder(self.output_directory, None)
@builder.RegisterTask('a')
def TaskA():
self.task_execution_history.append(TaskA.name)
@builder.RegisterTask('b')
def TaskB():
self.task_execution_history.append(TaskB.name)
@builder.RegisterTask('c', dependencies=[TaskA, TaskB])
def TaskC():
self.task_execution_history.append(TaskC.name)
@builder.RegisterTask('d', dependencies=[TaskA])
def TaskD():
self.task_execution_history.append(TaskD.name)
@builder.RegisterTask('e', dependencies=[TaskC])
def TaskE():
self.task_execution_history.append(TaskE.name)
@builder.RegisterTask('raise_exception', dependencies=[TaskD])
def RaiseExceptionTask():
self.task_execution_history.append(RaiseExceptionTask.name)
raise TestException('Expected error.')
@builder.RegisterTask('raise_keyboard_interrupt', dependencies=[TaskD])
def RaiseKeyboardInterruptTask():
self.task_execution_history.append(RaiseKeyboardInterruptTask.name)
raise KeyboardInterrupt
@builder.RegisterTask('sudden_death', dependencies=[TaskD])
def SimulateKillTask():
self.task_execution_history.append(SimulateKillTask.name)
raise MemoryError
@builder.RegisterTask('timeout_error', dependencies=[TaskD])
def SimulateTimeoutError():
self.task_execution_history.append(SimulateTimeoutError.name)
raise common_util.TimeoutError
@builder.RegisterTask('errno_ENOSPC', dependencies=[TaskD])
def SimulateENOSPC():
self.task_execution_history.append(SimulateENOSPC.name)
raise IOError(errno.ENOSPC, os.strerror(errno.ENOSPC))
@builder.RegisterTask('errno_EPERM', dependencies=[TaskD])
def SimulateEPERM():
self.task_execution_history.append(SimulateEPERM.name)
raise IOError(errno.EPERM, os.strerror(errno.EPERM))
default_final_tasks = [TaskD, TaskE]
if self.with_raise_exception_tasks:
default_final_tasks.extend([
RaiseExceptionTask,
RaiseKeyboardInterruptTask,
SimulateKillTask,
SimulateTimeoutError,
SimulateENOSPC,
SimulateEPERM])
task_parser = task_manager.CommandLineParser()
parser = argparse.ArgumentParser(parents=[task_parser],
fromfile_prefix_chars=task_manager.FROMFILE_PREFIX_CHARS)
cmd = ['-o', self.output_directory]
cmd.extend([i for i in command_line_args])
args = parser.parse_args(cmd)
with EatStdoutAndStderr():
return task_manager.ExecuteWithCommandLine(args, default_final_tasks)
def ResumeFilePath(self):
return self.OutputPath(task_manager._TASK_RESUME_ARGUMENTS_FILE)
def ResumeCmd(self):
return task_manager.FROMFILE_PREFIX_CHARS + self.ResumeFilePath()
def testSimple(self):
self.assertEqual(0, self.Execute([]))
self.assertListEqual(['a', 'd', 'b', 'c', 'e'], self.task_execution_history)
def testDryRun(self):
self.assertEqual(0, self.Execute(['-d']))
self.assertListEqual([], self.task_execution_history)
def testRegex(self):
self.assertEqual(0, self.Execute(['-e', 'b', '-e', 'd']))
self.assertListEqual(['b', 'a', 'd'], self.task_execution_history)
self.assertEqual(1, self.Execute(['-e', r'\d']))
self.assertListEqual([], self.task_execution_history)
def testFreezing(self):
self.assertEqual(0, self.Execute(['-f', r'\d']))
self.assertListEqual(['a', 'd', 'b', 'c', 'e'], self.task_execution_history)
self.TouchOutputFile('c')
self.assertEqual(0, self.Execute(['-f', 'c']))
self.assertListEqual(['a', 'd', 'e'], self.task_execution_history)
def testDontFreezeUnreachableTasks(self):
self.TouchOutputFile('c')
self.assertEqual(0, self.Execute(['-e', 'e', '-f', 'c', '-f', 'd']))
def testAbortOnFirstError(self):
ARGS = ['-e', 'exception', '-e', r'^b$']
self.with_raise_exception_tasks = True
self.assertEqual(1, self.Execute(ARGS))
self.assertListEqual(
['a', 'd', 'raise_exception'], self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual('-f\n^d$', resume_input.read())
self.TouchOutputFile('d')
self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))
self.assertListEqual(['raise_exception'], self.task_execution_history)
self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))
self.assertListEqual(['raise_exception'], self.task_execution_history)
self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd(), '-k']))
self.assertListEqual(['raise_exception', 'b'], self.task_execution_history)
def testKeepGoing(self):
ARGS = ['-k', '-e', 'exception', '-e', r'^b$']
self.with_raise_exception_tasks = True
self.assertEqual(1, self.Execute(ARGS))
self.assertListEqual(
['a', 'd', 'raise_exception', 'b'], self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual('-f\n^d$\n-f\n^b$', resume_input.read())
self.TouchOutputFile('d')
self.TouchOutputFile('b')
self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))
self.assertListEqual(['raise_exception'], self.task_execution_history)
self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))
self.assertListEqual(['raise_exception'], self.task_execution_history)
def testKeyboardInterrupt(self):
self.with_raise_exception_tasks = True
with self.assertRaises(KeyboardInterrupt):
self.Execute(
['-k', '-e', 'raise_keyboard_interrupt', '-e', r'^b$'])
self.assertListEqual(['a', 'd', 'raise_keyboard_interrupt'],
self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual('-f\n^d$', resume_input.read())
def testResumeAfterSuddenDeath(self):
EXPECTED_RESUME_FILE_CONTENT = '-f\n^a$\n-f\n^d$\n'
ARGS = ['-k', '-e', 'sudden_death', '-e', r'^a$']
self.with_raise_exception_tasks = True
with self.assertRaises(MemoryError):
self.Execute(ARGS)
self.assertListEqual(
['a', 'd', 'sudden_death'], self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual(EXPECTED_RESUME_FILE_CONTENT, resume_input.read())
self.TouchOutputFile('a')
self.TouchOutputFile('d')
with self.assertRaises(MemoryError):
self.Execute(ARGS + [self.ResumeCmd()])
self.assertListEqual(['sudden_death'], self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual(EXPECTED_RESUME_FILE_CONTENT, resume_input.read())
with self.assertRaises(MemoryError):
self.Execute(ARGS + [self.ResumeCmd()])
self.assertListEqual(['sudden_death'], self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual(EXPECTED_RESUME_FILE_CONTENT, resume_input.read())
def testTimeoutError(self):
self.with_raise_exception_tasks = True
self.Execute(['-k', '-e', 'timeout_error', '-e', r'^b$'])
self.assertListEqual(['a', 'd', 'timeout_error', 'b'],
self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual('-f\n^d$\n-f\n^b$', resume_input.read())
def testENOSPC(self):
self.with_raise_exception_tasks = True
with self.assertRaises(IOError):
self.Execute(['-k', '-e', 'errno_ENOSPC', '-e', r'^a$'])
self.assertListEqual(
['a', 'd', 'errno_ENOSPC'], self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual('-f\n^a$\n-f\n^d$\n', resume_input.read())
def testEPERM(self):
self.with_raise_exception_tasks = True
self.Execute(['-k', '-e', 'errno_EPERM', '-e', r'^b$'])
self.assertListEqual(['a', 'd', 'errno_EPERM', 'b'],
self.task_execution_history)
with open(self.ResumeFilePath()) as resume_input:
self.assertEqual('-f\n^d$\n-f\n^b$', resume_input.read())
def testImpossibleTasks(self):
self.assertEqual(1, self.Execute(['-f', r'^a$', '-e', r'^c$']))
self.assertListEqual([], self.task_execution_history)
self.assertEqual(0, self.Execute(
['-f', r'^a$', '-e', r'^c$', '-e', r'^b$']))
self.assertListEqual(['b'], self.task_execution_history)
if __name__ == '__main__':
unittest.main()