239 lines
7.1 KiB
Python
239 lines
7.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
from glob import glob
|
|
from functools import reduce
|
|
|
|
import json
|
|
import jsone
|
|
import os
|
|
import sys
|
|
import requests
|
|
import slugid
|
|
import yaml
|
|
import subprocess
|
|
|
|
import networkx as nx
|
|
|
|
TASKS_ROOT = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])))
|
|
TASKCLUSTER_API_BASEURL = 'http://taskcluster/queue/v1/task/%(task_id)s'
|
|
|
|
def string_to_dict(sid, value):
|
|
parts = sid.split('.')
|
|
|
|
def pack(parts):
|
|
if len(parts) == 1:
|
|
return {parts[0]: value}
|
|
elif len(parts):
|
|
return {parts[0]: pack(parts[1:])}
|
|
return parts
|
|
|
|
return pack(parts)
|
|
|
|
def merge_dicts(*dicts):
|
|
if not reduce(lambda x, y: isinstance(y, dict) and x, dicts, True):
|
|
raise TypeError("Object in *dicts not of type dict")
|
|
if len(dicts) < 2:
|
|
raise ValueError("Requires 2 or more dict objects")
|
|
|
|
def merge(a, b):
|
|
for d in set(a.keys()).union(b.keys()):
|
|
if d in a and d in b:
|
|
if type(a[d]) == type(b[d]):
|
|
if not isinstance(a[d], dict):
|
|
ret = list({a[d], b[d]})
|
|
if len(ret) == 1: ret = ret[0]
|
|
yield (d, sorted(ret))
|
|
else:
|
|
yield (d, dict(merge(a[d], b[d])))
|
|
else:
|
|
raise TypeError("Conflicting key:value type assignment", type(a[d]), a[d], type(b[d]), b[d])
|
|
elif d in a:
|
|
yield (d, a[d])
|
|
elif d in b:
|
|
yield (d, b[d])
|
|
else:
|
|
raise KeyError
|
|
|
|
return reduce(lambda x, y: dict(merge(x, y)), dicts[1:], dicts[0])
|
|
|
|
def taskcluster_event_context():
|
|
das_context = {}
|
|
|
|
# Pre-filterting
|
|
for k in os.environ.keys():
|
|
if k == 'GITHUB_HEAD_USER':
|
|
os.environ['GITHUB_HEAD_USER_LOGIN'] = os.environ[k]
|
|
del os.environ['GITHUB_HEAD_USER']
|
|
|
|
for k in os.environ.keys():
|
|
if k == 'TASK_ID':
|
|
parts = string_to_dict('taskcluster.taskGroupId', os.environ[k])
|
|
das_context = merge_dicts(das_context, parts)
|
|
|
|
if k.startswith('GITHUB_'):
|
|
parts = string_to_dict(k.lower().replace('_', '.').replace('github', 'event'), os.environ[k])
|
|
das_context = merge_dicts(das_context, parts)
|
|
|
|
return das_context
|
|
|
|
def load_specific_contextFile(file):
|
|
specific_context = {}
|
|
|
|
try:
|
|
with open(os.path.join(TASKS_ROOT, file)) as src:
|
|
specific_context = yaml.safe_load(src)
|
|
|
|
if specific_context is None:
|
|
specific_context = {}
|
|
except FileNotFoundError:
|
|
specific_context = {}
|
|
|
|
return specific_context
|
|
|
|
def defaultValues_build_context():
|
|
return load_specific_contextFile('.build.yml')
|
|
|
|
def shared_context():
|
|
return load_specific_contextFile('.shared.yml')
|
|
|
|
def create_task_payload(build, base_context):
|
|
print('build', build)
|
|
build_type = os.path.splitext(os.path.basename(build))[0]
|
|
|
|
build_context = defaultValues_build_context()
|
|
with open(build) as src:
|
|
build_context['build'].update(yaml.safe_load(src)['build'])
|
|
|
|
# Be able to use what has been defined in base_context
|
|
# e.g., the {${event.head.branch}}
|
|
build_context = jsone.render(build_context, base_context)
|
|
template_context = {
|
|
'taskcluster': {
|
|
'taskId': as_slugid(build_type)
|
|
},
|
|
'build_type': build_type
|
|
}
|
|
|
|
with open(os.path.join(TASKS_ROOT, build_context['build']['template_file'])) as src:
|
|
template = yaml.safe_load(src)
|
|
|
|
contextes = merge_dicts({}, base_context, template_context, build_context)
|
|
for one_context in glob(os.path.join(TASKS_ROOT, '*.cyml')):
|
|
with open(one_context) as src:
|
|
contextes = merge_dicts(contextes, yaml.safe_load(src))
|
|
|
|
return jsone.render(template, contextes)
|
|
|
|
def send_task(t):
|
|
url = TASKCLUSTER_API_BASEURL % {'task_id': t['taskId']}
|
|
del t['taskId']
|
|
|
|
r = requests.put(url, json=t)
|
|
|
|
print(url, r.status_code)
|
|
if r.status_code != requests.codes.ok:
|
|
print(json.dumps(t, indent=2))
|
|
print(r.content)
|
|
print(json.loads(r.content.decode())['message'])
|
|
|
|
return r.status_code == requests.codes.ok
|
|
|
|
slugids = {}
|
|
def as_slugid(name):
|
|
if name not in slugids:
|
|
slugids[name] = slugid.nice().decode()
|
|
print('cache miss', name, slugids[name])
|
|
else:
|
|
print('cache hit', name, slugids[name])
|
|
return slugids[name]
|
|
|
|
def to_int(x):
|
|
return int(x)
|
|
|
|
def functions_context():
|
|
return {
|
|
'as_slugid': as_slugid,
|
|
'to_int': to_int
|
|
}
|
|
|
|
def is_dry_run():
|
|
return (len(sys.argv) > 1) and (sys.argv[1] == '--dry')
|
|
|
|
def should_run():
|
|
# Make a quick clone to fetch the last commit
|
|
try:
|
|
subprocess.check_call([
|
|
'git', 'clone', '--quiet', '-b', os.environ.get('GITHUB_HEAD_BRANCH'),
|
|
'--single-branch', os.environ.get('GITHUB_HEAD_REPO_URL'),
|
|
'--depth=1', '/tmp/ds-clone/'
|
|
], env={'GIT_LFS_SKIP_SMUDGE': '1'})
|
|
except subprocess.CalledProcessError as e:
|
|
print("Error while git cloning:", e, file=sys.stderr)
|
|
return False
|
|
|
|
try:
|
|
git_msg = subprocess.check_output([
|
|
'git', '--git-dir=/tmp/ds-clone/.git/',
|
|
'log', '--format=%b', '-n', '1',
|
|
os.environ.get('GITHUB_HEAD_SHA')
|
|
]).decode('utf-8').strip().upper()
|
|
except subprocess.CalledProcessError as e:
|
|
print("Error while git show:", e, file=sys.stderr)
|
|
return False
|
|
|
|
print('Commit message:', git_msg)
|
|
|
|
x_deepspeech = filter(lambda x: 'X-DEEPSPEECH:' in x, git_msg.split('\n'))
|
|
if len(list(filter(lambda x: 'NOBUILD' in x, x_deepspeech))) == 1:
|
|
print('Not running anything according to commit message')
|
|
return False
|
|
|
|
return True
|
|
|
|
if __name__ == '__main__':
|
|
if not is_dry_run():
|
|
# We might want to NOT run in some cases
|
|
if not should_run():
|
|
sys.exit(0)
|
|
|
|
base_context = taskcluster_event_context()
|
|
base_context = merge_dicts(base_context, functions_context())
|
|
base_context = merge_dicts(base_context, shared_context())
|
|
|
|
root_task = base_context['taskcluster']['taskGroupId']
|
|
|
|
tasks_graph = nx.DiGraph()
|
|
tasks = {}
|
|
|
|
for build in glob(os.path.join(TASKS_ROOT, '*.yml')):
|
|
t = create_task_payload(build, base_context)
|
|
|
|
# We allow template to produce completely empty output
|
|
if not t:
|
|
continue
|
|
|
|
if 'dependencies' in t and len(t['dependencies']) > 0:
|
|
for dep in t['dependencies']:
|
|
tasks_graph.add_edge(t['taskId'], dep)
|
|
else:
|
|
tasks_graph.add_edge(t['taskId'], root_task)
|
|
|
|
tasks[t['taskId']] = t
|
|
|
|
for task in nx.dfs_postorder_nodes(tasks_graph):
|
|
# root_task is the task group and also the task id that is already
|
|
# running, so we don't have to schedule that
|
|
if task == root_task:
|
|
continue
|
|
|
|
t = tasks[task]
|
|
if is_dry_run():
|
|
print(json.dumps(t, indent=2))
|
|
continue
|
|
|
|
p = send_task(t)
|
|
if not p:
|
|
sys.exit(1)
|