api/BrowserProxy integrated and logger

This commit is contained in:
Mathew Guest 2019-12-27 22:23:52 -07:00
parent f070be3353
commit 1208b6b306
4 changed files with 338 additions and 30 deletions

@ -1,3 +1,4 @@
from .api import *
from .page import *
from .fields import *

@ -0,0 +1,304 @@
import logging
# import pickle
import dill as pickle
import selenium.common.exceptions
import selenium.webdriver
import sys
import time
import traceback
LOGGER = logging.getLogger('wabot')
USER_AGENT = "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/5.0)"
REFUSE_AFTER_EXCEPTION = True
PICKLE_FILENAME = '/tmp/nhsnwebdriverdump'
EXECUTOR_PORT = 4444
REMOTE_EXECUTOR = 'http://127.0.0.1:%s/wd/hub'
class BrowserProxy:
def __init__(self, session_name='webdriver', pickle_filename=PICKLE_FILENAME):
self._pickle_filename = pickle_filename
LOGGER.info('creating selenium browser: session name = %s', session_name)
try:
driver_type = "remote_chromium2"
self.driver_type = driver_type
print('session_name:', session_name)
self.driver = self.get_driver(driver_type, session_name)
if not self.driver:
LOGGER.error("failed to get selenium webdriver")
self.good = False
return
except Exception as ex:
print("caught exception here")
print(type(ex), ex)
# self.page = nhsn_lo.pages.Login(self)
# self.good = True
def get_page(self):
return type(self.page)
def set_page(self, page):
LOGGER.trace('switching page to %s' % page)
newpage = page(self)
if hasattr(newpage, 'verify'):
rc = newpage.verify()
if not rc:
LOGGER.error('failed to verify page: %s' % page)
# input('failure')
return False
else:
LOGGER.trace('verified page: %s' % page)
self.page = newpage
return True
def switch_after(self, fn=None, page=None):
if not fn:
return lambda fn: self.switch_after(fn, page = page)
def wrapper(*args, **kwargs):
self.set_page(page)
return fn(*args, **kwargs)
return wrapper
def __getattr__(self, name):
"""
Access method on current page.
When object.method is referenced, python invokes
object.__getattribute__ which checks the instance
for a matching property, If none is found, python
invokes object.__getattr__, which is overridden here.
If the attribute is not found in the main class, this
will search the page object for the method. This
allows invoking page methods through the proxy class.
nhsn.foo() invokes nhsn.page.foo()
"""
# todo(mg) would this be a good spot to inject page verification?
# idea: maybe we could decorate method with error checking and
# error collection? method can return value, rc and this can
# queue actions and error check them as it goes? At the end,
# the user would be able to pull out the errors and check them
if REFUSE_AFTER_EXCEPTION and not self.good:
LOGGER.warn("broken state - refusing to invoke page action: %s" % (name))
return
try:
method = getattr(self.page, name)
return method
except AttributeError as ex:
raise
def __getitem__(self, key):
"""
Access element on the current page.
"""
return self.page[key]
def perform(self, meth, *args, **kwargs):
"""
Calls self.page.method and catches exceptions.
"""
# input('<breakpoint> at perform')
if REFUSE_AFTER_EXCEPTION and not self.good:
LOGGER.warn("broken state - refusing to invoke page action: %s" % (meth))
return
try:
x = getattr(self.page, meth)
except AttributeError as ex:
LOGGER.error("Failed to invoke page action. "\
"page = %s, method = %s" %\
(self.page.__class__, meth))
return None
# Hooks:
# x = self.page.detect_logged_out()
# y = self.page.detected_logged_in()
# print('logged out/in: ', x, y)
LOGGER.trace("invoking %s.%s" % (self.page.__class__.__name__, meth))
try:
return x(*args, **kwargs)
except selenium.common.exceptions.NoSuchWindowException:
self.good = False
except Exception as ex:
print("caught exception: %s" % (type(ex)))
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value,
exc_traceback)
for line in lines:
print(line, end = "")
self.good = False
def reset_good_status(self):
self.good = True
def get_driver(self, browser, session_name='webdriver'):
"""
Returns selenium objects of different types
The combinations of selenium{1,2} {chrome,firefox} are supported.
In the case of selenium 1 (selenium server), the host is a constant
defined at the top of this file.
Args:
browser: The selenium webdriver requested.
If a chromium2 webdriver is requested, selenium
will try to use chromium as the browser. If
firefox2 is requested, selenium will try to use
firefox. Chromium is very fast but it is designed
to be fast for an interactive user which means it
is fast at the cost of processing power and
ram. Firefox is about as fast but uses less
resources.
Available browser drivers are:
chromium2, chromium1, firefox2, firefox1
The associated webdriver has to be installed
and runnable on the system.
Returns:
The selenium webdriver handle.
"""
# nhsn actively rejects http requests that do not request with an
# IE user agent. All of these browser instances have to change the
# user agent in various ways.
LOGGER.debug("creating selenium driver: %s" % (browser))
# todo(mathew guest) turn USER_AGENT into local variable so that it
# could be parameterized someday
# user_agent = USER_AGENT
# todo(mathew guest) fallback drivers if one doesn't exist?
driver = None
if browser == "chromium2": # Selenium 2 - Chrome
driver = self._create_driver_chromium2()
elif browser == 'remote_chromium2':
driver = self._create_driver_remote_chromium2(session_name)
elif browser == "chromium1": # Selenium 1 - Chrome without working user agent switch
driver = self._create_driver_chromium1()
elif browser == "firefox2": # Selenium 2 - Firefox
driver = self._create_driver_firefox2()
elif browser == "firefox1": # Selenium 1 - Firefox
driver = self._create_driver_firefox1()
else:
LOGGER.error("an attempt was made to request an "\
"unsupported (by this product) selenium "\
"webdriver; refusing. requested = %s"\
% (browser))
driver.implicitly_wait(10)
return driver
def _create_driver_remote_chromium2(self, session_name):
print('got this far', session_name)
fp = None
drivers = {}
try:
fp = open(self._pickle_filename, 'rb')
drivers = pickle.load(fp)
if not drivers:
raise Exception
LOGGER.debug('found pickled drivers')
driver = drivers.get(session_name)
if not driver:
raise Exception
LOGGER.debug('connected to pickled webdriver instance')
url = driver.current_url # throw error if driver isn't reliable anymore
LOGGER.info('webdriver instance is ready')
self.driver = driver
return driver
except (FileNotFoundError, IOError) as ex:
self.driver = None
LOGGER.error('unable to connect to existing webdriver: no pickled drivers found')
except Exception as ex:
self.driver = None
LOGGER.error('unable to connect to existing webdriver: %s' % ex)
if self.driver is None:
print('drivers', drivers)
LOGGER.info('creating new webdriver')
opt = selenium.webdriver.chrome.options.Options()
opt.add_argument("--user-agent=" + USER_AGENT)
opt.add_argument("--kiosk-printing")
opt.add_argument("--focus-existing-tab-on-open=false")
driver = selenium.webdriver.Remote(
command_executor=REMOTE_EXECUTOR % (EXECUTOR_PORT),
desired_capabilities = opt.to_capabilities())
print(REMOTE_EXECUTOR % EXECUTOR_PORT)
fp = open(self._pickle_filename, 'wb')
drivers[session_name] = driver
print('b4 pickle')
print(drivers)
# print('#skipping pickle')
pickle.dump(drivers, fp)
# print('after pickle')
self.driver = driver
return driver
def _create_driver_chromium2(self):
opt = selenium.webdriver.chrome.options.Options()
opt.add_argument("--user-agent=" + USER_AGENT)
opt.add_argument("--kiosk-printing")
driver = selenium.webdriver.Chrome(chrome_options = opt)
self.driver = driver
return driver
def _create_driver_chromium1(self):
# Selenium 1 - Chrome without working user agent switch
# These two methods of creation ChromeOptions are equivalent objects
options = selenium.webdriver.ChromeOptions()
options.add_argument("--user-agent=" + USER_AGENT)
driver = selenium.webdriver.Remote(desired_capabilities = options.to_capabilities())
driver = selenium.webdriver.Remote(SELENIUM1_SERVER_PATH,
selenium.webdriver.DesiredCapabilities.CHROME.copy())
return driver
def _create_driver_firefox2(self):
# tmp = selenium.webdriver.FirefoxProfile()
# tmp = None
profile = None
# filename = "/tmp/firefox_profile"
# try:
# fp = open(filename, "rb")
# profile = pickle.load(fp)
# except:
# pass
if not profile:
profile = selenium.webdriver.FirefoxProfile(profile_directory = "/home/mathew/firefox_prof")
profile.set_preference("general.useragent.override", USER_AGENT )
profile.set_preference("browser.helperApps.neverAsk.saveToDisk", "text/csv");
profile.set_preference("network.http.redirection-limit", "0" )
# profile.set_preference("javascript.enabled", False )
# profile.set_preference("print.always_print_silent", True)
profile.set_preference("print.print_to_file", True)
profile.set_preference("print.print_to_filename", "/tmp/print.pdf")
profile.update_preferences()
profile.set_preference("network.http.redirection-limit", "0" )
# with open("/tmp/firefox_profile", "wb") as fp:
# pickle.dump(profile, fp, pickle.HIGHEST_PROTOCOL)
# driver = selenium.webdriver.Firefox()
driver = selenium.webdriver.Firefox(profile)
return driver
def _create_driver_firefox1(self):
profile = selenium.webdriver.FirefoxProfile()
profile.set_preference("general.useragent.override", USER_AGENT )
driver = selenium.webdriver.Remote(SELENIUM1_SERVER_PATH,
selenium.webdriver.DesiredCapabilities.FIREFOX.copy(), browser_profile = profile )
return driver

@ -1,3 +1,6 @@
import logging
LOGGER = logging.getLogger('wabot')
class PageObject:
"""
Wrapper around page element that provides an object orientated interface
@ -53,7 +56,7 @@ class TextField(PageObject):
return self.page.get_el_value(self.el)
def set_value(self, value):
nhsn_lo.logger.info('<%s> set_text (%s)' % (self.name, value))
LOGGER.info('<%s> set_text (%s)' % (self.name, value))
time.sleep(random.uniform(3,5))
return self.page.set_el_value(self.el, value)
@ -69,7 +72,7 @@ class SelectField(PageObject):
return self.page.get_select_value(self.dropdown)
def set_value(self, value):
nhsn_lo.logger.info('<%s> set_select (%s)' % (self.name, value))
LOGGER.info('<%s> set_select (%s)' % (self.name, value))
time.sleep(random.uniform(6,11))
return self.page.set_select_value(self.dropdown, value)
@ -87,7 +90,7 @@ class CheckField(PageObject):
return self.page.get_checkbox_value(self.el, ignore)
def set_checked(self, checked):
nhsn_lo.logger.info('<%s> set_checked (%s)' % (self.name, checked))
LOGGER.info('<%s> set_checked (%s)' % (self.name, checked))
time.sleep(random.uniform(2,3))
return self.page.set_checkbox(self.el, checked)

@ -1,4 +1,4 @@
import nhsn_lo.pages
# import nhsn_lo.pages
import enum
import random
@ -15,6 +15,9 @@ from PIL import Image
import os
import inspect
import logging
LOGGER = logging.getLogger('wabot')
ENABLE_GOTO_CLINIC_SELECT_OPTIMIZATION = True
ALERT_TIMEOUT = 3
@ -32,7 +35,6 @@ class RC(enum.Enum):
DUPLICATE_EVENT = 3
NO_FACILITY = 4
MISSING_REPORTING_PLAN = 5
class Page:
"""
@ -42,6 +44,7 @@ class Page:
def __init__(self, parent):
self.parent = parent
self.driver = parent.driver
LOGGER.info('Loaded Page()')
def find_element_locators(self, key):
"""
@ -63,14 +66,14 @@ class Page:
def get_proxy(self, key):
locators = self.find_element_locators(key)
if not locators:
nhsn_lo.logger.warn('element not found: %s' % (key))
LOGGER.warn('element not found: %s' % (key))
return NullField(None, page=self)
obj_type = locators[0]
accessors = locators[1]
# if not el:
# nhsn_lo.logger.warn('failed to make proxy object: %s' % key)
# LOGGER.warn('failed to make proxy object: %s' % key)
# return NullField(None, page=self)
if obj_type == 'el':
@ -93,9 +96,9 @@ class Page:
name=key)
return obj
else:
nhsn_lo.logger.error('failed to create page element type: %s' % (obj_type))
LOGGER.error('failed to create page element type: %s' % (obj_type))
nhsn_lo.logger.error('requested unknown page element: %s' % key)
LOGGER.error('requested unknown page element: %s' % key)
return
def __getitem__(self, key):
@ -109,7 +112,7 @@ class Page:
"""
locators = self.find_element_locators(key)
if not locators:
nhsn_lo.logger.error('failed to find page element: %s' % (key))
LOGGER.error('failed to find page element: %s' % (key))
return False
try:
@ -133,35 +136,35 @@ class Page:
by = locator[0]
value = locator[1]
el = self.driver.find_element(by=by, value=value)
nhsn_lo.logger.trace('found single element (%s) by %s = %s', key, by, value)
LOGGER.trace('found single element (%s) by %s = %s', key, by, value)
return el
except Exception as ex:
nhsn_lo.logger.warn('failed to find single element (%s) by %s = %s', key, by, value)
LOGGER.warn('failed to find single element (%s) by %s = %s', key, by, value)
continue
elif type_ == 'els':
try:
by = locator[0]
value = locator[1]
els = self.driver.find_elements(by=by, value=value)
nhsn_lo.logger.trace('found elements (%s) by %s = %s', key, by, value)
LOGGER.trace('found elements (%s) by %s = %s', key, by, value)
return els
except Exception as ex:
nhsn_lo.logger.warn('failed to find any elements (%s) by %s = %s', key, by, value)
LOGGER.warn('failed to find any elements (%s) by %s = %s', key, by, value)
continue
elif type_ == 'select':
try:
by = locator[0]
value = locator[1]
el = self.driver.find_element(by=by, value=value)
nhsn_lo.logger.trace('found dropdown element (%s) by %s = %s', key, by, value)
LOGGER.trace('found dropdown element (%s) by %s = %s', key, by, value)
dropdown = selenium.webdriver.support.ui.Select(el)
return dropdown
except Exception as ex:
nhsn_lo.logger.warn('failed to find dropdown (%s) by %s = %s', key, by, value)
LOGGER.warn('failed to find dropdown (%s) by %s = %s', key, by, value)
continue
else:
nhsn_lo.logger.error('unable to find element (%s): unknown type = %s' % (key, type_))
LOGGER.error('unable to find element (%s): unknown type = %s' % (key, type_))
return
except Exception as ex:
print(ex)
@ -178,10 +181,10 @@ class Page:
if not rc:
return rc
self.accept_alert()
nhsn_lo.logger.debug('waiting for page to load...')
LOGGER.debug('waiting for page to load...')
rc = self._wait_for_element_to_go_stale(el)
if not rc:
nhsn_lo.logger.error('failed: timed out waiting for page load')
LOGGER.error('failed: timed out waiting for page load')
return False
return rc
@ -191,7 +194,7 @@ class Page:
wait.until(lambda driver: self.is_element_stale(el))
return True
except selenium.common.exceptions.TimeoutException as ex:
nhsn_lo.logger.error('failed: timed out waiting for page load')
LOGGER.error('failed: timed out waiting for page load')
return False
def click(self, el):
@ -203,13 +206,13 @@ class Page:
where things are clicked at.
"""
if not el:
nhsn_lo.logger.warn("refusing to click null element")
LOGGER.warn("refusing to click null element")
return False
try:
size = el.size
except selenium.common.exceptions.StaleElementReferenceException as ex:
nhsn_lo.logger.error('failed to click element: stale reference')
LOGGER.error('failed to click element: stale reference')
return False
# Use a guassian distribution to click more often towards the center
@ -226,10 +229,10 @@ class Page:
el.click()
return True
except selenium.common.exceptions.ElementNotVisibleException as ex:
nhsn_lo.logger.error('failed to click element: not visible')
LOGGER.error('failed to click element: not visible')
return False
except selenium.common.exceptions.StaleElementReferenceException as ex:
nhsn_lo.logger.error('failed to click element: stale reference')
LOGGER.error('failed to click element: stale reference')
return False
time.sleep(.2)
i = i+1
@ -244,7 +247,7 @@ class Page:
el.click()
return
nhsn_lo.logger.trace( "clicking %s (dim: x = %s, y = %s) at %d, %d" %
LOGGER.debug( "clicking %s (dim: x = %s, y = %s) at %d, %d" %
(self.get_el_identifier(el), size["width"], size["height"], x,y) )
i = 0
@ -257,9 +260,6 @@ class Page:
time.sleep(.5)
i = i + 1
# return el.click()
x = int(x)
@ -379,7 +379,7 @@ class Page:
if not el:
return None
val = el.get_attribute("value")
# nhsn_lo.logger.debug('peeked at field <%s>, value = %s'
# LOGGER.debug('peeked at field <%s>, value = %s'
# % (self.get_el_identifier(el), val))
return val
@ -422,7 +422,7 @@ class Page:
print(type(el_value), type(value))
print("values didn't match.", value, el_value)
return False
# nhsn_lo.logger.info('set field <%s> -> %s; previous = %s'
# LOGGER.info('set field <%s> -> %s; previous = %s'
# % (self.get_el_identifier(el), el_value, prev_val))
return True