first try

This commit is contained in:
Mathew Guest 2019-12-27 19:28:52 -07:00
commit f070be3353
6 changed files with 744 additions and 0 deletions

56
.gitignore vendored Normal file

@ -0,0 +1,56 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
*.swp

43
setup.py Executable file

@ -0,0 +1,43 @@
#!/usr/bin/env python
#
# Usage:
#
# First, enable the python environment you want to install to, or if installing
# system-wide then ensure you're logged in with sufficient permissions
# (admin or root to install to system directories)
#
# installation:
#
# $ ./setup.py install
#
# de-installation:
#
# $ pip uninstall <app>
from setuptools import setup
__project__ = 'WaBoT'
__version__ = '0.1.0'
setup(
name = __project__,
version = __version__,
description = '',
author = 'Mathew Guest',
author_email = 't3h.zavage@gmail.com',
# Third-party dependencies; will be automatically installed
install_requires = (
'selenium',
),
# Local packages to be installed (our packages)
packages = (
'wabot',
),
# Binaries/Executables to be installed to system
scripts=()
)

3
wabot/__init__.py Normal file

@ -0,0 +1,3 @@
from .api import *
from .page import *

0
wabot/api.py Normal file

103
wabot/fields.py Normal file

@ -0,0 +1,103 @@
class PageObject:
"""
Wrapper around page element that provides an object orientated interface
to elements. Can be sublcassed to integrate more complicated instruction.
"""
def __init__(self, page, accessors=None, name=None):
self.page = page
self.driver = page.driver
self.accessors = accessors
self.name = name
# self.el = None
def __getattr__(self, name):
# if not self.el:
# raise AttributeError
try:
return getattr(self.el, name)
except AttributeError as ex:
raise
def selenium_element(self, accessors=None):
"""
Creates and returns a selenium webelement for
interfacing with the page.
"""
if not accessors:
accessors = self.accessors
accessor = accessors
by = accessor[0]
value = accessor[1]
el = self.page.driver.find_element(by=by, value=value)
return el
def click(self):
return self.page.click(self.el)
def click_and_go(self):
time.sleep(random.uniform(3,6))
return self.page.click_and_go(self.el)
# def get_value(self):
# def set_value(self, value):
# def custom, e.g. zoom(self, x, y, dx, dy)
class TextField(PageObject):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
el = self.selenium_element(kwargs['accessors'])
self.el = el
def get_value(self):
return self.page.get_el_value(self.el)
def set_value(self, value):
nhsn_lo.logger.info('<%s> set_text (%s)' % (self.name, value))
time.sleep(random.uniform(3,5))
return self.page.set_el_value(self.el, value)
class SelectField(PageObject):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
el = self.selenium_element(kwargs['accessors'])
dropdown = selenium.webdriver.support.ui.Select(el)
self.el = el
self.dropdown = dropdown
def get_value(self):
return self.page.get_select_value(self.dropdown)
def set_value(self, value):
nhsn_lo.logger.info('<%s> set_select (%s)' % (self.name, value))
time.sleep(random.uniform(6,11))
return self.page.set_select_value(self.dropdown, value)
def select_by_index(self, idx):
return self.dropdown.select_by_index(idx)
class CheckField(PageObject):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
el = self.selenium_element(kwargs['accessors'])
self.el = el
def get_checked(self, ignore=False):
return self.page.get_checkbox_value(self.el, ignore)
def set_checked(self, checked):
nhsn_lo.logger.info('<%s> set_checked (%s)' % (self.name, checked))
time.sleep(random.uniform(2,3))
return self.page.set_checkbox(self.el, checked)
class NullField(PageObject):
def __init__(self, el, *args, **kwargs):
self.el = el
def __getattr__(self, attr):
raise Exception
def __bool__(self):
return False

539
wabot/page.py Normal file

@ -0,0 +1,539 @@
import nhsn_lo.pages
import enum
import random
import selenium.webdriver
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import selenium.common.exceptions
from .fields import *
import selenium.webdriver.support.ui
from selenium.webdriver.common.by import By
from PIL import Image
import os
import inspect
ENABLE_GOTO_CLINIC_SELECT_OPTIMIZATION = True
ALERT_TIMEOUT = 3
class OBJ_T(enum.Enum):
ELEMENT = 1
ELEMENT_ARRAY = 2
SELECT = 3
CUSTOM = 4
class RC(enum.Enum):
FAILURE = 0
SUCCESS = 1
EVENT_WITHIN_21_DAYS = 2
DUPLICATE_EVENT = 3
NO_FACILITY = 4
MISSING_REPORTING_PLAN = 5
class Page:
"""
Provides ancillary utility methods such as finding
elements, interacting with forms, and taking screenshots.
"""
def __init__(self, parent):
self.parent = parent
self.driver = parent.driver
def find_element_locators(self, key):
"""
Returns the generator data for an element, i.e. what
class to use and the arguments to construct it with.
This is found/stored in the static class definition,
page.elements = {key: (accessors)}
"""
class_hierarchy = inspect.getmro(type(self))
locators = None
for cls in class_hierarchy:
if not hasattr(cls, 'elements'): # no elements defined here...
continue
locators = cls.elements.get(key)
if not locators: # our element isn't defined here
continue
return locators # return first found match
def get_proxy(self, key):
locators = self.find_element_locators(key)
if not locators:
nhsn_lo.logger.warn('element not found: %s' % (key))
return NullField(None, page=self)
obj_type = locators[0]
accessors = locators[1]
# if not el:
# nhsn_lo.logger.warn('failed to make proxy object: %s' % key)
# return NullField(None, page=self)
if obj_type == 'el':
obj = TextField(page=self, accessors=accessors, name=key)
return obj
elif obj_type == 'select':
obj = SelectField(page=self, accessors=accessors, name=key)
return obj
elif obj_type == 'checkbox':
obj = CheckField(page=self, accessors=accessors, name=key)
return obj
elif obj_type == 'els':
el = self.find_element(key)
return self.find_element(key)
elif isinstance(obj_type, type):
# by = accessors[0]
# value = accessors[1]
# el = self.driver.find_element(by=by, value=value)
obj = obj_type(idelement=accessors[1], page=self, accessors=accessors,
name=key)
return obj
else:
nhsn_lo.logger.error('failed to create page element type: %s' % (obj_type))
nhsn_lo.logger.error('requested unknown page element: %s' % key)
return
def __getitem__(self, key):
# input('__getitem__ %s' % key)
# return self.find_element(key) # DEPRECATED
return self.get_proxy(key)
def find_element(self, key):
"""
Accesses element on page
"""
locators = self.find_element_locators(key)
if not locators:
nhsn_lo.logger.error('failed to find page element: %s' % (key))
return False
try:
it = iter(locators)
type_ = next(it)
if type_ == 'custom':
cls = next(it)
locators = []
for locator in it:
locators.append(locator)
obj = cls(self, OBJ_T.CUSTOM, locators)
# print(obj)
return obj
for locator in it:
# print('locator =', locator)
if type_ == 'el' or type_ == 'checkbox':
try:
by = locator[0]
value = locator[1]
el = self.driver.find_element(by=by, value=value)
nhsn_lo.logger.trace('found single element (%s) by %s = %s', key, by, value)
return el
except Exception as ex:
nhsn_lo.logger.warn('failed to find single element (%s) by %s = %s', key, by, value)
continue
elif type_ == 'els':
try:
by = locator[0]
value = locator[1]
els = self.driver.find_elements(by=by, value=value)
nhsn_lo.logger.trace('found elements (%s) by %s = %s', key, by, value)
return els
except Exception as ex:
nhsn_lo.logger.warn('failed to find any elements (%s) by %s = %s', key, by, value)
continue
elif type_ == 'select':
try:
by = locator[0]
value = locator[1]
el = self.driver.find_element(by=by, value=value)
nhsn_lo.logger.trace('found dropdown element (%s) by %s = %s', key, by, value)
dropdown = selenium.webdriver.support.ui.Select(el)
return dropdown
except Exception as ex:
nhsn_lo.logger.warn('failed to find dropdown (%s) by %s = %s', key, by, value)
continue
else:
nhsn_lo.logger.error('unable to find element (%s): unknown type = %s' % (key, type_))
return
except Exception as ex:
print(ex)
def verify(self):
# The page proxy api calls PageObject.verify() if it exists, which
# should call super().verify(). It is necessary for every class in
# the hierarchy to have a callable verify(). Every class in the
# hierarchy should implement verify method or subclass from here.
return True
def click_and_go(self, el):
rc = self.click(el)
if not rc:
return rc
self.accept_alert()
nhsn_lo.logger.debug('waiting for page to load...')
rc = self._wait_for_element_to_go_stale(el)
if not rc:
nhsn_lo.logger.error('failed: timed out waiting for page load')
return False
return rc
def _wait_for_element_to_go_stale(self, el):
try:
wait = selenium.webdriver.support.ui.WebDriverWait(self.driver, 10)
wait.until(lambda driver: self.is_element_stale(el))
return True
except selenium.common.exceptions.TimeoutException as ex:
nhsn_lo.logger.error('failed: timed out waiting for page load')
return False
def click(self, el):
"""
Clicks element at a random coordinate based on the size of the
element.
By default, selenium clicks elements at pos(0, 0). NHSN records
where things are clicked at.
"""
if not el:
nhsn_lo.logger.warn("refusing to click null element")
return False
try:
size = el.size
except selenium.common.exceptions.StaleElementReferenceException as ex:
nhsn_lo.logger.error('failed to click element: stale reference')
return False
# Use a guassian distribution to click more often towards the center
width = size["width"]
if width > 4:
x = random.gauss((width/2), (width/7))
if x < 0: x = 1
elif x > width: x = width - 1
else:
i = 0
while i < 10:
try:
if el.is_displayed() and el.is_enabled():
el.click()
return True
except selenium.common.exceptions.ElementNotVisibleException as ex:
nhsn_lo.logger.error('failed to click element: not visible')
return False
except selenium.common.exceptions.StaleElementReferenceException as ex:
nhsn_lo.logger.error('failed to click element: stale reference')
return False
time.sleep(.2)
i = i+1
return False
height = size["height"]
if height > 4:
y = random.gauss((height/2), (height/7))
if y < 0: y = 1
elif y > height: y = height -1
else:
el.click()
return
nhsn_lo.logger.trace( "clicking %s (dim: x = %s, y = %s) at %d, %d" %
(self.get_el_identifier(el), size["width"], size["height"], x,y) )
i = 0
n = 20
while i < n:
if el.is_displayed() and el.is_enabled():
break
if i == n:
raise Exception("unable to click element")
time.sleep(.5)
i = i + 1
# return el.click()
x = int(x)
y = int(y)
# print('size is', width, height)
# print('clicking through selenium actions at', x, y)
actions = selenium.webdriver.ActionChains(self.driver)
actions.move_to_element_with_offset(el, x, y)
actions.click()
try:
actions.perform()
except Exception as ex: # type is selenium timeout... not sure
print(ex)
nhsn_lo.pages.logger.error("%s" % (ex))
return False
return True
def save_screenshot(self, filename):
nhsn_lo.pages.logger.info("saving page screenshot: %s" % (filename))
# Chromium2 screenshot only captures viewable area,
# selenium is waiting on WebDriver which is waiting
# on chromium. Doesn't look like it will be fixed soon.
# more info:
# https://code.google.com/p/chromedriver/issues/detail?id=294
#
# For now, the workaround is stitch it together for chromium.
# Firefox2 works fine but has it's own problems, hence the
# chromium stitch.
if self.parent.driver_type == "chromium2":
self.chrome_take_full_page_screenshot(filename)
else:
self.driver.get_screenshot_as_file(filename)
def chrome_take_full_page_screenshot(self, file):
self.driver.maximize_window()
# Global.scroll_to_zero()
time.sleep(0.2)
# Log.Debug("Starting chrome full page screenshot workaround ...")
total_width = self.driver.execute_script("return document.body.offsetWidth")
total_height = self.driver.execute_script("return document.body.parentNode.scrollHeight")
viewport_width = self.driver.execute_script("return document.body.clientWidth")
viewport_height = self.driver.execute_script("return window.innerHeight")
# Log.Debug("Total: ({0}, {1}), Viewport: ({2},{3})".format(total_width, total_height,viewport_width,viewport_height))
rectangles = []
i = 0
while i < total_height:
ii = 0
top_height = i + viewport_height
if top_height > total_height:
top_height = total_height
while ii < total_width:
top_width = ii + viewport_width
if top_width > total_width:
top_width = total_width
# Log.Debug("Appending rectangle ({0},{1},{2},{3})".format(ii,i,top_width,top_height))
rectangles.append((ii,i,top_width,top_height))
ii = ii + viewport_width
i = i + viewport_height
stitched_image = Image.new('RGB', (total_width, total_height))
previous = None
part = 0
for rectangle in rectangles:
if not previous is None:
self.driver.execute_script("window.scrollTo({0}, {1})".format(rectangle[0], rectangle[1]))
# Log.Debug("Scrolled To ({0},{1})".format(rectangle[0], rectangle[1]))
time.sleep(0.2)
tmp_path = "/tmp/"
file_name = "{0}scroll_{1}_part_{2}.png".format(tmp_path, 1, part)
# file_name = "/tmp/screen.png"
# Log.Debug("Capturing {0} ...".format(file_name))
self.driver.get_screenshot_as_file(file_name)
screenshot = Image.open(file_name)
# offset = (rectangle[0], rectangle[1])
if rectangle[1] + viewport_height > total_height:
offset = (rectangle[0], total_height - viewport_height)
else:
offset = (rectangle[0], rectangle[1])
# Log.Debug("Adding to stitched image with offset ({0}, {1})".format(offset[0],offset[1]))
stitched_image.paste(screenshot, offset)
del screenshot
os.remove(file_name)
part = part + 1
previous = rectangle
stitched_image.save(file)
# Log.Debug("Finishing chrome full page screenshot workaround ...")
return True
def get_el_value(self, el):
if not el:
return None
val = el.get_attribute("value")
# nhsn_lo.logger.debug('peeked at field <%s>, value = %s'
# % (self.get_el_identifier(el), val))
return val
def get_el_text(self, el):
if not el:
return None
return el.text
def set_el_value(self, el, value, js=False, slow_type=False):
if not el:
return False
prev_val = self.get_el_value(el)
if value is None:
el.clear()
return
el.clear()
if js:
# doesn't work
rc = self.driver.execute_script("arguments[0].setAttribute('value', '%s');arguments[0].onchange();" % (value), el)
else:
if slow_type:
try:
for ch in value:
el.send_keys(ch)
time.sleep(1)
except Exception as ex:
nhsn_lo.pages.logger.error("failed to send keys, element in unknown state!!: %s" % (ex))
return False
else:
try:
el.send_keys(value)
except Exception as ex:
nhsn_lo.pages.logger.error("failed to send keys, element in unknown state!!: %s" % (ex))
return False
el_value = self.get_el_value(el)
if str(el_value) != str(value):
print(type(el_value), type(value))
print("values didn't match.", value, el_value)
return False
# nhsn_lo.logger.info('set field <%s> -> %s; previous = %s'
# % (self.get_el_identifier(el), el_value, prev_val))
return True
def get_select_value(self, select):
if not select:
nhsn_lo.pages.logger.error("tried to get select value of NULL element")
return None
try:
value = select.first_selected_option.get_attribute("value")
except selenium.common.exceptions.NoSuchElementException as ex:
value = None
return value
def set_select_value(self, select, value=None, text=None):
if not select:
return False
# nhsn_lo.pages.logger.trace("setting select value (%s) for (%s)"\
# % (value, self.get_el_identifier(select._el)))
if value:
try:
select.select_by_value(str(value))
return True
except Exception as ex:
return False
elif text:
select.select_by_visible_text(text)
return False
def set_checkbox(self, el, checked):
if not el:
return False
is_enabled = el.is_enabled()
is_selected = el.is_selected()
# print("enabled, selected, check:", is_enabled, is_selected, checked)
if not is_enabled:
return False
if checked:
if not is_selected:
# nhsn_lo.pages.logger.trace("checking unchecked box (%s)" % (self.get_el_identifier(el)))
self.click(el)
else:
if is_selected:
# nhsn_lo.pages.logger.trace("unchecking checked box (%s)" % (self.get_el_identifier(el)))
self.click(el)
val = self.get_checkbox_value(el)
if val != checked:
pass
# False != None
# print('checkbox not checked properly?', val, checked)
# input('checkbox not checked properly?')
# todo(mathew guest) assert checked
return True
def get_checkbox_value(self, el, ignore_disabled=False):
"""
Returns the checked status of a checkbox element. True if enabled
and checked, False if disabled or unchecked.
"""
if not el:
return None
return (ignore_disabled or el.is_enabled()) and el.is_selected()
def get_el_identifier(self, el):
"""
Returns a quick identifier to refer an element by.
"""
id_ = el.get_attribute("id")
name = el.get_attribute("name")
class_ = el.get_attribute("style")
if id_:
return id_
if name:
return name
if class_:
return class_
def accept_alert(self, accept=True, timeout=ALERT_TIMEOUT):
"""
Looks for and tries to accept a javascript alert if it exists.
returns bool: whether or not an alert was accepted.
There is a timeout penalty when calling this method if there is no alert.
"""
try:
selenium.webdriver.support.ui.WebDriverWait(self.driver, timeout).\
until(selenium.webdriver.support.expected_conditions.alert_is_present(),
'Timed out waiting alert' )
alert = self.driver.switch_to_alert()
text = alert.text
if accept:
alert.accept()
else:
alert.dismiss()
nhsn_lo.pages.logger.trace('caught js alert: %s' % text)
return text
except selenium.common.exceptions.TimeoutException:
nhsn_lo.pages.logger.trace('no js alert present')
return False
def is_element_stale(self, webelement):
"""
Checks if a webelement is stale.
@param webelement: A selenium webdriver webelement
"""
try:
webelement.tag_name
except selenium.common.exceptions.StaleElementReferenceException:
return True
except NameError:
pass
except:
pass
return False