instream.py


Below is the syntax highlighted version of instream.py from § Code.


"""
instream.py

The instream module defines the InStream class.
"""

#-----------------------------------------------------------------------

import sys
if sys.hexversion < 0x03000000:
    import urllib
else:
    import urllib.request as urllib
import re

#-----------------------------------------------------------------------

class InStream:

    """
    An InStream object wraps around a text file or sys.stdin, and
    supports reading from that stream.

    Note:  Usually it's a bad idea to mix these three sets of methods:

    -- isEmpty(), readInt(), readFloat(), readBool(), readString()

    -- hasNextLine(), readLine()
    
    -- readAll(), readAllInts(), readAllFloats(), readAllBools(),
       readAllStrings(), readAllLines()

    Usually it's better to use one set exclusively.
    """

    #-------------------------------------------------------------------

    def __init__(self, fileOrUrl=None):
        """
        Construct self to wrap around a stream. The stream can be
        a file whose name is given as fileOrUrl, a resource whose URL
        is given as fileOrUrl, or sys.stdin by default.
        """
        self._buffer = ''
        self._stream = None
        self._readingWebPage = False

        if fileOrUrl is None:
            import stdio # To change the mode of sys.stdin
            self._stream = sys.stdin
            return

        # Try to open a file, then a URL.
        try:
            if sys.hexversion < 0x03000000:
                self._stream = open(fileOrUrl,'rU')
            else:
                self._stream = open(fileOrUrl, 'r', encoding='utf-8')
        except IOError:
            try:
                self._stream = urllib.urlopen(fileOrUrl)
                self._readingWebPage = True
            except IOError:
                raise IOError('No such file or URL: ' + fileOrUrl)

    #-------------------------------------------------------------------

    def _readRegExp(self, regExp):
        """
        Discard leading white space characters from the stream wrapped
        by self.  Then read from the stream and return a string
        matching regular expression regExp.  Raise an EOFError if no
        non-whitespace characters remain in the stream. Raise a
        ValueError if the next characters to be read from the stream
        do not match regExp.
        """
        if self.isEmpty():
            raise EOFError()
        compiledRegExp = re.compile(r'^\s*' + regExp)
        match = compiledRegExp.search(self._buffer)
        if match is None:
            raise ValueError()
        s = match.group()
        self._buffer = self._buffer[match.end():]
        return s.lstrip()

    #-------------------------------------------------------------------

    def isEmpty(self):
        """
        Return True iff no non-whitespace characters remain in the
        stream wrapped by self.
        """
        while self._buffer.strip() == '':
            line = self._stream.readline()
            if sys.hexversion < 0x03000000 or self._readingWebPage:
                line = line.decode('utf-8')
            if line == '':
                return True
            self._buffer += str(line)
        return False

    #-------------------------------------------------------------------

    def readInt(self):
        """
        Discard leading white space characters from the stream wrapped
        by self.  Then read from the stream a sequence of characters
        comprising an integer.  Convert the sequence of characters to an
        integer, and return the integer.  Raise an EOFError if no
        non-whitespace characters remain in the stream.  Raise a
        ValueError if the next characters to be read from the stream
        cannot comprise an integer.
        """
        s = self._readRegExp(r'[-+]?(0[xX][\dA-Fa-f]+|0[0-7]*|\d+)')
        radix = 10
        strLength = len(s)
        if (strLength >= 1) and (s[0:1] == '0'): radix = 8
        if (strLength >= 2) and (s[0:2] == '-0'): radix = 8
        if (strLength >= 2) and (s[0:2] == '0x'): radix = 16
        if (strLength >= 2) and (s[0:2] == '0X'): radix = 16
        if (strLength >= 3) and (s[0:3] == '-0x'): radix = 16
        if (strLength >= 3) and (s[0:3] == '-0X'): radix = 16
        return int(s, radix)

    #-------------------------------------------------------------------

    def readAllInts(self):
        """
        Read all remaining strings from the stream wrapped by self,
        convert  each to an int, and return those ints in an array.
        Raise a ValueError if any of the strings cannot be converted
        to an int.
        """
        strings = self.readAllStrings()
        ints = []
        for s in strings:
            i = int(s)
            ints.append(i)
        return ints

    #-------------------------------------------------------------------

    def readFloat(self):
        """
        Discard leading white space characters from the stream wrapped
        by self.  Then read from the stream a sequence of characters
        comprising a float.  Convert the sequence of characters to an
        float, and return the float.  Raise an EOFError if no
        non-whitespace characters remain in the stream.  Raise a
        ValueError if the next characters to be read from the stream
        cannot comprise a float.
        """
        s = self._readRegExp(r'[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?')
        return float(s)

    #-------------------------------------------------------------------

    def readAllFloats(self):
        """
        Read all remaining strings from the stream wrapped by self,
        convert each to a float, and return those floats in an array.
        Raise a ValueError if any of the strings cannot be converted
        to a float.
        """
        strings = self.readAllStrings()
        floats = []
        for s in strings:
            f = float(s)
            floats.append(f)
        return floats

    #-------------------------------------------------------------------

    def readBool(self):
        """
        Discard leading white space characters from the stream wrapped
        by self.  Then read from the stream a sequence of characters
        comprising a bool.  Convert the sequence of characters to an
        bool, and return the bool.  Raise an EOFError if no
        non-whitespace characters remain in the stream.  Raise a
        ValueError if the next characters to be read from the stream
        cannot comprise an bool.
        """
        s = self._readRegExp(r'(True)|(False)|1|0')
        if (s == 'True') or (s == '1'):
            return True
        return False

    #-----------------------------------------------------------------------

    def readAllBools(self):
        """
        Read all remaining strings from the stream wrapped by self,
        convert each to a bool, and return those bools in an array.
        Raise a ValueError if any of the strings cannot be converted
        to a bool.
        """
        strings = self.readAllStrings()
        bools = []
        for s in strings:
            b = bool(s)
            bools.append(b)
        return bools


    #-------------------------------------------------------------------

    def readString(self):
        """
        Discard leading white space characters from the stream wrapped
        by self.  Then read from the stream a sequence of characters
        comprising a string, and return the string. Raise an EOFError
        if no non-whitespace characters remain in the stream.
        """
        s = self._readRegExp(r'\S+')
        return s

    #-----------------------------------------------------------------------

    def readAllStrings(self):
        """
        Read all remaining strings from the stream wrapped by self,
        and return them in an array.
        """
        strings = []
        while not self.isEmpty():
            s = self.readString()
            strings.append(s)
        return strings

    #-------------------------------------------------------------------

    def hasNextLine(self):
        """
        Return True iff the stream wrapped by self has a next line.
        """
        if self._buffer != '':
            return True
        else:
            self._buffer = self._stream.readline()
            if sys.hexversion < 0x03000000 or self._readingWebPage:
                self._buffer = self._buffer.decode('utf-8')
            if self._buffer == '':
                return False
            return True

    #-------------------------------------------------------------------

    def readLine(self):
        """
        Read and return as a string the next line of the stream wrapped
        by self.  Raise an EOFError is there is no next line.
        """
        if not self.hasNextLine():
            raise EOFError()
        s = self._buffer
        self._buffer = ''
        return s.rstrip('\n')

    #-------------------------------------------------------------------

    def readAllLines(self):
        """
        Read all remaining lines from the stream wrapped by self, and
        return them as strings in an array.
        """
        lines = []
        while self.hasNextLine():
            line = self.readLine()
            lines.append(line)
        return lines

    #-------------------------------------------------------------------

    def readAll(self):
        """
        Read and return as a string all remaining lines of the stream
        wrapped by self.
        """
        s = self._buffer
        self._buffer = ''
        for line in self._stream:
            if sys.hexversion < 0x03000000 or self._readingWebPage:
                line = line.decode('utf-8')
            s += line
        return s

    #-------------------------------------------------------------------

    def __del__(self):
        """
        Close the stream wrapped by self.
        """
        if self._stream is not None:
            self._stream.close()

#=======================================================================
# For Testing
#=======================================================================

def _main():
    """
    For testing. The first command-line argument should be the name of
    the method that should be called. The optional second command-line
    argument should be the file or URL to read.
    """

    import stdio

    testId = sys.argv[1]
    if len(sys.argv) > 2:
        inStream = InStream(sys.argv[2])
    else:
        inStream = InStream()

    if testId == 'readInt':
        stdio.writeln(inStream.readInt())
    elif testId == 'readAllInts':
        stdio.writeln(inStream.readAllInts())
    elif testId == 'readFloat':
        stdio.writeln(inStream.readFloat())
    elif testId == 'readAllFloats':
        stdio.writeln(inStream.readAllFloats())
    elif testId == 'readBool':
        stdio.writeln(inStream.readBool())
    elif testId == 'readAllBools':
        stdio.writeln(inStream.readAllBools())
    elif testId == 'readString':
        stdio.writeln(inStream.readString())
    elif testId == 'readAllStrings':
        stdio.writeln(inStream.readAllStrings())
    elif testId == 'readLine':
        stdio.writeln(inStream.readLine())
    elif testId == 'readAllLines':
        stdio.writeln(inStream.readAllLines())
    elif testId == 'readAll':
        stdio.writeln(inStream.readAll())

if __name__ == '__main__':
    _main()


Copyright © 2000–2015, Robert Sedgewick, Kevin Wayne, and Robert Dondero.
Last updated: Fri Oct 20 20:45:16 EDT 2017.