Author: gnichols Date: 2009-12-22 17:58:11 +0000 (Tue, 22 Dec 2009) New Revision: 271
Added: trunk/v7/command-popen2.py Log: 484657 - hts uses code deprecated in python 2.6
Added: trunk/v7/command-popen2.py =================================================================== --- trunk/v7/command-popen2.py (rev 0) +++ trunk/v7/command-popen2.py 2009-12-22 17:58:11 UTC (rev 271) @@ -0,0 +1,275 @@ +# Copyright (c) 2006 Red Hat, Inc. All rights reserved. This copyrighted material +# is made available to anyone wishing to use, modify, copy, or +# redistribute it subject to the terms and conditions of the GNU General +# Public License v.2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# Author: Greg Nichols +# +# Overview: +# The Command class is a wrapper for shell commands that performs +# validation and error checking. Example usage can be seen in the +# __main__ self test function at the end of this file. + + +import os,re, commands, exceptions, popen2, string, sys + + +class Command: + + def __init__(self, command): + """ Creates a Command object that wraps the shell command + via the supplied string, similar to os.system. The + constuctor does not actually execute the command. + """ + self.command = command + self.regex = None + self.singleLine = True + self.regexGroup = None + self.output = None + self.errors = None + self.returnValue = 0 + self.signal = 0 + + def _run(self): + commandPipe = popen2.Popen3(self.command, capturestderr=True) + self.output = commandPipe.fromchild.readlines() + result = commandPipe.wait() + self.returnValue = (result >> 8) & 0xFF + self.signal = result & 0xFF + self.errors = None + + if commandPipe.childerr: + self.errors = commandPipe.childerr.readlines() + # always print error messages + for line in self.errors: + sys.stderr.write( line ) + sys.stderr.flush() + + def _checkErrors(self): + + if self.errors and len(self.errors) > 0: + raise V7CommandException(self, "has output on stderr") + if self.returnValue != 0: + # if error returned, show stdout whether echo or run was called + for line in self.output: + sys.stdout.write( line ) + sys.stdout.flush() + raise V7CommandException(self, "returned %d" % self.returnValue) + + def run(self): + """ This method runs the command to produce an action. Any output + to stdout is not echoed to the caller. """ + self._run() + self._checkErrors() + return 0 + + + def echo(self): + """ output is equivalent to run, except that the commands' output + is echo'd on stdout. """ + self._run() + self._checkErrors() + for line in self.output: + sys.stdout.write( line ) + return 0 + + def echoIgnoreErrors(self): + """ like echo, except don't raise exception on errors """ + self._run() + for line in self.output: + sys.stdout.write( line ) + return 0 + + def printOutput(self): + for line in self.output: + sys.stdout.write( line ) + sys.stdout.flush() + + def printErrors(self): + for line in self.errors: + sys.stderr.write( line ) + sys.stderr.flush() + + + def _getString(self, regex=None, regexGroup=None, singleLine=True, returnList=False, ignoreErrors=False): + """ + Get the string from the command's output. With default parameters + it returns the command's single line of output as a string. + + If singleLine is True, and multiple lines are found in the output, + V7CommandException is raised. + + The regex parameter allows the output to be searched for a regular + expression match. If no regexGroup is supplied, the entire pattern + match is returned. The regexGroup parameter allows named + substrings of the match to be returned if regex has named groups + via the "(?P<name>)" syntax. If no match is found, + V7CommandException is raised. + """ + self.regex = regex + self.singleLine = singleLine + self.regexGroup = regexGroup + + self._run() + + if self.singleLine: + if len(self.output) > 1: + raise V7CommandException(self, "Found %u lines of output, expected 1" % len(self.output)) + + line = self.output[0].strip() + if not self.regex: + return line + # otherwise, try the regex + pattern = re.compile(self.regex) + match = pattern.match(line) + if match: + if self.regexGroup: + return match.group(self.regexGroup) + # otherwise, no group, return the whole line + return line + + # no regex match try a grep-style match + if not self.regexGroup: + match = pattern.search(line) + if match: + return match.group() + + # otherwise + raise V7CommandException(self, "no match for regular expression %s" % self.regex) + + + #otherwise, multi-line or single-line regex + if not self.regex: + raise V7CommandError(self, "no regular expression set for multi-line command") + pattern = re.compile(self.regex) + result = None + if returnList: + result = list() + for line in self.output: + if self.regexGroup: + match = pattern.match(line) + if match: + if self.regexGroup: + if returnList: + result.append(match.group(self.regexGroup)) + else: + return match.group(self.regexGroup) + else: + # otherwise, return the matching line + match = pattern.search(line) + if match: + if returnList: + result.append(match.group()) + else: + return match.group() + if result: + return result + + # otherwise + raise V7CommandException(self, "no match for regular expression %s" % self.regex) + + def getStringList(self, regex=None, regexGroup=None, ignoreErrors=False): + """ like getString, except return a complete list of matches on multiple lines.""" + result = self._getString(regex, regexGroup, singleLine=False, returnList=True) + if not ignoreErrors: + self._checkErrors() + return result + + def getString(self, regex=None, regexGroup=None, singleLine=True, ignoreErrors=False): + """ like getString, except return a complete list of matches on multiple lines.""" + result = self._getString(regex, regexGroup, singleLine, returnList=False) + if not ignoreErrors: + self._checkErrors() + return result + + + def getInteger(self, regex=None, regexGroup=None, singleLine=True): + """ + getInteger is the same as getString, except the output is required to + be an Integer. + """ + value = self.getString(regex, regexGroup, singleLine) + return string.atoi(value) + +class V7CommandException(exceptions.Exception): + def __init__(self, command, message): + self.message = message + self.command = command + + def __str__(self): + return ""%s" %s" % (self.command.command, self.message) + + +if __name__ == "__main__": + + try: + # positive test: run + command = Command("ls -a") + print "ls -a:" + command.run() + + # positive test: simple match + command = Command("date") + print "is it 2008?" + print command.getString("2008") + + # positive test: regex on single line + command = Command("date") + print "day of the week via date: %s" % command.getString(regex="^(?P<day>[MTWF][a-z]).*$", regexGroup="day") + + # positive test: regex on multiline + command = Command("who") + print "a device via who: %s" % command.getString(regex="^(?P<user>[a-z]+) (?P<device>[a-z/]+[0-9]*)[ \t]*(?P<date>2008-\d+-\d+).*$", regexGroup="device", singleLine=False) + + #positive test: integer - simple match + command = Command("du .") + print "simple du: %u" % command.getInteger(regex="\d+", singleLine=False) + + #positive test: integer + command = Command("df .") + print "blocks via df: %u" % command.getInteger(regex="^[ \t]+(?P<blocks>[0-9]+[ \t]+).*$", regexGroup="blocks", singleLine=False) + + except V7CommandException, e: + print e + + # negative test: fail simple match + try: + print "is it 1999?:" + command = Command("date") + command.getString(regex="1999") + except V7CommandException, e: + print e + + # negative test: return value + try: + print "negative test: return value:" + command = Command("exit 1") + print "call exit(1)" % command.getString() + except V7CommandException, e: + print e + + # negative test: expect single line, get multiple + try: + print "negative test: more than one line of output" + command = Command("who") + print "who: %s" % command.getString() + except V7CommandException, e: + print e + + # negative test: expect single line, get multiple + try: + print "negative test: output on stderr" + command = Command("echo "boguserror" >&2; echo "boguserror"") + print "error echo: %s" % command.getString(regex="boguserror", singleLine=False) + except V7CommandException, e: + print e + +
v7-commits@lists.stg.fedorahosted.org