mirror of
https://git.zavage.net/Zavage-Software/wabot.git
synced 2025-02-20 12:09:21 -07:00
Compare commits
10 Commits
f070be3353
...
bd8a3f35f2
Author | SHA1 | Date | |
---|---|---|---|
bd8a3f35f2 | |||
fd2aa7c524 | |||
3ef55c0590 | |||
e0e70cc539 | |||
6b3de90926 | |||
372d5d0ad4 | |||
359eb96f21 | |||
d028dea189 | |||
b64cd13525 | |||
1208b6b306 |
1
setup.py
1
setup.py
@ -29,6 +29,7 @@ setup(
|
||||
|
||||
# Third-party dependencies; will be automatically installed
|
||||
install_requires = (
|
||||
'dill',
|
||||
'selenium',
|
||||
),
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
from .api import *
|
||||
from .page import *
|
||||
from .fields import *
|
||||
|
||||
|
251
wabot/api.py
251
wabot/api.py
@ -0,0 +1,251 @@
|
||||
from .create_browser import *
|
||||
|
||||
import appdirs
|
||||
import logging
|
||||
# import pickle
|
||||
import dill as pickle
|
||||
import selenium.common.exceptions
|
||||
import selenium.webdriver
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import os
|
||||
|
||||
USER_AGENT = "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/5.0)"
|
||||
REFUSE_AFTER_EXCEPTION = True
|
||||
EXECUTOR_PORT = 4444
|
||||
REMOTE_EXECUTOR = 'http://127.0.0.1:%s/wd/hub'
|
||||
|
||||
# PICKLE_FILENAME = '/tmp/nhsnwebdriverdump'
|
||||
# PICKLE_FILENAME = os.path.join(
|
||||
# appdirs.user_data_dir('wabot'),
|
||||
# 'saved_browser_instances.pickle'
|
||||
# )
|
||||
|
||||
LOGGER = logging.getLogger('wabot')
|
||||
|
||||
DEFAULT_WEBDRIVER_TYPE = 'firefox1'
|
||||
|
||||
class BrowserProxy:
|
||||
def __init__(
|
||||
self,
|
||||
session_name='webdriver',
|
||||
pickle_filename=None,
|
||||
phantom=False,
|
||||
webdriver_type=None # remote_chromium2
|
||||
):
|
||||
"""
|
||||
BrowserProxy wraps a selenium webdriver instance and provides utility
|
||||
functions for automation webpages.
|
||||
"""
|
||||
LOGGER.info('requesting selenium browser instance (%s): instance_name = %s', webdriver_type, session_name)
|
||||
|
||||
# if pickle_filename is None:
|
||||
# pickle_filename = PICKLE_FILENAME
|
||||
|
||||
# self._pickle_filename = pickle_filename
|
||||
|
||||
if webdriver_type is None:
|
||||
webdriver_type = 'firefox1'
|
||||
|
||||
assert webdriver_type in (
|
||||
'firefox1',
|
||||
'firefox2',
|
||||
'chromium2',
|
||||
'remote_chromium',
|
||||
'remote_chrome',
|
||||
'phantomjs',
|
||||
'remote_firefox'
|
||||
), 'webdriver_type must be firefox1, firefox2, chromium2, remote_chromium2, or phantomjs'
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
self.driver_type = webdriver_type
|
||||
if phantom:
|
||||
pass
|
||||
# driver_type = "phantomjs"
|
||||
self.driver = self.get_driver(webdriver_type, session_name)
|
||||
if not self.driver:
|
||||
LOGGER.error('failed to get selenium webdriver')
|
||||
self.good = False
|
||||
return
|
||||
except Exception as ex:
|
||||
print('caught exception at BrowserProxy().__init__')
|
||||
print(type(ex), ex)
|
||||
raise
|
||||
|
||||
# self.page = nhsn_lo.pages.Login(self)
|
||||
# self.good = True
|
||||
|
||||
def get_page(self):
|
||||
return type(self.page)
|
||||
|
||||
def set_page(self, page):
|
||||
LOGGER.trace('switching page to %s' % page)
|
||||
newpage = page(self)
|
||||
if hasattr(newpage, 'verify'):
|
||||
rc = newpage.verify()
|
||||
if not rc:
|
||||
LOGGER.error('failed to verify page: %s' % page)
|
||||
# input('failure')
|
||||
return False
|
||||
else:
|
||||
LOGGER.trace('verified page: %s' % page)
|
||||
|
||||
self.page = newpage
|
||||
return True
|
||||
|
||||
def switch_after(self, fn=None, page=None):
|
||||
if not fn:
|
||||
return lambda fn: self.switch_after(fn, page = page)
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
self.set_page(page)
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""
|
||||
Access method on current page.
|
||||
|
||||
When object.method is referenced, python invokes
|
||||
object.__getattribute__ which checks the instance
|
||||
for a matching property, If none is found, python
|
||||
invokes object.__getattr__, which is overridden here.
|
||||
If the attribute is not found in the main class, this
|
||||
will search the page object for the method. This
|
||||
allows invoking page methods through the proxy class.
|
||||
|
||||
nhsn.foo() invokes nhsn.page.foo()
|
||||
"""
|
||||
# todo(mg) would this be a good spot to inject page verification?
|
||||
# idea: maybe we could decorate method with error checking and
|
||||
# error collection? method can return value, rc and this can
|
||||
# queue actions and error check them as it goes? At the end,
|
||||
# the user would be able to pull out the errors and check them
|
||||
if REFUSE_AFTER_EXCEPTION and not self.good:
|
||||
LOGGER.warn("broken state - refusing to invoke page action: %s" % (name))
|
||||
return
|
||||
|
||||
try:
|
||||
method = getattr(self.page, name)
|
||||
return method
|
||||
except AttributeError as ex:
|
||||
raise
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""
|
||||
Access element on the current page.
|
||||
"""
|
||||
return self.page[key]
|
||||
|
||||
def perform(self, meth, *args, **kwargs):
|
||||
"""
|
||||
Calls self.page.method and catches exceptions.
|
||||
"""
|
||||
# input('<breakpoint> at perform')
|
||||
if REFUSE_AFTER_EXCEPTION and not self.good:
|
||||
LOGGER.warn("broken state - refusing to invoke page action: %s" % (meth))
|
||||
return
|
||||
|
||||
try:
|
||||
x = getattr(self.page, meth)
|
||||
except AttributeError as ex:
|
||||
LOGGER.error("Failed to invoke page action. "\
|
||||
"page = %s, method = %s" %\
|
||||
(self.page.__class__, meth))
|
||||
return None
|
||||
|
||||
# Hooks:
|
||||
# x = self.page.detect_logged_out()
|
||||
# y = self.page.detected_logged_in()
|
||||
# print('logged out/in: ', x, y)
|
||||
|
||||
LOGGER.trace("invoking %s.%s" % (self.page.__class__.__name__, meth))
|
||||
try:
|
||||
return x(*args, **kwargs)
|
||||
except selenium.common.exceptions.NoSuchWindowException:
|
||||
self.good = False
|
||||
except Exception as ex:
|
||||
print("caught exception: %s" % (type(ex)))
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
lines = traceback.format_exception(exc_type, exc_value,
|
||||
exc_traceback)
|
||||
for line in lines:
|
||||
print(line, end = "")
|
||||
|
||||
self.good = False
|
||||
|
||||
def reset_good_status(self):
|
||||
self.good = True
|
||||
|
||||
def get_driver(self, browser, session_name='webdriver'):
|
||||
"""
|
||||
Returns selenium objects of different types
|
||||
|
||||
The combinations of selenium{1,2} {chrome,firefox} are supported.
|
||||
In the case of selenium 1 (selenium server), the host is a constant
|
||||
defined at the top of this file.
|
||||
|
||||
Args:
|
||||
browser: The selenium webdriver requested.
|
||||
If a chromium2 webdriver is requested, selenium
|
||||
will try to use chromium as the browser. If
|
||||
firefox2 is requested, selenium will try to use
|
||||
firefox. Chromium is very fast but it is designed
|
||||
to be fast for an interactive user which means it
|
||||
is fast at the cost of processing power and
|
||||
ram. Firefox is about as fast but uses less
|
||||
resources.
|
||||
|
||||
Available browser drivers are:
|
||||
chromium2, chromium1, firefox2, firefox1
|
||||
|
||||
The associated webdriver has to be installed
|
||||
and runnable on the system.
|
||||
Returns:
|
||||
The selenium webdriver handle.
|
||||
"""
|
||||
LOGGER.debug('requesting selenium browser instance: type = %s' % (browser))
|
||||
|
||||
driver = None
|
||||
browser_factory = CreateBrowser()
|
||||
if browser == 'chromium2': # Selenium 2 - Chrome
|
||||
driver = self._create_driver_chromium()
|
||||
|
||||
elif (
|
||||
browser == 'remote_chromium'
|
||||
or browser == 'remote_chrome'
|
||||
):
|
||||
driver = browser_factory._create_driver_remote_chromium(session_name)
|
||||
|
||||
elif browser == 'remote_firefox': # Selenium 1 - Firefox
|
||||
driver = browser_factory._create_driver_remote_firefox(session_name)
|
||||
|
||||
|
||||
# TODO(MG) need to redo/rename below. selenium2/ not remote, uses native browser
|
||||
# webdrviers instead of javascript
|
||||
elif browser == 'chromium1': # Selenium 1 - Chrome without working user agent switch
|
||||
driver = self._create_driver_chromium1()
|
||||
|
||||
elif browser == 'firefox2': # Selenium 2 - Firefox
|
||||
driver = self._create_driver_firefox2()
|
||||
|
||||
|
||||
|
||||
elif browser == 'phantomjs':
|
||||
driver = self._create_driver_phantomjs()
|
||||
else:
|
||||
LOGGER.error(
|
||||
'an attempt was made to request an '\
|
||||
'unsupported (by this product) selenium '\
|
||||
'webdriver; refusing. requested = %s'\
|
||||
% (browser)
|
||||
)
|
||||
|
||||
driver.implicitly_wait(10)
|
||||
return driver
|
||||
|
316
wabot/create_browser.py
Normal file
316
wabot/create_browser.py
Normal file
@ -0,0 +1,316 @@
|
||||
|
||||
import appdirs
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import selenium
|
||||
import pickle
|
||||
|
||||
|
||||
LOGGER = logging.getLogger('wabot')
|
||||
|
||||
USER_AGENT = "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/5.0)"
|
||||
REFUSE_AFTER_EXCEPTION = True
|
||||
EXECUTOR_PORT = 4444
|
||||
REMOTE_EXECUTOR = 'http://127.0.0.1:%s/wd/hub'
|
||||
|
||||
|
||||
class PickledBrowserReference:
|
||||
"""
|
||||
Structure for saving a webdriver instance. Also includes the timestamp
|
||||
to help invalidate old references.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.browser_ref = None
|
||||
self.timestamp = None
|
||||
|
||||
# Factory Creator - instantiate Selenium1 and Selenium2 webdriver instances.
|
||||
class CreateBrowser:
|
||||
"""
|
||||
Creates and instantiates selenium webbrowser instances.
|
||||
|
||||
Two strategies: remote via selenium server or local.
|
||||
|
||||
The advantage of using remote is the browser instance can stay
|
||||
open after the process closes. This allows you to disconnect
|
||||
and re-connect to the same browser instance with different
|
||||
processes.
|
||||
"""
|
||||
def __init__(self, pickle_filename=None):
|
||||
self._pickle_filename = None
|
||||
if pickle_filename is not None:
|
||||
self.pickle_filename = pickle_filename
|
||||
self._has_cleaned = False
|
||||
|
||||
@property
|
||||
def pickle_filename(self):
|
||||
"""
|
||||
Reference to the user-data-dir pickle target filename, for
|
||||
saving browser references.
|
||||
"""
|
||||
if self._pickle_filename is None:
|
||||
self._pickle_filename = os.path.join(
|
||||
appdirs.user_data_dir('wabot'),
|
||||
'saved_browser_instances.pickle'
|
||||
)
|
||||
return self._pickle_filename
|
||||
else:
|
||||
return self._pickle_filename
|
||||
|
||||
@pickle_filename.setter
|
||||
def pickle_filename(self, value):
|
||||
self._pickle_filename = value
|
||||
|
||||
def _clean_old_pickles(self):
|
||||
if self._has_cleaned is True:
|
||||
LOGGER.debug('not cleaning pickles twice')
|
||||
return
|
||||
|
||||
p = self.pickle_filename
|
||||
try:
|
||||
fp = open(p, 'rb')
|
||||
pickles = pickle.load(fp)
|
||||
if not pickles:
|
||||
raise Exception
|
||||
|
||||
|
||||
LOGGER.info('cleaning any old pickle refs')
|
||||
|
||||
|
||||
for pickle_ref in pickles:
|
||||
driver_ref = pickles[pickle_ref]
|
||||
ts = driver_ref.timestamp
|
||||
|
||||
if datetime.datetime.now() - driver_ref.timestamp >= datetime.timedelta(days=3):
|
||||
LOGGER.info('deleting old pickled driver ref: %s at %s', pickle_ref, ts)
|
||||
pickles.pop(final_name)
|
||||
|
||||
self._has_cleaned = True
|
||||
|
||||
except (FileNotFoundError, IOError) as ex:
|
||||
LOGGER.error('unable to load pickles: no pickled drivers found')
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
pass
|
||||
|
||||
def _load_from_pickle(self, final_name):
|
||||
p = self.pickle_filename
|
||||
|
||||
|
||||
# Definitely no browser instance already, we must instantiate
|
||||
if not os.path.exists(p):
|
||||
LOGGER.debug('no pickled file for saved browser instances (nothing saved yet)')
|
||||
return
|
||||
|
||||
# There MAY be an open browser or an invalidated reference to a once-open browser
|
||||
if os.path.exists(p):
|
||||
LOGGER.debug('found pickled file for saved browser instances: %s', p)
|
||||
# First, see if existing session_name browser instance exists
|
||||
fp = None
|
||||
|
||||
self._clean_old_pickles()
|
||||
try:
|
||||
fp = open(p, 'rb')
|
||||
# drivers = pickle.load(fp)
|
||||
pickles = pickle.load(fp)
|
||||
if not pickles:
|
||||
raise Exception
|
||||
|
||||
for idx, saved_instance in enumerate(pickles):
|
||||
LOGGER.debug('found saved browser instances (%s): %s', idx, saved_instance)
|
||||
|
||||
driver_ref = pickles.get(final_name)
|
||||
if not driver_ref:
|
||||
raise Exception
|
||||
|
||||
# if datetime.datetime.now() - driver_ref.timestamp >= datetime.timedelta(days=3):
|
||||
# pickles.pop(final_name)
|
||||
|
||||
driver = driver_ref.browser_ref
|
||||
LOGGER.debug('connected to pickled webdriver instance: %s', final_name)
|
||||
url = driver.current_url # throw error if driver isn't reliable anymore
|
||||
LOGGER.info('webdriver instance is ready')
|
||||
# self.driver = driver
|
||||
return driver
|
||||
except (FileNotFoundError, IOError) as ex:
|
||||
LOGGER.error('unable to connect to existing webdriver: no pickled drivers found')
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
self.driver = None
|
||||
|
||||
def _save_to_pickle(self, final_name, driver_ref):
|
||||
p = self.pickle_filename
|
||||
pickles = {}
|
||||
|
||||
# Definitely no browser instance already, we must instantiate
|
||||
if not os.path.exists(p):
|
||||
LOGGER.debug('no pickled file for saved browser instances (nothing saved yet)')
|
||||
|
||||
# Create usr-app-dir if doesn't exist
|
||||
dirname = os.path.dirname(p)
|
||||
if not os.path.isdir(dirname):
|
||||
os.mkdir(dirname)
|
||||
|
||||
# There MAY be an open browser or an invalidated reference to a once-open browser
|
||||
if os.path.exists(p):
|
||||
LOGGER.debug('found existing pickle file for saved browser instances: %s', p)
|
||||
# First, see if existing session_name browser instance exists
|
||||
|
||||
|
||||
self._clean_old_pickles()
|
||||
|
||||
fp = open(p, 'rb')
|
||||
pickles = pickle.load(fp)
|
||||
if not pickles:
|
||||
raise Exception
|
||||
LOGGER.debug('found saved browser instances while saving: %s', list(pickles.keys()))
|
||||
fp.close()
|
||||
|
||||
|
||||
# Save to pickle - creating file if necessary
|
||||
pkl = PickledBrowserReference()
|
||||
pkl.browser_ref = driver_ref
|
||||
pkl.timestamp = datetime.datetime.now()
|
||||
pickles[final_name] = pkl
|
||||
|
||||
|
||||
|
||||
import pprint;
|
||||
print('pickles:')
|
||||
pprint.pprint(pickles)
|
||||
|
||||
# pickle file must be created
|
||||
fp = open(p, 'wb')
|
||||
# drivers[final_name] = driver
|
||||
LOGGER.info('saving browser instance to pickle: %s', final_name)
|
||||
pickle.dump(pickles, fp)
|
||||
fp.close()
|
||||
|
||||
def _create_driver_remote_chromium(self, session_name):
|
||||
"""
|
||||
Creates and returns Selenium1 chromium instance.
|
||||
"""
|
||||
p = self.pickle_filename
|
||||
# e.g.: remote-chromium-webdriver
|
||||
final_name = '{}-{}'.format('remote-chromium', session_name)
|
||||
driver = None
|
||||
|
||||
driver = self._load_from_pickle(final_name)
|
||||
if driver is not None:
|
||||
LOGGER.info('webdriver instance is ready (from pickle)')
|
||||
return driver
|
||||
|
||||
# At this point, need to instantiate a new browser instance
|
||||
sel_host = REMOTE_EXECUTOR % (EXECUTOR_PORT)
|
||||
LOGGER.info('instantianting new browser instance (remote_chromium)')
|
||||
LOGGER.info('remote selenium: %s', sel_host)
|
||||
|
||||
opt = selenium.webdriver.chrome.options.Options()
|
||||
opt.add_argument("--user-agent=" + USER_AGENT)
|
||||
opt.add_argument("--kiosk-printing")
|
||||
opt.add_argument("--focus-existing-tab-on-open=false")
|
||||
driver = selenium.webdriver.Remote(
|
||||
command_executor=sel_host,
|
||||
desired_capabilities = opt.to_capabilities()
|
||||
)
|
||||
|
||||
self._save_to_pickle(final_name, driver)
|
||||
|
||||
# self.driver = driver
|
||||
return driver
|
||||
|
||||
|
||||
# Pickle impl. is duped here
|
||||
|
||||
def _create_driver_remote_firefox(self, session_name):
|
||||
"""
|
||||
Creates and returns Selenium1 firefox instance.
|
||||
"""
|
||||
final_name = '{}-{}'.format('remote-firefox', session_name)
|
||||
driver = None
|
||||
|
||||
driver = self._load_from_pickle(final_name)
|
||||
if driver is not None:
|
||||
LOGGER.info('webdriver instance is ready (from pickle)')
|
||||
return driver
|
||||
|
||||
# At this point, need to instantiate a new browser instance
|
||||
sel_host = REMOTE_EXECUTOR % (EXECUTOR_PORT)
|
||||
LOGGER.info('instantianting new browser instance (remote_firefox)')
|
||||
LOGGER.info('remote selenium: %s', sel_host)
|
||||
|
||||
profile = selenium.webdriver.FirefoxProfile()
|
||||
profile.set_preference("general.useragent.override", USER_AGENT)
|
||||
driver = selenium.webdriver.Remote(
|
||||
# SELENIUM1_SERVER_PATH,
|
||||
sel_host,
|
||||
selenium.webdriver.DesiredCapabilities.FIREFOX.copy(),
|
||||
browser_profile = profile
|
||||
)
|
||||
|
||||
self._save_to_pickle(final_name, driver)
|
||||
return driver
|
||||
|
||||
|
||||
|
||||
def _create_driver_chromium2(self):
|
||||
opt = selenium.webdriver.chrome.options.Options()
|
||||
opt.add_argument("--user-agent=" + USER_AGENT)
|
||||
opt.add_argument("--kiosk-printing")
|
||||
driver = selenium.webdriver.Chrome(chrome_options = opt)
|
||||
self.driver = driver
|
||||
return driver
|
||||
|
||||
def _create_driver_chromium1(self):
|
||||
# Selenium 1 - Chrome without working user agent switch
|
||||
# These two methods of creation ChromeOptions are equivalent objects
|
||||
options = selenium.webdriver.ChromeOptions()
|
||||
options.add_argument("--user-agent=" + USER_AGENT)
|
||||
driver = selenium.webdriver.Remote(desired_capabilities = options.to_capabilities())
|
||||
driver = selenium.webdriver.Remote(SELENIUM1_SERVER_PATH,
|
||||
selenium.webdriver.DesiredCapabilities.CHROME.copy())
|
||||
return driver
|
||||
|
||||
def _create_driver_firefox2(self):
|
||||
# tmp = selenium.webdriver.FirefoxProfile()
|
||||
# tmp = None
|
||||
profile = None
|
||||
# filename = "/tmp/firefox_profile"
|
||||
# try:
|
||||
# fp = open(filename, "rb")
|
||||
# profile = pickle.load(fp)
|
||||
# except:
|
||||
# pass
|
||||
|
||||
if not profile:
|
||||
profile = selenium.webdriver.FirefoxProfile(profile_directory = "/home/mathew/firefox_prof")
|
||||
profile.set_preference("general.useragent.override", USER_AGENT )
|
||||
profile.set_preference("browser.helperApps.neverAsk.saveToDisk", "text/csv");
|
||||
profile.set_preference("network.http.redirection-limit", "0" )
|
||||
# profile.set_preference("javascript.enabled", False )
|
||||
# profile.set_preference("print.always_print_silent", True)
|
||||
profile.set_preference("print.print_to_file", True)
|
||||
profile.set_preference("print.print_to_filename", "/tmp/print.pdf")
|
||||
profile.update_preferences()
|
||||
profile.set_preference("network.http.redirection-limit", "0" )
|
||||
# with open("/tmp/firefox_profile", "wb") as fp:
|
||||
# pickle.dump(profile, fp, pickle.HIGHEST_PROTOCOL)
|
||||
|
||||
# driver = selenium.webdriver.Firefox()
|
||||
driver = selenium.webdriver.Firefox(profile)
|
||||
return driver
|
||||
|
||||
|
||||
|
||||
def _create_driver_phantomjs(self):
|
||||
# Note(MG): Selenium support for PhantomJS has been deprecated, please use headless
|
||||
# driver = selenium.webdriver.PhantomJS()
|
||||
# return driver
|
||||
opt = selenium.webdriver.chrome.options.Options()
|
||||
opt.add_argument("--user-agent=" + USER_AGENT)
|
||||
opt.add_argument("--kiosk-printing")
|
||||
opt.add_argument("--headless")
|
||||
driver = selenium.webdriver.Chrome(chrome_options = opt)
|
||||
driver.set_window_size(838, 907)
|
||||
self.driver = driver
|
||||
return driver
|
@ -1,3 +1,10 @@
|
||||
import logging
|
||||
import random
|
||||
import selenium
|
||||
import time
|
||||
|
||||
LOGGER = logging.getLogger('wabot')
|
||||
|
||||
class PageObject:
|
||||
"""
|
||||
Wrapper around page element that provides an object orientated interface
|
||||
@ -53,7 +60,7 @@ class TextField(PageObject):
|
||||
return self.page.get_el_value(self.el)
|
||||
|
||||
def set_value(self, value):
|
||||
nhsn_lo.logger.info('<%s> set_text (%s)' % (self.name, value))
|
||||
LOGGER.info('[%s]->set_text (%s)' % (self.name, value))
|
||||
time.sleep(random.uniform(3,5))
|
||||
return self.page.set_el_value(self.el, value)
|
||||
|
||||
@ -69,7 +76,7 @@ class SelectField(PageObject):
|
||||
return self.page.get_select_value(self.dropdown)
|
||||
|
||||
def set_value(self, value):
|
||||
nhsn_lo.logger.info('<%s> set_select (%s)' % (self.name, value))
|
||||
LOGGER.info('[%s]->set_select (%s)' % (self.name, value))
|
||||
time.sleep(random.uniform(6,11))
|
||||
return self.page.set_select_value(self.dropdown, value)
|
||||
|
||||
@ -87,7 +94,7 @@ class CheckField(PageObject):
|
||||
return self.page.get_checkbox_value(self.el, ignore)
|
||||
|
||||
def set_checked(self, checked):
|
||||
nhsn_lo.logger.info('<%s> set_checked (%s)' % (self.name, checked))
|
||||
LOGGER.info('[%s]->set_checked (%s)' % (self.name, checked))
|
||||
time.sleep(random.uniform(2,3))
|
||||
return self.page.set_checkbox(self.el, checked)
|
||||
|
||||
|
@ -1,5 +1,3 @@
|
||||
import nhsn_lo.pages
|
||||
|
||||
import enum
|
||||
import random
|
||||
import selenium.webdriver
|
||||
@ -15,6 +13,9 @@ from PIL import Image
|
||||
import os
|
||||
import inspect
|
||||
|
||||
import logging
|
||||
|
||||
LOGGER = logging.getLogger('wabot')
|
||||
|
||||
ENABLE_GOTO_CLINIC_SELECT_OPTIMIZATION = True
|
||||
ALERT_TIMEOUT = 3
|
||||
@ -32,7 +33,6 @@ class RC(enum.Enum):
|
||||
DUPLICATE_EVENT = 3
|
||||
NO_FACILITY = 4
|
||||
MISSING_REPORTING_PLAN = 5
|
||||
|
||||
|
||||
class Page:
|
||||
"""
|
||||
@ -42,6 +42,7 @@ class Page:
|
||||
def __init__(self, parent):
|
||||
self.parent = parent
|
||||
self.driver = parent.driver
|
||||
LOGGER.info("Loaded Page() '%s'", self.__class__.__name__)
|
||||
|
||||
def find_element_locators(self, key):
|
||||
"""
|
||||
@ -63,14 +64,14 @@ class Page:
|
||||
def get_proxy(self, key):
|
||||
locators = self.find_element_locators(key)
|
||||
if not locators:
|
||||
nhsn_lo.logger.warn('element not found: %s' % (key))
|
||||
LOGGER.warn('element not found: %s' % (key))
|
||||
return NullField(None, page=self)
|
||||
|
||||
obj_type = locators[0]
|
||||
accessors = locators[1]
|
||||
|
||||
# if not el:
|
||||
# nhsn_lo.logger.warn('failed to make proxy object: %s' % key)
|
||||
# LOGGER.warn('failed to make proxy object: %s' % key)
|
||||
# return NullField(None, page=self)
|
||||
|
||||
if obj_type == 'el':
|
||||
@ -93,9 +94,9 @@ class Page:
|
||||
name=key)
|
||||
return obj
|
||||
else:
|
||||
nhsn_lo.logger.error('failed to create page element type: %s' % (obj_type))
|
||||
LOGGER.error('failed to create page element type: %s' % (obj_type))
|
||||
|
||||
nhsn_lo.logger.error('requested unknown page element: %s' % key)
|
||||
LOGGER.error('requested unknown page element: %s' % key)
|
||||
return
|
||||
|
||||
def __getitem__(self, key):
|
||||
@ -109,7 +110,7 @@ class Page:
|
||||
"""
|
||||
locators = self.find_element_locators(key)
|
||||
if not locators:
|
||||
nhsn_lo.logger.error('failed to find page element: %s' % (key))
|
||||
LOGGER.error('failed to find page element: %s' % (key))
|
||||
return False
|
||||
|
||||
try:
|
||||
@ -133,35 +134,35 @@ class Page:
|
||||
by = locator[0]
|
||||
value = locator[1]
|
||||
el = self.driver.find_element(by=by, value=value)
|
||||
nhsn_lo.logger.trace('found single element (%s) by %s = %s', key, by, value)
|
||||
LOGGER.trace('found single element (%s) by %s = %s', key, by, value)
|
||||
return el
|
||||
except Exception as ex:
|
||||
nhsn_lo.logger.warn('failed to find single element (%s) by %s = %s', key, by, value)
|
||||
LOGGER.warn('failed to find single element (%s) by %s = %s', key, by, value)
|
||||
continue
|
||||
elif type_ == 'els':
|
||||
try:
|
||||
by = locator[0]
|
||||
value = locator[1]
|
||||
els = self.driver.find_elements(by=by, value=value)
|
||||
nhsn_lo.logger.trace('found elements (%s) by %s = %s', key, by, value)
|
||||
LOGGER.trace('found elements (%s) by %s = %s', key, by, value)
|
||||
return els
|
||||
except Exception as ex:
|
||||
nhsn_lo.logger.warn('failed to find any elements (%s) by %s = %s', key, by, value)
|
||||
LOGGER.warn('failed to find any elements (%s) by %s = %s', key, by, value)
|
||||
continue
|
||||
elif type_ == 'select':
|
||||
try:
|
||||
by = locator[0]
|
||||
value = locator[1]
|
||||
el = self.driver.find_element(by=by, value=value)
|
||||
nhsn_lo.logger.trace('found dropdown element (%s) by %s = %s', key, by, value)
|
||||
LOGGER.trace('found dropdown element (%s) by %s = %s', key, by, value)
|
||||
dropdown = selenium.webdriver.support.ui.Select(el)
|
||||
return dropdown
|
||||
except Exception as ex:
|
||||
nhsn_lo.logger.warn('failed to find dropdown (%s) by %s = %s', key, by, value)
|
||||
LOGGER.warn('failed to find dropdown (%s) by %s = %s', key, by, value)
|
||||
continue
|
||||
|
||||
else:
|
||||
nhsn_lo.logger.error('unable to find element (%s): unknown type = %s' % (key, type_))
|
||||
LOGGER.error('unable to find element (%s): unknown type = %s' % (key, type_))
|
||||
return
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
@ -178,10 +179,10 @@ class Page:
|
||||
if not rc:
|
||||
return rc
|
||||
self.accept_alert()
|
||||
nhsn_lo.logger.debug('waiting for page to load...')
|
||||
LOGGER.debug('waiting for page to load...')
|
||||
rc = self._wait_for_element_to_go_stale(el)
|
||||
if not rc:
|
||||
nhsn_lo.logger.error('failed: timed out waiting for page load')
|
||||
LOGGER.error('failed: timed out waiting for page load')
|
||||
return False
|
||||
return rc
|
||||
|
||||
@ -191,7 +192,7 @@ class Page:
|
||||
wait.until(lambda driver: self.is_element_stale(el))
|
||||
return True
|
||||
except selenium.common.exceptions.TimeoutException as ex:
|
||||
nhsn_lo.logger.error('failed: timed out waiting for page load')
|
||||
LOGGER.error('failed: timed out waiting for page load')
|
||||
return False
|
||||
|
||||
def click(self, el):
|
||||
@ -203,13 +204,13 @@ class Page:
|
||||
where things are clicked at.
|
||||
"""
|
||||
if not el:
|
||||
nhsn_lo.logger.warn("refusing to click null element")
|
||||
LOGGER.warn("refusing to click null element")
|
||||
return False
|
||||
|
||||
try:
|
||||
size = el.size
|
||||
except selenium.common.exceptions.StaleElementReferenceException as ex:
|
||||
nhsn_lo.logger.error('failed to click element: stale reference')
|
||||
LOGGER.error('failed to click element: stale reference')
|
||||
return False
|
||||
|
||||
# Use a guassian distribution to click more often towards the center
|
||||
@ -226,10 +227,10 @@ class Page:
|
||||
el.click()
|
||||
return True
|
||||
except selenium.common.exceptions.ElementNotVisibleException as ex:
|
||||
nhsn_lo.logger.error('failed to click element: not visible')
|
||||
LOGGER.error('failed to click element: not visible')
|
||||
return False
|
||||
except selenium.common.exceptions.StaleElementReferenceException as ex:
|
||||
nhsn_lo.logger.error('failed to click element: stale reference')
|
||||
LOGGER.error('failed to click element: stale reference')
|
||||
return False
|
||||
time.sleep(.2)
|
||||
i = i+1
|
||||
@ -244,7 +245,7 @@ class Page:
|
||||
el.click()
|
||||
return
|
||||
|
||||
nhsn_lo.logger.trace( "clicking %s (dim: x = %s, y = %s) at %d, %d" %
|
||||
LOGGER.debug( "clicking %s (dim: x = %s, y = %s) at %d, %d" %
|
||||
(self.get_el_identifier(el), size["width"], size["height"], x,y) )
|
||||
|
||||
i = 0
|
||||
@ -257,9 +258,6 @@ class Page:
|
||||
time.sleep(.5)
|
||||
i = i + 1
|
||||
|
||||
|
||||
|
||||
|
||||
# return el.click()
|
||||
|
||||
x = int(x)
|
||||
@ -273,12 +271,12 @@ class Page:
|
||||
actions.perform()
|
||||
except Exception as ex: # type is selenium timeout... not sure
|
||||
print(ex)
|
||||
nhsn_lo.pages.logger.error("%s" % (ex))
|
||||
LOGGER.error("%s" % (ex))
|
||||
return False
|
||||
return True
|
||||
|
||||
def save_screenshot(self, filename):
|
||||
nhsn_lo.pages.logger.info("saving page screenshot: %s" % (filename))
|
||||
LOGGER.info("saving page screenshot: %s" % (filename))
|
||||
|
||||
|
||||
# Chromium2 screenshot only captures viewable area,
|
||||
@ -379,7 +377,7 @@ class Page:
|
||||
if not el:
|
||||
return None
|
||||
val = el.get_attribute("value")
|
||||
# nhsn_lo.logger.debug('peeked at field <%s>, value = %s'
|
||||
# LOGGER.debug('peeked at field <%s>, value = %s'
|
||||
# % (self.get_el_identifier(el), val))
|
||||
return val
|
||||
|
||||
@ -408,13 +406,13 @@ class Page:
|
||||
el.send_keys(ch)
|
||||
time.sleep(1)
|
||||
except Exception as ex:
|
||||
nhsn_lo.pages.logger.error("failed to send keys, element in unknown state!!: %s" % (ex))
|
||||
LOGGER.error("failed to send keys, element in unknown state!!: %s" % (ex))
|
||||
return False
|
||||
else:
|
||||
try:
|
||||
el.send_keys(value)
|
||||
except Exception as ex:
|
||||
nhsn_lo.pages.logger.error("failed to send keys, element in unknown state!!: %s" % (ex))
|
||||
LOGGER.error("failed to send keys, element in unknown state!!: %s" % (ex))
|
||||
return False
|
||||
|
||||
el_value = self.get_el_value(el)
|
||||
@ -422,13 +420,13 @@ class Page:
|
||||
print(type(el_value), type(value))
|
||||
print("values didn't match.", value, el_value)
|
||||
return False
|
||||
# nhsn_lo.logger.info('set field <%s> -> %s; previous = %s'
|
||||
# LOGGER.info('set field <%s> -> %s; previous = %s'
|
||||
# % (self.get_el_identifier(el), el_value, prev_val))
|
||||
return True
|
||||
|
||||
def get_select_value(self, select):
|
||||
if not select:
|
||||
nhsn_lo.pages.logger.error("tried to get select value of NULL element")
|
||||
LOGGER.error("tried to get select value of NULL element")
|
||||
return None
|
||||
try:
|
||||
value = select.first_selected_option.get_attribute("value")
|
||||
@ -439,7 +437,7 @@ class Page:
|
||||
def set_select_value(self, select, value=None, text=None):
|
||||
if not select:
|
||||
return False
|
||||
# nhsn_lo.pages.logger.trace("setting select value (%s) for (%s)"\
|
||||
# LOGGER.trace("setting select value (%s) for (%s)"\
|
||||
# % (value, self.get_el_identifier(select._el)))
|
||||
if value:
|
||||
try:
|
||||
@ -461,11 +459,11 @@ class Page:
|
||||
return False
|
||||
if checked:
|
||||
if not is_selected:
|
||||
# nhsn_lo.pages.logger.trace("checking unchecked box (%s)" % (self.get_el_identifier(el)))
|
||||
# LOGGER.trace("checking unchecked box (%s)" % (self.get_el_identifier(el)))
|
||||
self.click(el)
|
||||
else:
|
||||
if is_selected:
|
||||
# nhsn_lo.pages.logger.trace("unchecking checked box (%s)" % (self.get_el_identifier(el)))
|
||||
# LOGGER.trace("unchecking checked box (%s)" % (self.get_el_identifier(el)))
|
||||
self.click(el)
|
||||
val = self.get_checkbox_value(el)
|
||||
if val != checked:
|
||||
@ -516,10 +514,10 @@ class Page:
|
||||
alert.accept()
|
||||
else:
|
||||
alert.dismiss()
|
||||
nhsn_lo.pages.logger.trace('caught js alert: %s' % text)
|
||||
LOGGER.trace('caught js alert: %s' % text)
|
||||
return text
|
||||
except selenium.common.exceptions.TimeoutException:
|
||||
nhsn_lo.pages.logger.trace('no js alert present')
|
||||
LOGGER.trace('no js alert present')
|
||||
return False
|
||||
|
||||
def is_element_stale(self, webelement):
|
||||
|
Loading…
Reference in New Issue
Block a user