Repository : http://git.fedorahosted.org/cgit/copr.git
On branch : master
commit d05eb48433da4dabbe67d58a7fdeb91c2177c1be Merge: 5fdd930 7ae3974 Author: Seth Vidal skvidal@fedoraproject.org Date: Mon Dec 10 17:02:09 2012 -0500
Merge branch 'skvidal-backend'
* skvidal-backend: (63 commits) rename file to README and explain where to look ...
TODO-backend | 19 ++ backend/__init__.py | 7 + backend/dispatcher.py | 300 +++++++++++++++++++++++ backend/errors.py | 12 + backend/mockremote.py | 645 +++++++++++++++++++++++++++++++++++++++++++++++++ copr-be.conf.example | 16 ++ copr-be.py | 263 ++++++++++++++++++++ 7 files changed, 1262 insertions(+), 0 deletions(-)
diff --git a/TODO-backend b/TODO-backend new file mode 100644 index 0000000..4cc5ca1 --- /dev/null +++ b/TODO-backend @@ -0,0 +1,19 @@ + +- change instance type by build request for more mem/procs/extend timeouts + - use extra-vars? + - need ansible 0.9? +- auto-timer/cleanup script for old instances that may have been orphaned +- prune out builders when we drop the number of them active +- LOADS of fixme and catching weird conditions +- make logging from mockremote more sane and consistent +- mock configs should be pushed to instances at creation time + - single url to repos, not mirrorlists +- consider making each worker return job to a completed queue so the primary + process can do other kinds of notification +- email notifications from backend? +- refactor mockremote/dispatcher.worker together? +- work on a way to find and cancel a specific build that's happening other than just killing the instance +- determine if it is properly checking the timeout from a dead instance +- maybe dump out the PID of the worker that is running so we know which one to kill? +- failure/success not being returned correctly. Should check for 'fail' in the directories and return based on + that. also anything lacking success is a failure. 
diff --git a/backend/__init__.py b/backend/__init__.py new file mode 100644 index 0000000..e25106e --- /dev/null +++ b/backend/__init__.py @@ -0,0 +1,7 @@ +# part of copr backend +# skvidal@fedoraproject.org - seth vidal +# (c) copyright Red Hat, Inc 2012 +# gplv2+ + +__version__ = "0.1" +__author__ = "Seth Vidal" diff --git a/backend/dispatcher.py b/backend/dispatcher.py new file mode 100644 index 0000000..9b0399d --- /dev/null +++ b/backend/dispatcher.py @@ -0,0 +1,300 @@ +#!/usr/bin/python -tt + + +import os +import sys +import multiprocessing +import time +import Queue +import json +import mockremote +from bunch import Bunch +import errors +import ansible +import ansible.playbook +import ansible.errors +from ansible import callbacks +import requests + + + + +class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks): + ''' playbook callbacks - quietly! ''' + + def __init__(self, verbose=False): + + self.verbose = verbose + + def on_start(self): + callbacks.call_callback_module('playbook_on_start') + + def on_notify(self, host, handler): + callbacks.call_callback_module('playbook_on_notify', host, handler) + + def on_no_hosts_matched(self): + callbacks.call_callback_module('playbook_on_no_hosts_matched') + + def on_no_hosts_remaining(self): + callbacks.call_callback_module('playbook_on_no_hosts_remaining') + + def on_task_start(self, name, is_conditional): + callbacks.call_callback_module('playbook_on_task_start', name, is_conditional) + + def on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None): + result = None + print "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****" + callbacks.call_callback_module('playbook_on_vars_prompt', varname, private=private, prompt=prompt, encrypt=encrypt, confirm=confirm, salt_size=salt_size, salt=None) + return result + + def on_setup(self): + callbacks.call_callback_module('playbook_on_setup') + + def on_import_for_host(self, host, imported_file): + 
callbacks.call_callback_module('playbook_on_import_for_host', host, imported_file) + + def on_not_import_for_host(self, host, missing_file): + callbacks.call_callback_module('playbook_on_not_import_for_host', host, missing_file) + + def on_play_start(self, pattern): + callbacks.call_callback_module('playbook_on_play_start', pattern) + + def on_stats(self, stats): + callbacks.call_callback_module('playbook_on_stats', stats) + + +class WorkerCallback(object): + def __init__(self, logfile=None): + self.logfile = logfile + + def log(self, msg): + if not self.logfile: + return + + now = time.strftime('%F %T') + try: + open(self.logfile, 'a').write(str(now) + ': ' + msg + '\n') + except (IOError, OSError), e: + print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e)) + + +class Worker(multiprocessing.Process): + def __init__(self, opts, jobs, worker_num, ip=None, create=True, callback=None): + + # base class initialization + multiprocessing.Process.__init__(self, name="worker-builder") + + + # job management stuff + self.jobs = jobs + self.worker_num = worker_num + self.ip = ip + self.opts = opts + self.kill_received = False + self.callback = callback + self.create = create + if not self.callback: + self.logfile = self.opts.worker_logdir + '/worker-%s.log' % self.worker_num + self.callback = WorkerCallback(logfile = self.logfile) + + if ip: + self.callback.log('creating worker: %s' % ip) + else: + self.callback.log('creating worker: dynamic ip') + + def spawn_instance(self): + """call the spawn playbook to startup/provision a building instance""" + self.callback.log('spawning instance begin') + start = time.time() + + stats = callbacks.AggregateStats() + playbook_cb = SilentPlaybookCallbacks(verbose=False) + runner_cb = callbacks.DefaultRunnerCallbacks() + # fixme - extra_vars to include ip as a var if we need to specify ips + # also to include info for instance type to handle the memory requirements of builds + play = 
ansible.playbook.PlayBook(stats=stats, playbook=self.opts.spawn_playbook, + callbacks=playbook_cb, runner_callbacks=runner_cb, + remote_user='root') + + play.run() + self.callback.log('spawning instance end') + self.callback.log('Instance spawn/provision took %s sec' % (time.time() - start)) + + if self.ip: + return self.ip + + for i in play.SETUP_CACHE: + if i =='localhost': + continue + return i + + # if we get here we're in trouble + self.callback.log('No IP back from spawn_instance - dumping cache output') + self.callback.log(str(play.SETUP_CACHE)) + self.callback.log(str(play.stats.summarize('localhost'))) + self.callback.log('Test spawn_instance playbook manually') + + return None + + def terminate_instance(self,ip): + """call the terminate playbook to destroy the building instance""" + self.callback.log('terminate instance begin') + + stats = callbacks.AggregateStats() + playbook_cb = SilentPlaybookCallbacks(verbose=False) + runner_cb = callbacks.DefaultRunnerCallbacks() + play = ansible.playbook.PlayBook(host_list=[ip], stats=stats, playbook=self.opts.terminate_playbook, + callbacks=playbook_cb, runner_callbacks=runner_cb, + remote_user='root') + + play.run() + self.callback.log('terminate instance end') + + def parse_job(self, jobfile): + # read the json of the job in + # break out what we need return a bunch of the info we need + build = json.load(open(jobfile)) + jobdata = Bunch() + jobdata.pkgs = build['pkgs'].split(' ') + jobdata.repos = [r for r in build['repos'].split(' ') if r.strip() ] + jobdata.chroots = build['chroots'].split(' ') + jobdata.memory_reqs = build['memory_reqs'] + jobdata.timeout = build['timeout'] + jobdata.destdir = self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/' + jobdata.build_id = build['id'] + jobdata.results = self.opts.results_baseurl + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/' + jobdata.copr_id = build['copr']['id'] + jobdata.user_id = 
build['user_id'] + return jobdata + + # maybe we move this to the callback? + def post_to_frontend(self, data): + """send data to frontend""" + + headers = {'content-type': 'application/json'} + url='%s/update_builds/' % self.opts.frontend_url + auth=('user', self.opts.frontend_auth) + + msg = None + try: + r = requests.post(url, data=json.dumps(data), auth=auth, + headers=headers) + if r.status_code != 200: + msg = 'Failed to submit to frontend: %s: %s' % (r.status_code, r.text) + except requests.RequestException, e: + msg = 'Post request failed: %s' % e + + if msg: + self.callback.log(msg) + return False + + return True + + # maybe we move this to the callback? + def mark_started(self, job): + + build = {'id':job.build_id, + 'started_on': job.started_on, + 'results': job.results, + } + data = {'builds':[build]} + + if not self.post_to_frontend(data): + raise errors.CoprWorkerError, "Could not communicate to front end to submit status info" + + # maybe we move this to the callback? + def return_results(self, job): + self.callback.log('%s status %s. 
Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on)) + + build = {'id':job.build_id, + 'ended_on': job.ended_on, + 'status': job.status, + } + data = {'builds':[build]} + + if not self.post_to_frontend(data): + raise errors.CoprWorkerError, "Could not communicate to front end to submit results" + + os.unlink(job.jobfile) + + def run(self): + # worker should startup and check if it can function + # for each job it takes from the jobs queue + # run opts.setup_playbook to create the instance + # do the build (mockremote) + # terminate the instance + + while not self.kill_received: + try: + jobfile = self.jobs.get() + except Queue.Empty: + break + + # parse the job json into our info + job = self.parse_job(jobfile) + + # FIXME + # this is our best place to sanity check the job before starting + # up any longer process + + job.jobfile = jobfile + + # spin up our build instance + if self.create: + try: + ip = self.spawn_instance() + if not ip: + raise errors.CoprWorkerError, "No IP found from creating instance" + + except ansible.errors.AnsibleError, e: + self.callback.log('failure to setup instance: %s' % e) + raise + + status = 1 + job.started_on = time.time() + self.mark_started(job) + + for chroot in job.chroots: + + chroot_destdir = job.destdir + '/' + chroot + # setup our target dir locally + if not os.path.exists(chroot_destdir): + try: + os.makedirs(chroot_destdir) + except (OSError, IOError), e: + msg = "Could not make results dir for job: %s - %s" % (chroot_destdir, str(e)) + self.callback.log(msg) + status = 0 + continue + + # FIXME + # need a plugin hook or some mechanism to check random + # info about the pkgs + # this should use ansible to download the pkg on the remote system + # and run a series of checks on the package before we + # start the build - most importantly license checks. 
+ + + self.callback.log('Starting build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id,ip, job.timeout, job.destdir, chroot, str(job.repos))) + self.callback.log('building pkgs: %s' % ' '.join(job.pkgs)) + try: + chrootlogfile = chroot_destdir + '/mockremote.log' + mr = mockremote.MockRemote(builder=ip, timeout=job.timeout, + destdir=job.destdir, chroot=chroot, cont=True, recurse=True, + repos=job.repos, + callback=mockremote.CliLogCallBack(quiet=True,logfn=chrootlogfile)) + mr.build_pkgs(job.pkgs) + except mockremote.MockRemoteError, e: + # record and break + self.callback.log('%s - %s' % (ip, e)) + status = 0 # failure + self.callback.log('Finished build: builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (ip, job.timeout, job.destdir, chroot, str(job.repos))) + + job.ended_on = time.time() + job.status = status + self.return_results(job) + self.callback.log('worker finished build: %s' % ip) + # clean up the instance + if self.create: + self.terminate_instance(ip) + diff --git a/backend/errors.py b/backend/errors.py new file mode 100644 index 0000000..ae8ac34 --- /dev/null +++ b/backend/errors.py @@ -0,0 +1,12 @@ +# copr error/exceptions +class CoprBackendError(Exception): + + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return self.msg + +class CoprWorkerError(CoprBackendError): + pass + diff --git a/backend/mockremote.py b/backend/mockremote.py new file mode 100755 index 0000000..2c641c8 --- /dev/null +++ b/backend/mockremote.py @@ -0,0 +1,645 @@ +#!/usr/bin/python -tt +# by skvidal +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. +# copyright 2012 Red Hat, Inc. + + +# take list of pkgs +# take single hostname +# send 1 pkg at a time to host +# build in remote w/mockchain +# rsync results back +# repeat +# take args from mockchain (more or less) + + +import os +import sys +import subprocess + +import ansible.runner +import optparse +from operator import methodcaller +import time +import socket +import traceback + +# where we should execute mockchain from on the remote +mockchain='/usr/bin/mockchain' +# rsync path +rsync='/usr/bin/rsync' + +DEF_REMOTE_BASEDIR='/var/tmp' +DEF_TIMEOUT=3600 +DEF_REPOS = [] +DEF_CHROOT= None +DEF_USER = 'mockbuilder' +DEF_DESTDIR = os.getcwd() + +class SortedOptParser(optparse.OptionParser): + '''Optparser which sorts the options by opt before outputting --help''' + def format_help(self, formatter=None): + self.option_list.sort(key=methodcaller('get_opt_string')) + return optparse.OptionParser.format_help(self, formatter=None) + + +def createrepo(path): + if os.path.exists(path + '/repodata/repomd.xml'): + comm = ['/usr/bin/createrepo', '--update', path] + else: + comm = ['/usr/bin/createrepo', path] + cmd = subprocess.Popen(comm, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = cmd.communicate() + return cmd.returncode, out, err + +def read_list_from_file(fn): + lst = [] + f = open(fn, 'r') + for line in f.readlines(): + line = line.replace('\n','') + line = line.strip() + if line.startswith('#'): + continue + lst.append(line) + + return lst + +def log(lf, msg): + if lf: + now = 
time.time() + try: + open(lf, 'a').write(str(now) + ':' + msg + '\n') + except (IOError, OSError), e: + print 'Could not write to logfile %s - %s' % (lf, str(e)) + print msg + +def get_ans_results(results, hostname): + if hostname in results['dark']: + return results['dark'][hostname] + if hostname in results['contacted']: + return results['contacted'][hostname] + + return {} + +def _create_ans_conn(hostname, username, timeout): + ans_conn = ansible.runner.Runner(remote_user=username, + host_list=[hostname], pattern=hostname, forks=1, + timeout=timeout) + return ans_conn + +def check_for_ans_error(results, hostname, err_codes=[], success_codes=[0], + return_on_error=['stdout', 'stderr']): + # returns True or False + dict + # dict includes 'msg' + # may include 'rc', 'stderr', 'stdout' and any other + # requested result codes + err_results = {} + + if 'dark' in results and hostname in results['dark']: + err_results['msg'] = "Error: Could not contact/connect to %s." % hostname + return (True, err_results) + + error = False + + if err_codes or success_codes: + if hostname in results['contacted']: + if 'rc' in results['contacted'][hostname]: + rc = int(results['contacted'][hostname]['rc']) + err_results['rc'] = rc + # check for err codes first + if rc in err_codes: + error = True + err_results['msg'] = 'rc %s matched err_codes' % rc + elif rc not in success_codes: + error = True + err_results['msg'] = 'rc %s not in success_codes' % rc + elif 'failed' in results['contacted'][hostname] and results['contacted'][hostname]['failed']: + error = True + err_results['msg'] = 'results included failed as true' + + if error: + for item in return_on_error: + if item in results['contacted'][hostname]: + err_results[item] = results['contacted'][hostname][item] + + return error, err_results + + +class MockRemoteError(Exception): + + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return self.msg + +class BuilderError(MockRemoteError): + pass + +class 
DefaultCallBack(object): + def __init__(self, **kwargs): + self.quiet = kwargs.get('quiet', False) + self.logfn = kwargs.get('logfn', None) + + def start_build(self, pkg): + pass + + def end_build(self, pkg): + pass + + def start_download(self, pkg): + pass + + def end_download(self, pkg): + pass + + def error(self, msg): + self.log("Error: %s" % msg) + + def log(self, msg): + if not self.quiet: + print msg + +class CliLogCallBack(DefaultCallBack): + def __init__(self, **kwargs): + DefaultCallBack.__init__(self, **kwargs) + + def start_build(self, pkg): + msg = "Start build: %s" % pkg + self.log(msg) + + + def end_build(self, pkg): + msg = "End Build: %s" % pkg + self.log(msg) + + def start_download(self, pkg): + msg = "Start retrieve results for: %s" % pkg + self.log(msg) + + def end_download(self, pkg): + msg = "End retrieve results for: %s" % pkg + self.log(msg) + + def error(self, msg): + self.log("Error: %s" % msg) + + def log(self, msg): + if self.logfn: + now = time.time() + try: + open(self.logfn, 'a').write(str(now) + ':' + msg + '\n') + except (IOError, OSError), e: + print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.lf, str(e)) + if not self.quiet: + print msg + +class Builder(object): + def __init__(self, hostname, username, timeout, mockremote): + self.hostname = hostname + self.username = username + self.timeout = timeout + self.chroot = mockremote.chroot + self.repos = mockremote.repos + self.mockremote = mockremote + self.checked = False + self._tempdir = None + # check out the host - make sure it can build/be contacted/etc + self.check() + # if we're at this point we've connected and done stuff on the host + self.conn = _create_ans_conn(self.hostname, self.username, self.timeout) + + @property + def remote_build_dir(self): + return self.tempdir + '/build/' + + @property + def tempdir(self): + if self.mockremote.remote_tempdir: + return self.mockremote.remote_tempdir + + if self._tempdir: + return self._tempdir + + cmd='/bin/mktemp -d 
%s/%s-XXXXX' % (self.mockremote.remote_basedir, 'mockremote') + self.conn.module_name="shell" + self.conn.module_args = str(cmd) + results = self.conn.run() + tempdir = None + for hn, resdict in results['contacted'].items(): + tempdir = resdict['stdout'] + + # if still nothing then we've broken + if not tempdir: + raise BuilderError('Could not make tmpdir on %s' % self.hostname) + + cmd = "/bin/chmod 755 %s" % tempdir + self.conn.module_args = str(cmd) + self.conn.run() + self._tempdir = tempdir + + return self._tempdir + + @tempdir.setter + def tempdir(self, value): + self._tempdir = value + + def _get_remote_pkg_dir(self, pkg): + # the pkg will build into a dir by mockchain named: + # $tempdir/build/results/$chroot/$packagename + s_pkg = os.path.basename(pkg) + pdn = s_pkg.replace('.src.rpm', '') + remote_pkg_dir = self.remote_build_dir + '/results/' + self.chroot + '/' + pdn + return remote_pkg_dir + + def build(self, pkg): + + # build the pkg passed in + # add pkg to various lists + # check for success/failure of build + # return success/failure,stdout,stderr of build command + # returns success_bool, out, err + + success = False + + # check if pkg is local or http + dest = None + if os.path.exists(pkg): + dest = self.tempdir + '/' + os.path.basename(pkg) + self.conn.module_name="copy" + margs = 'src=%s dest=%s' % (pkg, dest) + self.conn.module_args = str(margs) + self.mockremote.callback.log("Sending %s to %s to build" % (os.path.basename(pkg), self.hostname)) + + # FIXME should probably check this but <shrug> + self.conn.run() + else: + dest = pkg + + # construct the mockchain command + buildcmd = '%s -r %s -l %s ' % (mockchain, self.chroot, self.remote_build_dir) + for r in self.repos: + buildcmd += '-a %s ' % r + + buildcmd += dest + + #print ' Running %s on %s' % (buildcmd, hostname) + # run the mockchain command async + # this runs it sync - FIXME + self.conn.module_name="shell" + self.conn.module_args = str(buildcmd) + results = self.conn.run() + + 
is_err, err_results = check_for_ans_error(results, self.hostname, success_codes=[0], + return_on_error=['stdout', 'stderr']) + if is_err: + return success, err_results.get('stdout', ''), err_results.get('stderr', '') + + # we know the command ended successfully but not if the pkg built successfully + myresults = get_ans_results(results, self.hostname) + out = myresults.get('stdout', '') + err = myresults.get('stderr', '') + + successfile = self._get_remote_pkg_dir(pkg) + '/success' + testcmd = '/usr/bin/test -f %s' % successfile + self.conn.module_args = str(testcmd) + results = self.conn.run() + is_err, err_results = check_for_ans_error(results, self.hostname, success_codes=[0]) + if not is_err: + success = True + + return success, out, err + + def download(self, pkg, destdir): + # download the pkg to destdir using rsync + ssh + # return success/failure, stdout, stderr + + success = False + rpd = self._get_remote_pkg_dir(pkg) + destdir = "'" + destdir.replace("'", "'\''") + "'" # make spaces work w/our rsync command below :( + # build rsync command line from the above + remote_src = '%s@%s:%s' % (self.username, self.hostname, rpd) + ssh_opts = "'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'" + command = "%s -avH -e %s %s %s/" % (rsync, ssh_opts, remote_src, destdir) + cmd = subprocess.Popen(command, shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # rsync results into opts.destdir + out, err = cmd.communicate() + if cmd.returncode: + success = False + else: + success = True + + return success, out, err + + def check(self): + # do check of host + # set checked if successful + # return success/failure, errorlist + + if self.checked: + return True, [] + + errors = [] + + try: + socket.gethostbyname(self.hostname) + except socket.gaierror: + raise BuilderError('%s could not be resolved' % self.hostname) + + # connect as user + ans = ansible.runner.Runner(host_list=[self.hostname], pattern='*', + remote_user=self.username, forks=1, 
timeout=20) + ans.module_name = "shell" + ans.module_args = str("/bin/rpm -q mock rsync") + res = ans.run() + # check for mock/rsync from results + is_err, err_results = check_for_ans_error(res, self.hostname, success_codes=[0]) + if is_err: + if 'rc' in err_results: + errors.append('Warning: %s does not have mock or rsync installed' % self.hostname) + else: + errors.append(err_results['msg']) + + + # test for path existence for mockchain and chroot config for this chroot + ans.module_name = "shell" + ans.module_args = str("/usr/bin/test -f %s && /usr/bin/test -f /etc/mock/%s.cfg" % (mockchain, self.chroot)) + res = ans.run() + + is_err, err_results = check_for_ans_error(res, self.hostname, success_codes=[0]) + if is_err: + if 'rc' in err_results: + errors.append('Warning: %s lacks mockchain or the chroot %s' % (self.hostname, self.chroot)) + else: + errors.append(err_results['msg']) + + if not errors: + self.checked = True + else: + msg = '\n'.join(errors) + raise BuilderError(msg) + + +class MockRemote(object): + def __init__(self, builder=None, user=DEF_USER, timeout=DEF_TIMEOUT, + destdir=DEF_DESTDIR, chroot=DEF_CHROOT, cont=False, recurse=False, + repos=DEF_REPOS, callback=None, + remote_basedir=DEF_REMOTE_BASEDIR, remote_tempdir=None): + + self.destdir = destdir + self.chroot = chroot + self.repos = repos + self.cont = cont + self.recurse = recurse + self.callback = callback + self.remote_basedir = remote_basedir + self.remote_tempdir = remote_tempdir + + if not self.callback: + self.callback = DefaultCallBack() + + self.callback.log("Setting up builder: %s" % builder) + self.builder = Builder(builder, user, timeout, self) + + if not self.chroot: + raise MockRemoteError("No chroot specified!") + + + self.failed = [] + self.finished = [] + self.pkg_list = [] + + + def _get_pkg_destpath(self, pkg): + s_pkg = os.path.basename(pkg) + pdn = s_pkg.replace('.src.rpm', '') + resdir = '%s/%s/%s' % (self.destdir, self.chroot, pdn) + resdir = os.path.normpath(resdir) + 
return resdir + + def build_pkgs(self, pkgs=None): + + if not pkgs: + pkgs = self.pkg_list + + built_pkgs = [] + downloaded_pkgs = {} + + try_again = True + to_be_built = pkgs + while try_again: + self.failed = [] + just_built = [] + for pkg in to_be_built: + if pkg in just_built: + self.callback.log("skipping duplicate pkg in this list: %s" % pkg) + continue + else: + just_built.append(pkg) + + p_path = self._get_pkg_destpath(pkg) + + # check the destdir to see if these pkgs need to be built + if os.path.exists(p_path): + if os.path.exists(p_path + '/success'): + self.callback.log("Skipping already built pkg %s" % os.path.basename(pkg)) + continue + # if we're asking to build it and it is marked as fail - nuke + # the failure and try rebuilding it + elif os.path.exists(p_path + '/fail'): + os.unlink(p_path + '/fail') + + # off to the builder object + # building + self.callback.start_build(pkg) + b_status, b_out, b_err = self.builder.build(pkg) + self.callback.end_build(pkg) + + # downloading + self.callback.start_download(pkg) + # mockchain makes things with the chroot appended - so suck down + # that pkg subdir from w/i that location + d_ret, d_out, d_err = self.builder.download(pkg, self.destdir + '/' + self.chroot) + if not d_ret: + msg = "Failure to download %s: %s" % (pkg, d_out + d_err) + if not self.cont: + raise MockRemoteError, msg + self.callback.error(msg) + + self.callback.end_download(pkg) + # write out whatever came from the builder call into the destdir/chroot + if not os.path.exists(self.destdir + '/' + self.chroot): + os.makedirs(self.destdir + '/' + self.chroot) + r_log = open(self.destdir + '/' + self.chroot + '/mockchain.log', 'a') + r_log.write('%s\n' % pkg) + r_log.write(b_out) + if b_err: + r_log.write('\nstderr\n') + r_log.write(b_err) + r_log.close() + + + # checking where to stick stuff + if not b_status: + if self.recurse: + self.failed.append(pkg) + self.callback.error("Error building %s, will try again" % os.path.basename(pkg)) + else: 
+ msg = "Error building %s\nSee logs/resultsin %s" % (os.path.basename(pkg), self.destdir) + if not self.cont: + raise MockRemoteError, msg + self.callback.error(msg) + + else: + self.callback.log("Success building %s" % os.path.basename(pkg)) + built_pkgs.append(pkg) + # createrepo with the new pkgs + rc, out, err = createrepo(self.destdir) + if err.strip(): + self.callback.error("Error making local repo: %s" % self.destdir) + self.callback.error("%s" % err) + #FIXME - maybe clean up .repodata and .olddata here? + + if self.failed: + if len(self.failed) != len(to_be_built): + to_be_built = self.failed + try_again = True + self.callback.log('Trying to rebuild %s failed pkgs' % len(self.failed)) + else: + self.callback.log("Tried twice - following pkgs could not be successfully built:") + for pkg in self.failed: + msg = pkg + if pkg in downloaded_pkgs: + msg = downloaded_pkgs[pkg] + self.callback.log(msg) + + try_again = False + else: + try_again = False + + + +def parse_args(args): + + parser = SortedOptParser("mockremote -b hostname -u user -r chroot pkg pkg pkg") + parser.add_option('-r', '--root', default=DEF_CHROOT, dest='chroot', + help="chroot config name/base to use in the mock build") + parser.add_option('-c', '--continue', default=False, action='store_true', + dest='cont', + help="if a pkg fails to build, continue to the next one") + parser.add_option('-a','--addrepo', default=DEF_REPOS, action='append', + dest='repos', + help="add these repo baseurls to the chroot's yum config") + parser.add_option('--recurse', default=False, action='store_true', + help="if more than one pkg and it fails to build, try to build the rest and come back to it") + parser.add_option('--log', default=None, dest='logfile', + help="log to the file named by this option, defaults to not logging") + parser.add_option("-b", "--builder", dest='builder', default=None, + help="builder to use") + parser.add_option("-u", dest="user", default=DEF_USER, + help="user to run as/connect as on 
builder systems") + parser.add_option("-t", "--timeout", dest="timeout", type="int", + default=DEF_TIMEOUT, help="maximum time in seconds a build can take to run") + parser.add_option("--destdir", dest="destdir", default=DEF_DESTDIR, + help="place to download all the results/packages") + parser.add_option("--packages", dest="packages_file", default=None, + help="file to read list of packages from") + parser.add_option("-q","--quiet", dest="quiet", default=False, action="store_true", + help="output very little to the terminal") + + opts,args = parser.parse_args(args) + + if not opts.builder: + print "Must specify a system to build on" + sys.exit(1) + + if opts.packages_file and os.path.exists(opts.packages_file): + args.extend(read_list_from_file(opts.packages_file)) + + #args = list(set(args)) # poor man's 'unique' - this also changes the order + # :( + + if not args: + print "Must specify at least one pkg to build" + sys.exit(1) + + if not opts.chroot: + print "Must specify a mock chroot" + sys.exit(1) + + for url in opts.repos: + if not (url.startswith('http') or url.startswith('file://')): + print "Only http[s] or file urls allowed for repos" + sys.exit(1) + + return opts, args + + +#FIXME +# play with createrepo run at the end of each build +# need to output the things that actually worked :) + + +def main(args): + + # parse args + opts,pkgs = parse_args(args) + + if not os.path.exists(opts.destdir): + os.makedirs(opts.destdir) + + try: + # setup our callback + callback = CliLogCallBack(logfn=opts.logfile, quiet=opts.quiet) + # our mockremote instance + mr = MockRemote(builder=opts.builder, user=opts.user, + timeout=opts.timeout, destdir=opts.destdir, chroot=opts.chroot, + cont=opts.cont, recurse=opts.recurse, repos=opts.repos, + callback=callback) + + # FIXMES + # things to think about doing: + # output the remote tempdir when you start up + # output the number of pkgs + # output where you're writing things to + # consider option to sync over destdir to the 
def _get_conf(cp, section, option, default):
    """Fetch *option* from *section* of ConfigParser *cp* as a string.

    Returns *default* when the section or option is absent, which makes
    returning items from the config parser less irritating for callers.
    """
    if cp.has_section(section) and cp.has_option(section, option):
        return cp.get(section, option)
    return default


def _get_conf_bool(cp, section, option, default):
    """Boolean variant of _get_conf().

    ConfigParser.get() always returns a string, so a config line such as
    'daemonize=false' would be truthy if fetched with _get_conf().
    getboolean() parses 1/0, yes/no, true/false, on/off properly.
    """
    if cp.has_section(section) and cp.has_option(section, option):
        return cp.getboolean(section, option)
    return default


class CoprBackend(object):
    """Backend daemon: polls the frontend for waiting builds, spools them
    as json job files under jobsdir, and feeds them to a pool of Worker
    processes via a multiprocessing queue.
    """

    def __init__(self, config_file=None, ext_opts=None):
        """Read the config file and set up dirs, logging and the job queue.

        config_file -- path to the copr-be config file (required)
        ext_opts    -- Bunch of cli options that override file config
        Raises errors.CoprBackendError when config_file is missing.
        """
        if not config_file:
            raise errors.CoprBackendError("Must specify config_file")

        self.config_file = config_file
        self.ext_opts = ext_opts  # stow our cli options for read_conf()
        self.opts = self.read_conf()

        # make sure the log/result dirs exist before anything writes there
        logdir = os.path.dirname(self.opts.logfile)
        if not os.path.exists(logdir):
            os.makedirs(logdir, mode=0o750)

        if not os.path.exists(self.opts.destdir):
            os.makedirs(self.opts.destdir, mode=0o755)

        # setup a log file to write to
        self.logfile = self.opts.logfile
        self.log("Starting up new copr-be instance")

        if not os.path.exists(self.opts.worker_logdir):
            os.makedirs(self.opts.worker_logdir, mode=0o750)

        self.jobs = multiprocessing.Queue()  # job file paths handed to workers
        self.workers = []                    # live Worker processes
        self.added_jobs = []                 # job ids already queued this run

    def read_conf(self):
        "read in config file - return Bunch of config data"
        opts = Bunch()
        cp = ConfigParser.ConfigParser()
        try:
            cp.read(self.config_file)
            opts.results_baseurl = _get_conf(cp, 'backend', 'results_baseurl', 'http://copr')
            opts.frontend_url = _get_conf(cp, 'backend', 'frontend_url', 'http://coprs/rest/api')
            opts.frontend_auth = _get_conf(cp, 'backend', 'frontend_auth', 'PASSWORDHERE')
            opts.spawn_playbook = _get_conf(cp, 'backend', 'spawn_playbook', '/etc/copr/builder_playbook.yml')
            opts.terminate_playbook = _get_conf(cp, 'backend', 'terminate_playbook', '/etc/copr/terminate_playbook.yml')
            opts.jobsdir = _get_conf(cp, 'backend', 'jobsdir', None)
            opts.destdir = _get_conf(cp, 'backend', 'destdir', None)
            # booleans must be parsed, not taken as raw strings - the raw
            # string 'false' from the config would otherwise be truthy
            opts.daemonize = _get_conf_bool(cp, 'backend', 'daemonize', True)
            opts.exit_on_worker = _get_conf_bool(cp, 'backend', 'exit_on_worker', False)
            opts.sleeptime = int(_get_conf(cp, 'backend', 'sleeptime', 10))
            opts.num_workers = int(_get_conf(cp, 'backend', 'num_workers', 8))
            opts.timeout = int(_get_conf(cp, 'builder', 'timeout', 1800))
            opts.logfile = _get_conf(cp, 'backend', 'logfile', '/var/log/copr/backend.log')
            opts.worker_logdir = _get_conf(cp, 'backend', 'worker_logdir', '/var/log/copr/worker/')
            # thoughts for later
            # ssh key for connecting to builders?
            # cloud key stuff?
        except ConfigParser.Error as e:
            raise errors.CoprBackendError(
                'Error parsing config file: %s: %s' % (self.config_file, e))

        if not opts.jobsdir or not opts.destdir:
            raise errors.CoprBackendError(
                "Incomplete Config - must specify jobsdir and destdir in configuration")

        # cli options (ext_opts) always win over the config file
        if self.ext_opts:
            for v in self.ext_opts:
                setattr(opts, v, self.ext_opts.get(v))
        return opts

    def log(self, msg):
        """Append a timestamped *msg* to the logfile; echo to stdout when
        not daemonized.  Logging failures are reported to stderr but never
        raised - losing a log line must not kill the backend.
        """
        now = time.strftime('%F %T')  # NOTE: %F/%T are POSIX-only formats
        output = str(now) + ': ' + msg
        if not self.opts.daemonize:
            print(output)

        try:
            # open/close per call so external log rotation is safe
            with open(self.logfile, 'a') as lf:
                lf.write(output + '\n')
        except (IOError, OSError) as e:
            sys.stderr.write('Could not write to logfile %s - %s\n'
                             % (self.logfile, str(e)))

    def fetch_jobs(self):
        """Ask the frontend for waiting builds and write one json job file
        per new build id into jobsdir.  Network errors are logged, not
        raised - the next loop iteration simply retries.
        """
        self.log('fetching jobs')
        try:
            # auth stuff here? maybe/maybenot
            r = requests.get('%s/waiting_builds/' % self.opts.frontend_url)
        except requests.RequestException as e:
            self.log('Error retrieving jobs from %s: %s' % (self.opts.frontend_url, e))
        else:
            r_json = json.loads(r.content)  # using old requests on el6 :(
            if 'builds' in r_json:
                self.log('%s jobs returned' % len(r_json['builds']))
                count = 0
                for b in r_json['builds']:
                    if 'id' in b:
                        jobfile = self.opts.jobsdir + '/%s.json' % b['id']
                        if not os.path.exists(jobfile) and b['id'] not in self.added_jobs:
                            count += 1
                            # close explicitly so the file is fully flushed
                            # before a worker can pick it up
                            with open(jobfile, 'w') as jf:
                                jf.write(json.dumps(b))
                            self.log('Wrote job: %s' % b['id'])
                self.log('New jobs: %s' % count)

    def run(self):
        """Main loop: fetch jobs, queue new job files, re-read config,
        grow the worker pool, and reap dead workers."""
        abort = False
        while not abort:
            self.fetch_jobs()
            for f in sorted(glob.glob(self.opts.jobsdir + '/*.json')):
                n = os.path.basename(f).replace('.json', '')
                if n not in self.added_jobs:
                    self.jobs.put(f)
                    self.added_jobs.append(n)
                    self.log('adding to work queue id %s' % n)

            # re-read config into opts (lets num_workers etc. change live)
            self.opts = self.read_conf()

            if self.jobs.qsize():
                self.log("# jobs in queue: %s" % self.jobs.qsize())
                # this handles starting/growing the number of workers
                # NOTE(review): spawn is nested under the qsize() check here;
                # the flattened source lost indentation - confirm intent
                if len(self.workers) < self.opts.num_workers:
                    self.log("Spinning up more workers for jobs")
                    for i in range(self.opts.num_workers - len(self.workers)):
                        worker_num = len(self.workers) + 1
                        w = Worker(self.opts, self.jobs, worker_num)
                        self.workers.append(w)
                        w.start()
                    self.log("Finished starting worker processes")
                # FIXME - prune out workers when num_workers shrinks
                # (needs a poison pill or similar; see TODO-backend)

            # check for dead workers and abort/replace; iterate over a
            # snapshot because we remove from self.workers inside the
            # loop - mutating the list being iterated skips elements
            for w in self.workers[:]:
                if not w.is_alive():
                    self.log('Worker %d died unexpectedly' % w.worker_num)
                    if self.opts.exit_on_worker:
                        raise errors.CoprBackendError(
                            "Worker died unexpectedly, exiting")
                    else:
                        self.workers.remove(w)  # it is not working anymore
                        w.terminate()  # kill it with a fire

            time.sleep(self.opts.sleeptime)
# lifted from certmaster
def daemonize(pidfile=None):
    """
    Daemonize this process with the UNIX double-fork trick.
    Writes the new PID to the provided file name if not None.

    NOTE(review): the fork/chdir/setsid/fd-shuffle sequence below is
    order-sensitive and kept exactly as in certmaster - do not reorder.
    """
    # read the current umask without changing it (umask() sets-and-returns)
    cur_umask = os.umask(0o77)
    os.umask(cur_umask)

    pid = os.fork()
    if pid > 0:
        sys.exit(0)          # original parent exits immediately
    os.chdir("/")            # don't pin any mounted filesystem
    os.setsid()              # become session leader, detach from tty
    os.umask(cur_umask)
    pid = os.fork()          # second fork: we can never re-acquire a tty

    os.close(0)
    os.close(1)
    os.close(2)

    # The standard I/O file descriptors are redirected to /dev/null by default.
    if (hasattr(os, "devnull")):
        REDIRECT_TO = os.devnull
    else:
        REDIRECT_TO = "/dev/null"

    # based on http://code.activestate.com/recipes/278731/
    os.open(REDIRECT_TO, os.O_RDWR)  # standard input (0)

    os.dup2(0, 1)  # standard output (1)
    os.dup2(0, 2)  # standard error (2)

    # intermediate process records the daemon's pid, then exits
    if pid > 0:
        if pidfile is not None:
            with open(pidfile, "w") as pf:
                pf.write(str(pid))
        sys.exit(0)


def parse_args(args):
    """Parse cli *args*; exit(1) if the config file does not exist.

    Returns a Bunch carrying only the options the backend passes on
    (daemonize, exit_on_worker, pidfile, config_file).
    """
    parser = optparse.OptionParser('\ncopr-be [options]')
    parser.add_option('-c', '--config', default='/etc/copr-be.conf', dest='config_file',
                      help="config file to use for copr-be run")
    parser.add_option('-d', '--daemonize', default=False, dest='daemonize',
                      action='store_true', help="daemonize or not")
    parser.add_option('-p', '--pidfile', default='copr-be.pid', dest='pidfile',
                      help="pid file to use for copr-be if daemonized")
    parser.add_option('-x', '--exit', default=False, dest='exit_on_worker',
                      action='store_true', help="exit on worker failure")

    opts, args = parser.parse_args(args)
    if not os.path.exists(opts.config_file):
        print("No config file found at: %s" % opts.config_file)
        sys.exit(1)
    opts.config_file = os.path.abspath(opts.config_file)
    # daemonize() chdirs to '/', so the default relative pidfile
    # 'copr-be.pid' would silently land in the root dir - resolve it
    # against the launch cwd now, before any fork/chdir happens
    opts.pidfile = os.path.abspath(opts.pidfile)

    ret_opts = Bunch()
    for o in ('daemonize', 'exit_on_worker', 'pidfile', 'config_file'):
        setattr(ret_opts, o, getattr(opts, o))

    return ret_opts


def main(args):
    """Entry point: build the backend from cli/config, optionally
    daemonize, then run the dispatch loop forever."""
    opts = parse_args(args)

    try:
        cbe = CoprBackend(opts.config_file, ext_opts=opts)
        if opts.daemonize:
            daemonize(opts.pidfile)
        cbe.run()
    except Exception:
        print('Killing/Dying')
        # cbe is unbound if the constructor itself raised
        if 'cbe' in locals():
            for w in cbe.workers:
                w.terminate()
        raise
    except KeyboardInterrupt:
        # Ctrl-C is a normal shutdown, not an error
        pass


if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except Exception as e:
        print("ERROR: %s - %s" % (str(type(e)), str(e)))
        # FIXME - maybe check on daemonize and do this as a syslog.syslog() call?
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nUser cancelled, may need cleanup\n")
        sys.exit(0)
copr-devel@lists.stg.fedorahosted.org