Previously, if the backend started a bit faster than the job-grabber,
everything was fine, but the other way around (job-grabber is a
prerequisite of backend in the service file) the following happened:
1. JobGrabber started.
2. JobGrabber took the list of tasks.
3. JobGrabber filled the redis task queue and filled its internal
   representation of the queue.
4. Backend started, which resulted in a redis task queue cleanup.
5. JobGrabber was confused because the cleaned redis queue no longer
   matched its internal queue representation state.
This resulted in an unprocessed queue.
Now the backend explicitly tells job_grabber about a backend (re)start
via a separate (Queue) control channel, so the backend does not have
to touch the task queue itself and the inconsistency can not happen.
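
Roughly, the handshake (using the Channel methods introduced in
backend/backend/jobgrabcontrol.py below) works like this:

    # backend, on (re)start
    jg_control.backend_start()        # push "start", block until acked

    # job-grabber, once per main-loop iteration
    if jg_control.backend_started():          # "start" message pending
        init_internal_structures()            # rebuild arch/group maps
        jg_control.remove_all_builds()        # drop stale tasks
        jg_control.job_grabber_initialized()  # ack; unblocks backend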
---
backend/backend/daemons/backend.py | 32 ++----------
backend/backend/daemons/dispatcher.py | 17 +++---
backend/backend/daemons/job_grab.py | 97 ++++++++++++++++++++++-------------
backend/backend/helpers.py | 13 +++++
backend/backend/jobgrabcontrol.py | 80 +++++++++++++++++++++++++++++
5 files changed, 166 insertions(+), 73 deletions(-)
create mode 100644 backend/backend/jobgrabcontrol.py
diff --git a/backend/backend/daemons/backend.py b/backend/backend/daemons/backend.py
index 934a566..8b54bf1 100644
--- a/backend/backend/daemons/backend.py
+++ b/backend/backend/daemons/backend.py
@@ -15,13 +15,13 @@ from collections import defaultdict
import lockfile
from daemon import DaemonContext
from requests import RequestException
-from retask.queue import Queue
from retask import ConnectionError
from backend.frontend import FrontendClient
from ..exceptions import CoprBackendError
from ..helpers import BackendConfigReader, get_redis_logger
from .dispatcher import Worker
+from .. import jobgrabcontrol
class CoprBackend(object):
@@ -42,7 +42,7 @@ class CoprBackend(object):
raise CoprBackendError("Must specify config_file")
self.config_file = config_file
- self.ext_opts = ext_opts # to stow our cli options for read_conf()
+ self.ext_opts = ext_opts # to show our cli options for read_conf()
self.workers_by_group_id = defaultdict(list)
self.max_worker_num_by_group_id = defaultdict(int)
@@ -55,35 +55,14 @@ class CoprBackend(object):
self.log = get_redis_logger(self.opts, "backend.main", "backend")
self.frontend_client = FrontendClient(self.opts, self.log)
+ self.jg_control = jobgrabcontrol.Channel(self.opts, self.log)
self.is_running = False
- def clean_task_queues(self):
- """
- Make sure there is nothing in our task queues
- """
- try:
- for queue in self.task_queues.values():
- while queue.length:
- queue.dequeue()
- except ConnectionError:
- raise CoprBackendError(
- "Could not connect to a task queue. Is Redis running?")
-
def init_task_queues(self):
"""
- Connect to the retask.Queue for each group_id. Remove old tasks from queues.
+ Remove old tasks from queues.
"""
- try:
- for group in self.opts.build_groups:
- group_id = group["id"]
- queue = Queue("copr-be-{0}".format(group_id))
- queue.connect()
- self.task_queues[group_id] = queue
- except ConnectionError:
- raise CoprBackendError(
- "Could not connect to a task queue. Is Redis running?")
-
- self.clean_task_queues()
+ self.jg_control.backend_start()
def update_conf(self):
"""
@@ -158,7 +137,6 @@ class CoprBackend(object):
for w in self.workers_by_group_id[group_id][:]:
self.workers_by_group_id[group_id].remove(w)
w.terminate_instance()
- self.clean_task_queues()
try:
self.log.info("Rescheduling unfinished builds before stop")
diff --git a/backend/backend/daemons/dispatcher.py b/backend/backend/daemons/dispatcher.py
index 10375ba..ae3f460 100644
--- a/backend/backend/daemons/dispatcher.py
+++ b/backend/backend/daemons/dispatcher.py
@@ -7,8 +7,6 @@ import shutil
import multiprocessing
from setproctitle import setproctitle
-from retask.queue import Queue
-
from ..vm_manage.manager import VmManager
from ..exceptions import MockRemoteError, CoprWorkerError, VmError, NoVmAvailable
from ..job import BuildJob
@@ -16,6 +14,7 @@ from ..mockremote import MockRemote
from ..constants import BuildStatus, JOB_GRAB_TASK_END_PUBSUB, build_log_format
from ..helpers import register_build_result, get_redis_connection, get_redis_logger, \
local_file_logger
+from .. import jobgrabcontrol
# ansible_playbook = "ansible-playbook"
@@ -32,8 +31,6 @@ class Worker(multiprocessing.Process):
Worker process dispatches building tasks. Backend spin-up multiple workers, each
worker associated to one group_id and process one task at the each moment.
- Worker listens for the new tasks from :py:class:`retask.Queue` associated with its group_id
-
:param Munch opts: backend config
:param int worker_num: worker number
:param int group_id: group_id from the set of groups defined in config
@@ -51,9 +48,7 @@ class Worker(multiprocessing.Process):
self.log = get_redis_logger(self.opts, self.logger_name, "worker")
- # job management stuff
- self.task_queue = Queue("copr-be-{0}".format(str(group_id)))
- self.task_queue.connect()
+ self.jg = jobgrabcontrol.Channel(self.opts, self.log)
# event queue for communicating back to dispatcher
self.kill_received = False
@@ -238,14 +233,16 @@ class Worker(multiprocessing.Process):
# this sometimes caused TypeError in random worker
# when another one picekd up a task to build
# why?
+ # praiskup: not reproduced
try:
- task = self.task_queue.dequeue()
- except TypeError:
+ task = self.jg.get_build(self.group_id)
+ except TypeError as err:
+ self.log.warning(err)
return
if not task:
return
- job = BuildJob(task.data, self.opts)
+ job = BuildJob(task, self.opts)
self.update_process_title(suffix="Task: {} chroot: {}, obtained at {}"
.format(job.build_id, job.chroot, str(datetime.now())))
diff --git a/backend/backend/daemons/job_grab.py b/backend/backend/daemons/job_grab.py
index 21871ae..7bca75b 100644
--- a/backend/backend/daemons/job_grab.py
+++ b/backend/backend/daemons/job_grab.py
@@ -10,8 +10,6 @@ import time
from setproctitle import setproctitle
from requests import get, RequestException
-from retask.task import Task
-from retask.queue import Queue
from backend.frontend import FrontendClient
@@ -19,11 +17,22 @@ from ..actions import Action
from ..constants import JOB_GRAB_TASK_END_PUBSUB
from ..helpers import get_redis_connection, get_redis_logger
from ..exceptions import CoprJobGrabError
-
+from .. import jobgrabcontrol
# TODO: Replace entire model with asynchronous queue, so that frontend push task,
# and workers listen for them
-
+# praiskup: Please don't. I doubt this would help too much, and I really don't
+# think it is worth another rewrite. Reasons (imho):
+# a. there still needs to be "one" organizer, aka jobgrabber, on the backend
+# VM side -- we do not want to allow Workers to contact frontend directly
+# because of (1) security and (2) process synchronization.
+# b. in frontend, we _never_ want to block the UI on anything other than the
+# database, so the push to BE can't be done instantly -- and thus there
+# would have to be something like a buffered "JobPusher" (and that would
+# most probably be implemented as a poll anyway). Maybe we could use some
+# "pipe" approach through an infinite (http?) connection, or an open
+# database connection, .. but I don't think it matters too much who
+# controls the "pipe".
class CoprJobGrab(object):
"""
@@ -36,41 +45,33 @@ class CoprJobGrab(object):
:param Munch opts: backend config
:param lock: :py:class:`multiprocessing.Lock` global backend lock
+ TODO: Not yet fully ready for config reload.
"""
def __init__(self, opts):
""" base class initialization """
self.opts = opts
+
+        # Maps e.g. x86_64 && i386 => PC (group id).
self.arch_to_group_id_map = dict()
- for group in self.opts.build_groups:
- for arch in group["archs"]:
- self.arch_to_group_id_map[arch] = group["id"]
-
- self.task_queues_by_arch = {}
- self.task_queues_by_group = {}
-
- self.added_jobs_dict = dict() # task_id -> task dict
-
+ # PC => max N builders per user
+ self.group_to_usermax = dict()
+ # task_id -> task dict
+ self.added_jobs_dict = dict()
self.rc = None
self.channel = None
self.ps_thread = None
self.log = get_redis_logger(self.opts, "backend.job_grab", "job_grab")
+ self.jg_control = jobgrabcontrol.Channel(self.opts, self.log)
self.frontend_client = FrontendClient(self.opts, self.log)
- def connect_queues(self):
- """
- Connects to the retask queues. One queue per builders group.
- """
- for group in self.opts.build_groups:
- queue = Queue("copr-be-{0}".format(group["id"]))
- queue.connect()
- self.task_queues_by_group[group["name"]] = queue
- for arch in group["archs"]:
- self.task_queues_by_arch[arch] = queue
+    def group(self, arch):
+        try:
+            return self.arch_to_group_id_map[arch]
+        except KeyError:
+            raise CoprJobGrabError(
+                "No builder group for architecture: {0}".format(arch))
+
def listen_to_pubsub(self):
"""
@@ -84,6 +85,7 @@ class CoprJobGrab(object):
self.log.info("Subscribed to {} channel".format(JOB_GRAB_TASK_END_PUBSUB))
+
def route_build_task(self, task):
"""
Route build task to the appropriate queue.
@@ -101,24 +103,23 @@ class CoprJobGrab(object):
if "task_id" in task:
if task["task_id"] not in self.added_jobs_dict:
arch = task["chroot"].split("-")[2]
- if arch not in self.task_queues_by_arch:
- raise CoprJobGrabError("No builder group for architecture: {}, task: {}"
- .format(arch, task))
+ group = self.group(arch)
username = task["project_owner"]
- group_id = int(self.arch_to_group_id_map[arch])
active_jobs_count = len([t for t_id, t in self.added_jobs_dict.items()
if t["project_owner"] == username])
- if active_jobs_count > self.opts.build_groups[group_id]["max_vm_per_user"]:
- self.log.debug("User can not acquire more VM (active builds #{}), "
+ if active_jobs_count > self.group_to_usermax[group]:
+ self.log.debug("User can not acquire more VM (active builds #{0}), "
"don't schedule more tasks".format(active_jobs_count))
return 0
+ msg = "enqueue task for user {0}: id={1}, arch={2}, group={3}, active={4}"
+ self.log.debug(msg.format(username, task["task_id"], arch, group, active_jobs_count))
+
+ # Add both to local list and control channel queue.
self.added_jobs_dict[task["task_id"]] = task
-
- task_obj = Task(task)
- self.task_queues_by_arch[arch].enqueue(task_obj)
+ self.jg_control.add_build(group, task)
count += 1
else:
@@ -226,22 +227,46 @@ class CoprJobGrab(object):
self.log.debug("Added jobs after remove and load: {}".format(self.added_jobs_dict))
self.log.debug("# of executed jobs: {}".format(len(self.added_jobs_dict)))
- for group, queue in self.task_queues_by_group.items():
- if queue.length > 0:
- self.log.debug("# of pending jobs for `{}`: {}".format(group, queue.length))
+
+ def init_internal_structures(self):
+ self.arch_to_group_id_map = dict()
+ self.group_to_usermax = dict()
+ for group in self.opts.build_groups:
+ group_id = group["id"]
+ for arch in group["archs"]:
+ self.arch_to_group_id_map[arch] = group_id
+ self.log.debug("mapping {0} to {1} group".format(arch, group_id))
+
+ self.log.debug("user might use only {0}VMs for {1} group".format(group["max_vm_per_user"], group_id))
+ self.group_to_usermax[group_id] = group["max_vm_per_user"]
+
+ self.added_jobs_dict = dict()
+
+
+ def handle_control_channel(self):
+ if not self.jg_control.backend_started():
+ return
+ self.log.info("backend gave us signal to start")
+ self.init_internal_structures()
+ self.jg_control.remove_all_builds()
+ self.jg_control.job_graber_initialized()
def run(self):
"""
Starts job grabber process
"""
setproctitle("CoprJobGrab")
- self.connect_queues()
self.listen_to_pubsub()
self.log.info("JobGrub started.")
+
+ self.init_internal_structures()
try:
while True:
try:
+                    # This effectively delays job grabbing until the backend
+                    # gives us the signal to start.
+ self.handle_control_channel()
self.load_tasks()
self.log_queue_info()
time.sleep(self.opts.sleeptime)
diff --git a/backend/backend/helpers.py b/backend/backend/helpers.py
index 3e9c028..fa1a3e5 100644
--- a/backend/backend/helpers.py
+++ b/backend/backend/helpers.py
@@ -11,6 +11,7 @@ import ConfigParser
import os
import sys
import errno
+import time
from contextlib import contextmanager
import traceback
@@ -28,6 +29,18 @@ from backend.constants import DEF_BUILD_USER, DEF_BUILD_TIMEOUT, DEF_CONSECUTIVE
CONSECUTIVE_FAILURE_REDIS_KEY, default_log_format
from backend.exceptions import CoprBackendError
+
+def wait_log(log, reason="I don't know why.", timeout=5):
+    """
+    Wait a while; this should be needed only while copr converges after
+    boot-up/restart/...
+    """
+    # Sleep even when there is no logger; the log message is just a bonus.
+    if log:
+        log.warning("I'm waiting {0}s because: {1}".format(timeout, reason))
+    time.sleep(timeout)
+
+
class SortedOptParser(optparse.OptionParser):
"""Optparser which sorts the options by opt before outputting --help"""
diff --git a/backend/backend/jobgrabcontrol.py b/backend/backend/jobgrabcontrol.py
new file mode 100644
index 0000000..c8d7414
--- /dev/null
+++ b/backend/backend/jobgrabcontrol.py
@@ -0,0 +1,80 @@
+from retask.queue import Queue
+from retask.task import Task
+
+from .helpers import wait_log
+
+class Channel(object):
+ """
+    Abstraction above retask (the set of "channels" between backend(s),
+    jobgrabber and workers).  We could use multiple backends and/or a
+    different "atomic" medium (an implementation other than Queue) in the
+    future.  Just make sure nobody needs to touch the "medium" directly.
+ """
+
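+    # The Backend <--> JobGrabber handshake, as implemented by
+    # backend_start(), backend_started() and job_grabber_initialized():
+    # 1. backend pushes "start" into the jg_control_start queue and blocks
+    #    until that queue is drained again,
+    # 2. job-grabber polls backend_started(); once a "start" message is
+    #    pending, it re-initializes itself, removes all queued builds and
+    #    acknowledges by draining the queue -- which unblocks the backend.
+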
+ def __init__(self, opts, log=None):
+ self.log = log
+ self.opts = opts
+ # channel for Backend <--> JobGrabber communication
+ self.jg_start = Queue("jg_control_start")
+ # channel for JobGrabber <--> [[Builders]] communication
+ self.build_queues = dict()
+ while not self.jg_start.connect():
+ wait_log("waiting for redis", 5)
+
+    def _get_queue(self, bgroup):
+        if bgroup not in self.build_queues:
+            q_id = "copr-be-{0}".format(bgroup)
+            q = Queue(q_id)
+            if not q.connect():
+                # As we already connected to jg_control_start, this should
+                # work, too.
+                raise Exception("can't connect to redis, should never happen!")
+            # Cache the queue so that remove_all_builds() can find it.
+            self.build_queues[bgroup] = q
+
+        return self.build_queues[bgroup]
+
+    def add_build(self, bgroup, build):
+        """ This should be used by job_grab only, for now. """
+        q = self._get_queue(bgroup)
+        try:
+            q.enqueue(Task(build))
+        except Exception as err:
+            # We have seen issues where Task() was not able to serialize
+            # urllib exceptions into JSON.
+            if self.log:
+                self.log.error("can't enqueue build {0}, reason:\n{1}".format(
+                    build, err))
+            return False
+
+        return True
+
+ # Builder's API
+ def get_build(self, bgroup):
+ """
+        Return a task from the queue, or None if the queue is empty.
+ """
+ q = self._get_queue(bgroup)
+ t = q.dequeue()
+ return t.data if t else None
+
+ # JobGrab's API
+ def backend_started(self):
+ return self.jg_start.length
+
+    def job_grabber_initialized(self):
+ while self.jg_start.dequeue():
+ pass
+
+ def remove_all_builds(self):
+ for bgroup in self.build_queues:
+ q = self._get_queue(bgroup)
+ while q.dequeue():
+ pass
+ self.build_queues = dict()
+
+ # Backend's API
+ def backend_start(self):
+ """ Notify jobgrab about service start. """
+ self.jg_start.enqueue("start")
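+        # Block until job-grabber drains the control queue, i.e. until it
+        # acknowledges via job_grabber_initialized().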
+ while self.jg_start.length:
+ wait_log(self.log, "waiting until jobgrabber initializes queue")
--
2.5.0