r"""
    This module provides some testing functionality for paraview and
    vtk web applications.  It provides the ability to run an arbitrary
    test script in a separate thread and communicate the results back
    to the service so that the CTest framework can be notified of the
    success or failure of the test.

    This test harness will notice when the test script has finished
    running and will notify the service to stop.  At this point, the
    test results will be checked in the main thread which ran the
    service, and in the case of failure an exception will be raised
    to notify CTest of the failure.

    Test scripts need to follow some simple rules in order to work
    within the test harness framework:

    1) implement a function called "runTest(args)", where the args
    parameter contains all the arguments given to the web application
    upon starting.  Among other important items, args will contain the
    port number where the web application is listening.

    2) import the testing module so that the script has access to
    the functions which indicate success and failure.  Also the
    testing module contains convenience functions that might be of
    use to the test scripts.

       from vtk.web import testing

    3) Call the "testPass(testName)" or "testFail(testName)" functions
    from within the runTest(args) function to indicate to the framework
    whether the test passed or failed.

"""

import_warning_info = ""
test_module_comm_queue = None

import vtk

# Try standard Python imports
try:
    import os, re, time, datetime, threading, imp, inspect, Queue, types, io
except:
    import_warning_info += "\nUnable to load at least one basic Python module"

# Image comparison imports
try:
    try:
        from PIL import Image
    except ImportError:
        import Image
    except:
        raise
    import base64
    import itertools
except:
    import_warning_info += (
        "\nUnable to load at least one modules necessary for image comparison"
    )

# Browser testing imports
try:
    import selenium
    from selenium import webdriver
except:
    import_warning_info += (
        "\nUnable to load at least one module necessary for browser tests"
    )

# HTTP imports
try:
    import requests
except:
    import_warning_info += (
        "\nUnable to load at least one module necessary for HTTP tests"
    )


# Define some infrastructure to support different (or no) browsers
test_module_browsers = ["firefox", "chrome", "internet_explorer", "safari", "nobrowser"]


class TestModuleBrowsers:
    firefox, chrome, internet_explorer, safari, nobrowser = range(5)


# =============================================================================
# We can use this exception type to indicate that the test shouldn't actually
# "fail", rather that it was unable to run because some dependencies were not
# met.
# =============================================================================
class DependencyError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


# =============================================================================
# This class allows usage as a dictionary and an object with named property
# access.
# =============================================================================
class Dictionary(dict):
    def __getattribute__(self, attrName):
        return self[attrName]

    def __setattr__(self, attrName, attrValue):
        self[attrName] = attrValue


# =============================================================================
# Checks whether test script supplied, if so, safely imports needed modules
# =============================================================================
def initialize(opts, reactor=None, cleanupMethod=None):
    """
    This function should be called to initialize the testing module.  The first
    important thing it does is to store the options for later, since the
    startTestThread function will need them.  Then it checks the arguments that
    were passed into the server to see if a test was actually requested, making
    a note of this fact.  Then, if a test was required, this function then
    checks if all the necessary testing modules were safely imported, printing
    a warning if not.  If tests were requested and all modules were present,
    then this function sets "test_module_do_testing" to True and sets up the
    startTestThread function to be called after the reactor is running.

        opts: Parsed arguments from the server

        reactor: This argument is optional, but is used by server.py to
        cause the test thread to be started only after the server itself
        has started.  If it is not provided, the test thread is launched
        immediately.

        cleanupMethod: A callback method you would like the test thread
        to execute when the test has finished.  This is used by server.py
        as a way to have the server terminated after the test has finished,
        but could be used for other cleanup purposes.  This argument is
        also optional.
    """

    global import_warning_info

    global testModuleOptions
    testModuleOptions = Dictionary()

    # Copy the testing options into something we can easily extend
    for arg in vars(opts):
        optValue = getattr(opts, arg)
        testModuleOptions[arg] = optValue

    # If we got one, add the cleanup method to the testing options
    if cleanupMethod:
        testModuleOptions["cleanupMethod"] = cleanupMethod

    # Check if a test was actually requested
    if (
        testModuleOptions.testScriptPath != ""
        and testModuleOptions.testScriptPath is not None
    ):
        # Check if we ran into trouble with any of the testing imports
        if import_warning_info != "":
            print("WARNING: Some tests may have unmet dependencies")
            print(import_warning_info)

        if reactor is not None:
            # Add startTest callback to the reactor callback queue, so that
            # the test thread gets started after the reactor is running.  Of
            # course this should only happen if everything is good for tests.
            reactor.callWhenRunning(_start_test_thread)
        else:
            # Otherwise, our aim is to start the thread from another process
            # so just call the start method.
            _start_test_thread()


# =============================================================================
# Grab out the command-line arguments needed for by the testing module.
# =============================================================================
def add_arguments(parser):
    """
    This function retrieves any command-line arguments that the client-side
    tester needs.  In order to run a test, you will typically just need the
    following:

      --run-test-script => This should be the full path to the test script to
      be run.

      --baseline-img-dir => This should be the 'Baseline' directory where the
      baseline images for this test are located.

      --test-use-browser => This should be one of the supported browser types,
      or else 'nobrowser'.  The choices are 'chrome', 'firefox', 'internet_explorer',
      'safari', or 'nobrowser'.
    """

    parser.add_argument(
        "--run-test-script",
        default="",
        help="The path to a test script to run",
        dest="testScriptPath",
    )

    parser.add_argument(
        "--baseline-img-dir",
        default="",
        help="The path to the directory containing the web test baseline images",
        dest="baselineImgDir",
    )

    parser.add_argument(
        "--test-use-browser",
        default="nobrowser",
        help="One of 'chrome', 'firefox', 'internet_explorer', 'safari', or 'nobrowser'.",
        dest="useBrowser",
    )

    parser.add_argument(
        "--temporary-directory",
        default=".",
        help="A temporary directory for storing test images and diffs",
        dest="tmpDirectory",
    )

    parser.add_argument(
        "--test-image-file-name",
        default="",
        help="Name of file in which to store generated test image",
        dest="testImgFile",
    )


# =============================================================================
# Initialize the test client
# =============================================================================
def _start_test_thread():
    """
    This function checks whether testing is required and if so, sets up a Queue
    for the purpose of communicating with the thread.  then it starts the
    after waiting 5 seconds for the server to have a chance to start up.
    """

    global test_module_comm_queue
    test_module_comm_queue = Queue.Queue()

    t = threading.Thread(
        target=launch_web_test,
        args=[],
        kwargs={
            "serverOpts": testModuleOptions,
            "commQueue": test_module_comm_queue,
            "testScript": testModuleOptions.testScriptPath,
        },
    )

    t.start()


# =============================================================================
# Test scripts call this function to indicate passage of their test
# =============================================================================
def test_pass(testName):
    """
    Test scripts should call this function to indicate that the test passed.  A
    note is recorded that the test succeeded, and is checked later on from the
    main thread so that CTest can be notified of this result.
    """

    global test_module_comm_queue
    resultObj = {testName: "pass"}
    test_module_comm_queue.put(resultObj)


# =============================================================================
# Test scripts call this function to indicate failure of their test
# =============================================================================
def test_fail(testName):
    """
    Test scripts should call this function to indicate that the test failed.  A
    note is recorded that the test did not succeed, and this note is checked
    later from the main thread so that CTest can be notified of the result.

    The main thread is the only one that can signal test failure in
    CTest framework, and the main thread won't have a chance to check for
    passage or failure of the test until the main loop has terminated.  So
    here we just record the failure result, then we check this result in the
    processTestResults() function, throwing an exception at that point to
    indicate to CTest that the test failed.
    """

    global test_module_comm_queue
    resultObj = {testName: "fail"}
    test_module_comm_queue.put(resultObj)


# =============================================================================
# Concatenate any number of strings into a single path string.
# =============================================================================
def concat_paths(*pathElts):
    """
    A very simple convenience function so that test scripts can build platform
    independent paths out of a list of elements, without having to import the
    os module.

        pathElts: Any number of strings which should be concatenated together
        in a platform independent manner.
    """

    return os.path.join(*pathElts)


# =============================================================================
# So we can change our time format in a single place, this function is
# provided.
# =============================================================================
def get_current_time_string():
    """
    This function returns the current time as a string, using ISO 8601 format.
    """

    return datetime.datetime.now().isoformat(" ")


# =============================================================================
# Uses vtkTesting to compare images.  According to comments in the vtkTesting
# C++ code (and this seems to work), if there are multiple baseline images in
# the same directory as the baseline_img, and they follow the naming pattern:
# 'img.png', 'img_1.png', ... , 'img_N.png', then all of these images will be
# tried for a match.
# =============================================================================
def compare_images(test_img, baseline_img, tmp_dir="."):
    """
    This function creates a vtkTesting object, and specifies the name of the
    baseline image file, using a fully qualified path (baseline_img must be
    fully qualified).  Then it calls the vtkTesting method which compares the
    image (test_img, specified only with a relative path) against the baseline
    image as well as any other images in the same directory as the baseline
    image which follow the naming pattern: 'img.png', 'img_1.png', ... , 'img_N.png'

        test_img: File name of output image to be compared against baseline.

        baseline_img: Fully qualified path to first of the baseline images.

        tmp_dir: Fully qualified path to a temporary directory for storing images.
    """

    # Create a vtkTesting object and specify a baseline image
    t = vtk.vtkTesting()
    t.AddArgument("-T")
    t.AddArgument(tmp_dir)
    t.AddArgument("-V")
    t.AddArgument(baseline_img)

    # Perform the image comparison test and print out the result.
    return t.RegressionTest(test_img, 0.0)


# =============================================================================
# Provide a wait function
# =============================================================================
def wait_with_timeout(delay=None, limit=0, criterion=None):
    """
    This function provides the ability to wait for a certain number of seconds,
    or else to wait for a specific criterion to be met.
    """
    for i in itertools.count():
        if criterion is not None and criterion():
            return True
        elif delay * i > limit:
            return False
        else:
            time.sleep(delay)


# =============================================================================
# Define a WebTest class with five stages of testing: initialization, setup,
# capture, postprocess, and cleanup.
# =============================================================================
class WebTest(object):
    """
    This is the base class for all automated web-based tests.  It defines five
    stages that any test must run through, and allows any or all of these
    stages to be overridden by subclasses.  This class defines the run_test
    method to invoke the five stages overridden by subclasses, one at a time:
    1) initialize, 2) setup, 3) capture, 4) postprocess, and 5) cleanup.
    """

    class Abort:
        pass

    def __init__(self, url=None, testname=None, **kwargs):
        self.url = url
        self.testname = testname

    def run_test(self):
        try:
            self.checkdependencies()
            self.initialize()
            self.setup()
            self.capture()
            self.postprocess()
        except WebTest.Abort:
            # Placeholder for future option to return failure result
            pass
        except:
            self.cleanup()
            raise

        self.cleanup()

    def checkdependencies(self):
        pass

    def initialize(self):
        pass

    def setup(self):
        pass

    def capture(self):
        pass

    def postprocess(self):
        pass

    def cleanup(self):
        pass


# =============================================================================
# Define a WebTest subclass designed specifically for browser-based tests.
# =============================================================================
class BrowserBasedWebTest(WebTest):
    """
    This class can be used as a base for any browser-based web tests.  It
    introduces the notion of a selenium browser and overrides phases (1) and
    (3), initialization and cleanup, of the test phases introduced in the base
    class.  Initialization involves selecting the browser type, setting the
    browser window size, and asking the browser to load the url.  Cleanup
    involves closing the browser window.
    """

    def __init__(self, size=None, browser=None, **kwargs):
        self.size = size
        self.browser = browser
        self.window = None

        WebTest.__init__(self, **kwargs)

    def initialize(self):
        try:
            if self.browser is None or self.browser == TestModuleBrowsers.chrome:
                self.window = webdriver.Chrome()
            elif self.browser == TestModuleBrowsers.firefox:
                self.window = webdriver.Firefox()
            elif self.browser == TestModuleBrowsers.internet_explorer:
                self.window = webdriver.Ie()
            else:
                raise DependencyError(
                    "self.browser argument has illegal value %r" % (self.browser)
                )
        except DependencyError as dErr:
            raise
        except Exception as inst:
            raise DependencyError(inst)

        if self.size is not None:
            self.window.set_window_size(self.size[0], self.size[1])

        if self.url is not None:
            self.window.get(self.url)

    def cleanup(self):
        try:
            self.window.quit()
        except:
            print(
                "Unable to call window.quit, perhaps this is expected because of unmet browser dependency."
            )


# =============================================================================
# Extend BrowserBasedWebTest to handle vtk-style image comparison
# =============================================================================
class ImageComparatorWebTest(BrowserBasedWebTest):
    """
    This class extends browser based web tests to include image comparison.  It
    overrides the capture phase of testing with some functionality to simply
    grab a screenshot of the entire browser window.  It overrides the
    postprocess phase with a call to vtk image comparison functionality.
    Derived classes can then simply override the setup function with a series
    of selenium-based browser interactions to create a complete test.  Derived
    classes may also prefer to override the capture phase to capture only
    certain portions of the browser window for image comparison.
    """

    def __init__(self, filename=None, baseline=None, temporaryDir=None, **kwargs):
        if filename is None:
            raise TypeError("missing argument 'filename'")
        if baseline is None:
            raise TypeError("missing argument 'baseline'")

        BrowserBasedWebTest.__init__(self, **kwargs)
        self.filename = filename
        self.baseline = baseline
        self.tmpDir = temporaryDir

    def capture(self):
        self.window.save_screenshot(self.filename)

    def postprocess(self):
        result = compare_images(self.filename, self.baseline, self.tmpDir)

        if result == 1:
            test_pass(self.testname)
        else:
            test_fail(self.testname)


# =============================================================================
# Given a css selector to use in finding the image element, get the element,
# then base64 decode the "src" attribute and return it.
# =============================================================================
def get_image_data(browser, cssSelector):
    """
    This function takes a selenium browser and a css selector string and uses
    them to find the target HTML image element.  The desired image element
    should contain it's image data as a Base64 encoded JPEG image string.
    The 'src' attribute of the image is read, Base64-decoded, and then
    returned.

        browser: A selenium browser instance, as created by webdriver.Chrome(),
        for example.

        cssSelector: A string containing a CSS selector which will be used to
        find the HTML image element of interest.
    """

    # Here's maybe a better way to get at that image element
    imageElt = browser.find_element_by_css_selector(cssSelector)

    # Now get the Base64 image string and decode it into image data
    base64String = imageElt.get_attribute("src")
    b64RegEx = re.compile(r"data:image/jpeg;base64,(.+)")
    b64Matcher = b64RegEx.match(base64String)
    imgdata = base64.b64decode(b64Matcher.group(1))

    return imgdata


# =============================================================================
# Combines a variation on above function with the write_image_to_disk function.
# converting jpg to png in the process, if necessary.
# =============================================================================
def save_image_data_as_png(browser, cssSelector, imgfilename):
    """
    This function takes a selenium browser instance, a css selector string,
    and a file name.  It uses the css selector string to finds the target HTML
    Image element, which should contain a Base64 encoded JPEG image string,
    it decodes the string to image data, and then saves the data to the file.
    The image type of the written file is determined from the extension of the
    provided filename.

        browser: A selenium browser instance as created by webdriver.Chrome(),
        for example.

        cssSelector: A string containing a CSS selector which will be used to
        find the HTML image element of interest.

        imgFilename: The filename to which to save the image. The extension is
        used to determine the type of image which should be saved.
    """
    imageElt = browser.find_element_by_css_selector(cssSelector)
    base64String = imageElt.get_attribute("src")
    b64RegEx = re.compile(r"data:image/jpeg;base64,(.+)")
    b64Matcher = b64RegEx.match(base64String)
    img = Image.open(io.BytesIO(base64.b64decode(b64Matcher.group(1))))
    img.save(imgfilename)


# =============================================================================
# Given a decoded image and the full path to a file, write the image to the
# file.
# =============================================================================
def write_image_to_disk(imgData, filePath):
    """
    This function takes an image data, as returned by this module's
    get_image_data() function for example, and writes it out to the file given by
    the filePath parameter.

        imgData: An image data object
        filePath: The full path, including the file name and extension, where
        the image should be written.
    """

    with open(filePath, "wb") as f:
        f.write(imgData)


# =============================================================================
# There could be problems if the script file has more than one class defn which
# is a subclass of vtk.web.testing.WebTest, so we should write some
# documentation to help people avoid that.
# =============================================================================
def instantiate_test_subclass(pathToScript, **kwargs):
    """
    This function takes the fully qualified path to a test file, along with
    any needed keyword arguments, then dynamically loads the file as a module
    and finds the test class defined inside of it via inspection.  It then
    uses the keyword arguments to instantiate the test class and return the
    instance.

        pathToScript: Fully qualified path to python file containing defined
        subclass of one of the test base classes.
        kwargs: Keyword arguments to be passed to the constructor of the
        testing subclass.
    """

    # Load the file as a module
    moduleName = imp.load_source("dynamicTestModule", pathToScript)
    instance = None

    # Inspect dynamically loaded module members
    for name, obj in inspect.getmembers(moduleName):
        # Looking for classes only
        if inspect.isclass(obj):
            instance = obj.__new__(obj)
            # And only classes defined in the dynamically loaded module
            if instance.__module__ == "dynamicTestModule":
                try:
                    instance.__init__(**kwargs)
                    break
                except Exception as inst:
                    print("Caught exception: " + str(type(inst)))
                    print(inst)
                    raise

    return instance


# =============================================================================
# For testing purposes, define a function which can interact with a running
# paraview or vtk web application service.
# =============================================================================
def launch_web_test(*args, **kwargs):
    """
    This function loads a python file as a module (with no package), and then
    instantiates the class it must contain, and finally executes the run_test()
    method of the class (which the class may override, but which is defined in
    both of the testing base classes, WebTest and ImageComparatorBaseClass).
    After the run_test() method finishes, this function will stop the web
    server if required.  This function expects some keyword arguments will be
    present in order for it to complete it's task:

        kwargs['serverOpts']: An object containing all the parameters used
        to start the web service.  Some of them will be used in the test script
        in order perform the test.  For example, the port on which the server
        was started will be required in order to connect to the server.

        kwargs['testScript']: The full path to the python file containing the
        testing subclass.
    """

    serverOpts = None
    testScriptFile = None

    # This is really the thing all test scripts will need: access to all
    # the options used to start the server process.
    if "serverOpts" in kwargs:
        serverOpts = kwargs["serverOpts"]
        # print 'These are the serverOpts we got: '
        # print serverOpts

    # Get the full path to the test script
    if "testScript" in kwargs:
        testScriptFile = kwargs["testScript"]

    testName = "unknown"

    # Check for a test file (python file)
    if testScriptFile is None:
        print("No test script file found, no test script will be run.")
        test_fail(testName)

    # The test name will be generated from the python script name, so
    # match and capture a bunch of contiguous characters which are
    # not '.', '\', or '/', followed immediately by the string '.py'.
    fnamePattern = re.compile("([^\.\/\\\]+)\.py")
    fmatch = re.search(fnamePattern, testScriptFile)
    if fmatch:
        testName = fmatch.group(1)
    else:
        print(
            "Unable to parse testScriptFile ("
            + str(testScriptfile)
            + "), no test will be run"
        )
        test_fail(testName)

    # If we successfully got a test name, we are ready to try and run the test
    if testName != "unknown":

        # Output file and baseline file names are generated from the test name
        imgFileName = testName + ".png"
        knownGoodFileName = concat_paths(serverOpts.baselineImgDir, imgFileName)
        tempDir = serverOpts.tmpDirectory
        testImgFileName = serverOpts.testImgFile

        testBrowser = test_module_browsers.index(serverOpts.useBrowser)

        # Now try to instantiate and run the test
        try:
            testInstance = instantiate_test_subclass(
                testScriptFile,
                testname=testName,
                host=serverOpts.host,
                port=serverOpts.port,
                browser=testBrowser,
                filename=testImgFileName,
                baseline=knownGoodFileName,
                temporaryDir=tempDir,
            )

            # If we were able to instantiate the test, run it, otherwise we
            # consider it a failure.
            if testInstance is not None:
                try:
                    testInstance.run_test()
                except DependencyError as derr:
                    # TODO: trigger return SKIP_RETURN_CODE when CMake 3 is required
                    print(
                        "Some dependency of this test was not met, allowing it to pass"
                    )
                    test_pass(testName)
            else:
                print("Unable to instantiate test instance, failing test")
                test_fail(testName)
                return

        except Exception as inst:
            import sys, traceback

            tb = sys.exc_info()[2]
            print("Caught an exception while running test script:")
            print("  " + str(type(inst)))
            print("  " + str(inst))
            print("  " + "".join(traceback.format_tb(tb)))
            test_fail(testName)

    # If we were passed a cleanup method to run after testing, invoke it now
    if "cleanupMethod" in serverOpts:
        serverOpts["cleanupMethod"]()


# =============================================================================
# To keep the service module clean, we'll process the test results here, given
# the test result object we generated in "launch_web_test".  It is
# passed back to this function after the service has completed.  Failure of
# of the test is indicated by raising an exception in here.
# =============================================================================
def finalize():
    """
    This function checks the module's global test_module_comm_queue variable for a
    test result.  If one is found and the result is 'fail', then this function
    raises an exception to communicate the failure to the CTest framework.

    In order for a test result to be found in the test_module_comm_queue variable,
    the test script must have called either the testPass or testFail functions
    provided by this test module before returning.
    """

    global test_module_comm_queue

    if test_module_comm_queue is not None:
        resultObject = test_module_comm_queue.get()

        failedATest = False

        for testName in resultObject:
            testResult = resultObject[testName]
            if testResult == "fail":
                print("  Test -> " + testName + ": " + testResult)
                failedATest = True

        if failedATest is True:
            raise Exception(
                "At least one of the requested tests failed.  "
                + "See detailed output, above, for more information"
            )
