first working version

This commit is contained in:
Mathew Guest 2017-08-17 01:27:05 -06:00
commit 309f148700
11 changed files with 384 additions and 0 deletions

4
README.md Normal file

@ -0,0 +1,4 @@
You need selenium-server installed and running:
java -jar /usr/share/selenium-server/selenium-server-standalone.jar -timeout 0

1
app/__init__.py Normal file

@ -0,0 +1 @@

20
app/browser.py Normal file

@ -0,0 +1,20 @@
import pickle
import selenium
import selenium.webdriver
import time
import logging
# Placeholder until init() installs the real settings object.
settings = {}


def init(settings_obj):
    """Install ``settings_obj`` as this module's ``settings``.

    create_webdriver() reads ``settings.WEBDRIVER_USER_AGENT``, so this has
    to run before a webdriver is created.
    """
    global settings  # rebind the module-level name, not a local
    settings = settings_obj
def create_webdriver():
    """Build and return a Chrome webdriver configured for this application.

    The user agent is taken from ``settings.WEBDRIVER_USER_AGENT``; the two
    remaining Chrome switches are fixed.
    """
    chrome_options = selenium.webdriver.chrome.options.Options()
    switches = (
        '--user-agent=' + settings.WEBDRIVER_USER_AGENT,
        '--kiosk-printing',
        "--focus-existing-tab-on-open=false",
    )
    for switch in switches:
        chrome_options.add_argument(switch)
    return selenium.webdriver.Chrome(chrome_options=chrome_options)

86
app/cli.py Normal file

@ -0,0 +1,86 @@
#!/usr/bin/env python
import baker
import logging
import readline # Needed for command history <up> and <down> arrows to work
import sys
from . import model
# Problem pages:
# Decision (from politics)
# Malaysia (goes inside parenthesis)
# Shared baker command registry; the @commander.command decorators in this
# module register their methods on it.
commander = baker.Baker()
# Placeholder until init() installs the real settings object.
settings = {}


def init(settings_obj):
    """Install ``settings_obj`` here and pass it on to the model layer.

    The same object is forwarded unchanged to ``model.init``.
    """
    global settings  # rebind the module-level name, not a local
    settings = settings_obj
    model.init(settings_obj)
def main():
    """Run a single command from ``sys.argv``, or the interactive loop.

    With no arguments beyond the program name the interactive command loop is
    started; otherwise the whole of ``sys.argv`` is dispatched as one command.
    """
    ui = InteractiveInterface()
    has_cli_arguments = len(sys.argv) > 1
    if not has_cli_arguments:
        ui.start_command_loop()
        return
    # Arguments were given when invoking python: run them as one command.
    ui.run(sys.argv)
class InteractiveInterface:
    """Command front end that exposes model actions as baker commands.

    A command can be run once from an argument vector (``run``) or typed
    repeatedly at an interactive prompt (``start_command_loop``).
    """

    def __init__(self):
        self.model = model.Model()

    def run(self, args, main=True):
        """Dispatch one argv-style command through the baker registry.

        :param args: argument list shaped like ``sys.argv``: slot 0 is the
            program name, slot 1 the command name.
        :param main: forwarded to ``commander.run``.
            NOTE(review): per baker's documentation ``main=True`` makes baker
            report errors and exit itself, while ``main=False`` lets them
            propagate to the handlers below -- confirm against the installed
            baker version.
        """
        try:
            # Forward the caller's choice; it used to be hard-coded to True,
            # which ignored the main=False passed by the command loop.
            commander.run(argv=args, main=main, help_on_error=True,
                          instance=self)
        except baker.CommandError as ex:
            logging.warning('incorrect user input: %s' % ex)
            commander.usage()
        except baker.TopHelp:
            commander.usage()
        except Exception as ex:
            # Top-level boundary: report and keep the session alive.
            logging.error('caught general exception!!')
            print(type(ex), ex)

    def start_command_loop(self):
        """Repeatedly ask the user what command to run until they exit.

        <ctrl>+D (end of input) leaves the loop.
        """
        commander.usage()
        while True:
            try:
                # Let input() draw the prompt so readline knows about it.
                inp = input('$ ')
            except EOFError:
                break
            args = ['', ] + inp.split()
            if "--help" in args:
                args.remove("--help")
                try:
                    if len(args) > 1:
                        commander.usage(args[1])
                    else:
                        # Bare "--help": no command named, show general usage.
                        commander.usage()
                except Exception as ex:
                    print(type(ex), ex)
                continue
            self.run(args, main=False)

    @commander.command
    def do_random_page(self):
        """Start at a random article and follow first links once."""
        self.model.do_random_page()

    @commander.command
    def do_n_pages(self, n):
        """Repeat the random-article crawl n times.

        :param n: number of crawls to run; must parse as an integer.
        """
        try:
            n = int(n)
        except ValueError:
            # Say why nothing happened instead of failing silently.
            logging.warning('n must be an integer, got: %s' % n)
            return False
        for _ in range(n):
            self.model.do_random_page()
# Script entry point: call main() when this file is executed directly.
# NOTE(review): the module uses a relative import (from . import model), so it
# presumably has to be run as part of the package (e.g. via launcher.py) --
# confirm.
if __name__ == '__main__':
    main()

14
app/dal.py Normal file

@ -0,0 +1,14 @@
import sqlite3
import pycurl
import os
# Placeholder until init() installs the real settings object.
settings = {}


def init(settings_obj):
    """Install ``settings_obj`` as this module's ``settings``."""
    global settings  # rebind the module-level name, not a local
    settings = settings_obj
class DataLayer:
    """Data-access layer stub; it holds no state and offers no operations yet."""

    def __init__(self):
        """Create the (currently empty) data layer."""

11
app/log.py Normal file

@ -0,0 +1,11 @@
import logging
# Placeholder until init() installs the real settings object.
settings = {}


def init(settings_obj):
    """Install ``settings_obj`` and then configure logging from it.

    The settings are stored first because init_logging() reads
    ``settings.LOG_LEVEL``.
    """
    global settings  # rebind the module-level name, not a local
    settings = settings_obj
    init_logging()
def init_logging():
    """Configure the root logger at the level in ``settings.LOG_LEVEL``."""
    level = settings.LOG_LEVEL
    logging.basicConfig(level=level)

64
app/model.py Normal file

@ -0,0 +1,64 @@
import logging
import os
import time
from . import browser
from . import log
from . import dal
from . import pages
# Placeholder until init() installs the real settings object.
settings = {}


def init(settings_obj):
    """Install ``settings_obj`` here and in every module the model uses.

    The sub-modules are initialised in a fixed order: browser, dal, pages and
    finally log.
    """
    global settings  # rebind the module-level name, not a local
    settings = settings_obj
    for module in (browser, dal, pages, log):
        module.init(settings_obj)
class Model:
    """Runs "follow the first link" crawls through a shared webdriver."""

    def __init__(self):
        # Created on first use by the ``webdriver`` property.
        self._webdriver = None

    @property
    def webdriver(self):
        """The webdriver for this model, created on first access."""
        if not self._webdriver:
            self._webdriver = browser.create_webdriver()
        return self._webdriver

    def do_random_page(self):
        """Open a random article and keep clicking the first valid link.

        The crawl stops when an article title repeats (a loop), when the
        'Philosophy' article is reached, or when the current article offers
        no link to follow. Progress is reported through ``logging``; nothing
        is returned.
        """
        # Landing page (select language)
        page_api = pages.LandingPage(self.webdriver)
        page_api.goto_landing_page()
        page_api.select_language(settings.PAGE_LANGUAGE)

        # Main page
        page_api = pages.MainPage(self.webdriver)
        page_api.goto_random_article()

        # Article pages: follow first links until a stop condition is met.
        pages_visited = []
        while True:
            page_api = pages.ArticlePage(self.webdriver)
            title = page_api.get_title()
            logging.debug('visited page: %s' % title)

            if title in pages_visited:
                logging.info('encountered loop at page = %s' % title)
                break
            if title == 'Philosophy':
                # Count is taken before appending, so it excludes Philosophy itself.
                logging.info('made it to philosophy in %s pages' % len(pages_visited))
                pages_visited.append(title)
                break
            pages_visited.append(title)

            if not page_api.click_first_link():
                # logging.warn is a deprecated alias that Python 3.13 removed.
                logging.warning('failure: unable to continue (perhaps no valid links?)')
                break
        print()

156
app/pages.py Normal file

@ -0,0 +1,156 @@
import logging
import re
import selenium
import selenium.webdriver
# Placeholder until init() installs the real settings object.
settings = {}


def init(settings_obj):
    """Install ``settings_obj`` as this module's ``settings``.

    The page objects read ``PAGE_BASE_URL`` and ``DO_BREAKPOINTS`` from it.
    """
    global settings  # rebind the module-level name, not a local
    settings = settings_obj
def breakpoint():
    """Wait for <Enter> when ``settings.DO_BREAKPOINTS`` is truthy.

    Does nothing when breakpoints are disabled.
    NOTE: within this module the name shadows Python's built-in
    ``breakpoint()`` (available since Python 3.7).
    """
    if not settings.DO_BREAKPOINTS:
        return
    input('Breakpoint here. <Enter> to continue...')
class PageRootObject:
    """Base class for page objects: holds the webdriver and shared helpers."""

    # Background colour applied by highlight(), keyed by the name callers pass.
    _HIGHLIGHT_BACKGROUNDS = {
        'red': '#ff9292',
        'blue': '#9292ff',
    }

    def __init__(self, driver=None):
        """Wrap ``driver``, creating a new webdriver when none is given.

        :param driver: an existing webdriver to reuse, or None.
        """
        if driver is None:
            # Imported here because this module has no top-level import of
            # browser; the bare create_webdriver() name was undefined.
            from . import browser
            driver = browser.create_webdriver()
        self.driver = driver

    def click(self, el):
        """Highlight ``el`` in red, then click it."""
        self.highlight(el, 'red')
        el.click()

    def highlight(self, el, color):
        """Restyle ``el`` in the page with a coloured background and bold text.

        :param el: the element handed to the script as ``arguments[0]``.
        :param color: ``'red'`` or ``'blue'``.
        :raises ValueError: for any other colour name (this used to surface
            as an unbound-local error on ``js``).
        """
        try:
            background = self._HIGHLIGHT_BACKGROUNDS[color]
        except KeyError:
            raise ValueError('unsupported highlight color: %r' % (color,)) from None
        js = 'arguments[0].setAttribute("style", "background: %s;font-weight:bold;color:#000")' % background
        self.driver.execute_script(js, el)
class LandingPage(PageRootObject):
    """Page object for the site's entry page, where a language is chosen."""

    def __init__(self, driver=None):
        super().__init__(driver)

    def goto_landing_page(self):
        """Navigate the driver to ``settings.PAGE_BASE_URL``."""
        base_url = settings.PAGE_BASE_URL
        self.driver.get(base_url)

    def select_language(self, language):
        """Click the first link whose text contains ``language``."""
        language_link = self.driver.find_element_by_partial_link_text(language)
        self.click(language_link)
class MainPage(PageRootObject):
    """Page object for the main page reached after choosing a language."""

    def __init__(self, driver=None):
        super().__init__(driver)

    def goto_random_article(self):
        """Click the link whose text contains 'Random article'."""
        random_link = self.driver.find_element_by_partial_link_text('Random article')
        self.click(random_link)
class ArticlePage(PageRootObject):
    """Page object for one article: reads its title and follows its first usable link."""

    # DOM ids this page object looks elements up by.
    elements = {
        'main-window-content-text-id': 'mw-content-text',
        'article-title': 'firstHeading',
    }

    # Shortest "( ... )" runs in a paragraph's HTML. Raw string so the
    # backslashes reach the regex engine without an invalid-escape warning.
    _PARENTHESIZED = re.compile(r'\(.*?\)', flags=re.UNICODE)

    def __init__(self, driver=None):
        super().__init__(driver)

    def get_title(self):
        """Return the text of the article's heading element."""
        heading = self.driver.find_element_by_id(ArticlePage.elements['article-title'])
        return heading.text

    def click_first_link(self):
        """Click the first valid link of the article.

        :returns: True when a link was clicked, False when none qualified.
        """
        return self._iterate_paragraphs()

    def _iterate_paragraphs(self):
        """Try each content paragraph in order until one yields a clicked link."""
        main_window = self.driver.find_element_by_id(ArticlePage.elements['main-window-content-text-id'])
        # Paragraphs directly under the parser-output div, leaving out any
        # paragraph that wraps a "coordinates" span.
        paragraphs = main_window.find_elements_by_xpath('./div[contains(@class, "mw-parser-output")]/p[not(span/span[contains(@id, "coordinates")])]')
        return any(self._parse_paragraph(p) for p in paragraphs)

    def _parse_paragraph(self, p):
        """Click the first valid link inside paragraph ``p``.

        Rejected links are highlighted blue, the chosen one red.

        :returns: True when a link was clicked, False otherwise.
        """
        for link in p.find_elements_by_xpath('.//a'):
            link_text = link.text  # read once and reused in the log lines
            logging.debug('processing link: %s' % link_text)
            if not self._is_valid_link(p, link):
                logging.debug('skipping link: %s' % link_text)
                self.highlight(link, 'blue')
                continue
            self.highlight(link, 'red')
            logging.info('selected link: %s' % link_text)
            breakpoint()
            link.click()
            return True
        return False

    def _is_valid_link(self, p, el):
        """Return True unless one of the rejection checks matches ``el``.

        Checks run in order and stop at the first match; the reason is logged
        at debug level.
        """
        checks = (
            ('inside parenthesis', lambda: self._is_link_in_parenthesis(p, el)),
            ('footnote', lambda: self._is_link_a_footnote(el)),
            ('pronunciation help', lambda: self._is_link_pronounciation(el)),
            ('audio file', lambda: self._is_link_audio(el)),
        )
        for reason, rejects in checks:
            if rejects():
                logging.debug('link rejected: %s' % reason)
                return False
        return True

    def _is_link_in_parenthesis(self, p, el):
        """Return True when the link's HTML lies inside a "( ... )" run of ``p``.

        The comparison is made on raw HTML: the link's outerHTML is searched
        for within each parenthesised stretch of the paragraph's innerHTML.
        """
        link_html = el.get_attribute('outerHTML')
        paragraph_html = p.get_attribute('innerHTML')
        if not link_html or not paragraph_html:
            return False
        # finditer resumes exactly where the previous match ended; re-searching
        # from end + 1 skipped a character and missed adjacent "(a)(b)" groups.
        return any(
            link_html in match.group(0)
            for match in self._PARENTHESIZED.finditer(paragraph_html)
        )

    @staticmethod
    def _href(el):
        """Return the element's href, or '' when the attribute is missing."""
        return el.get_attribute('href') or ''

    def _is_link_a_footnote(self, el):
        """Return True when the href points at a citation or note anchor."""
        href = self._href(el)
        return any(marker in href for marker in ('#cite_note', '#cnote'))

    def _is_link_pronounciation(self, el):
        """Return True when the href points at the IPA help pages."""
        return '/wiki/Help:IPA' in self._href(el)

    def _is_link_audio(self, el):
        """Return True when the href contains '.ogg'."""
        return '.ogg' in self._href(el)

6
launcher.py Normal file

@ -0,0 +1,6 @@
import app.cli
import settings
# Hand the Settings class to the CLI layer before anything else runs.
app.cli.init(settings.Settings)
# Run one command from sys.argv, or the interactive loop when none was given.
app.cli.main()
# Wait for <enter> before the process ends.
input('<enter> to exit')

22
settings.py Normal file

@ -0,0 +1,22 @@
import logging
class Settings:
    """Static configuration for the crawler, read as class attributes.

    launcher.py passes this class itself (not an instance) to app.cli.init().
    """

    # --- Application ---
    # Level handed to logging.basicConfig().
    LOG_LEVEL = logging.INFO
    # When True, the crawler waits for <Enter> before clicking a link.
    DO_BREAKPOINTS = False

    # --- Web driver ---
    # Sent to Chrome through its --user-agent switch.
    WEBDRIVER_USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/5.0)'
    # The next four values are not read by the application code in this commit.
    WEBDRIVER_SERIALIZE_DUMP_LOC = '/tmp/saved_webdrivers.pickle'
    WEBDRIVER_STORED_NAME = 'sok-scrape'
    WEBDRIVER_EXECUTOR_PORT = 4444
    # NOTE(review): the %s is presumably filled with WEBDRIVER_EXECUTOR_PORT -- confirm.
    WEBDRIVER_REMOTE_EXECUTOR = 'http://127.0.0.1:%s/wd/hub'

    # --- Web pages ---
    # First URL opened at the start of every crawl.
    PAGE_BASE_URL = 'https://www.wikipedia.org/'
    # Link text used to pick the language on the landing page.
    PAGE_LANGUAGE = 'English'
    # Not read by the application code in this commit.
    PAGE_DELAY = 0

    # --- Data layer ---
    # Not read by the application code in this commit.
    SQLITE_DBFILE = '/home/mathew/.wikicrawler.db'

0
setup.py Normal file