diff --git a/tools/search/crawler/.gitignore b/tools/search/crawler/.gitignore new file mode 100644 index 00000000..8ea1f054 --- /dev/null +++ b/tools/search/crawler/.gitignore @@ -0,0 +1 @@ +search.ini diff --git a/tools/search/crawler/lib/__init__.py b/tools/search/crawler/lib/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tools/search/crawler/lib/archives.py b/tools/search/crawler/lib/archives.py new file mode 100644 index 00000000..fe136a2e --- /dev/null +++ b/tools/search/crawler/lib/archives.py @@ -0,0 +1,167 @@ +import datetime +import httplib +from Queue import Queue +import threading +import sys +import time + +from lib.log import log +from lib.parsers import ArchivesParser + +class MultiListCrawler(object): + def __init__(self, lists, conn, status_interval=30, commit_interval=500): + self.lists = lists + self.conn = conn + self.status_interval = status_interval + self.commit_interval = commit_interval + + self.queue = Queue() + self.counter = 0 + self.counterlock = threading.RLock() + self.stopevent = threading.Event() + + def crawl(self, full=False, month=None): + # Each thread can independently run on one month, so we can get + # a reasonable spread. Therefor, submit them as separate jobs + # to the queue. + for listid, listname in self.lists: + if full: + # Generate a sequence of everything to index + for year in range(1997, datetime.datetime.now().year+1): + for month in range(1,13): + self.queue.put((listid, listname, year, month, -1)) + elif month: + # Do one specific month + pieces = month.split("-") + if len(pieces) != 2: + print "Month format is -, cannot parse '%s'" % month + sys.exit(1) + try: + pieces = [int(x) for x in pieces] + except: + print "Month format is -, cannot convert '%s' to integers" % month + sys.exit(1) + self.queue.put((listid, listname, pieces[0], pieces[1], -1)) + else: + # In incremental scan, we check the current month and the + # previous one, but only for new messages. + curs = self.conn.cursor() + curr = datetime.date.today() + if curr.month == 1: + prev = datetime.date(curr.year-1, 12, 1) + else: + prev = datetime.date(curr.year, curr.month-1, 1) + + for d in curr, prev: + # Figure out what the highest indexed page in this + # month is. + curs.execute("SELECT max(msgnum) FROM messages WHERE list=%(list)s AND year=%(year)s AND month=%(month)s", { + 'list': listid, + 'year': d.year, + 'month': d.month, + }) + x = curs.fetchall() + if x[0][0]: + maxmsg = x[0][0] + else: + maxmsg = -1 + self.queue.put((listid, listname, d.year, d.month, maxmsg)) + + for x in range(5): + t = threading.Thread(name="Indexer %s" % x, + target = lambda: self.crawl_from_queue()) + t.daemon= True + t.start() + + t = threading.Thread(name="statusthread", target = lambda: self.status_thread()) + t.daemon = True + t.start() + + # XXX: need to find a way to deal with all threads crashed and + # not done here yet! 
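The XXX comment above flags a real hazard in this setup: Queue.join() only returns once every queued item has been matched by a task_done() call, so if all worker threads die the main thread waits forever. A minimal sketch of the same queue/daemon-worker/join structure, with the usual mitigation of calling task_done() in a finally block (illustrative only, names invented, not part of this patch):

    import threading
    from Queue import Queue

    def run_jobs(jobs, worker, num_threads=5):
        # Same fan-out as MultiListCrawler.crawl(), reduced to its skeleton.
        queue = Queue()
        for job in jobs:
            queue.put(job)

        def consume():
            while True:
                item = queue.get()
                try:
                    worker(item)
                except Exception, e:
                    print "Worker failed on %s: %s" % (item, e)
                finally:
                    # Always mark the item done, otherwise queue.join()
                    # below can block forever, which is the situation the
                    # XXX comment above worries about.
                    queue.task_done()

        for n in range(num_threads):
            t = threading.Thread(name="worker %s" % n, target=consume)
            t.daemon = True
            t.start()

        queue.join()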
+ self.queue.join() + self.stopevent.set() + + return self.counter + + def status_thread(self): + lastcommit = 0 + starttime = time.time() + while not self.stopevent.is_set(): + self.stopevent.wait(self.status_interval) + nowtime = time.time() + with self.counterlock: + log("Indexed %s messages so far (%s active threads, %s months still queued, %.1f msg/sec)" % ( + self.counter, + threading.active_count() - 2, # main thread + status thread + self.queue.qsize(), + self.counter / (nowtime - starttime), + )) + # Commit at the configured interval + if self.counter - lastcommit > self.commit_interval: + lastcommit = self.counter + self.conn.commit() + + def crawl_from_queue(self): + while not self.stopevent.is_set(): + (listid, listname, year, month, maxmsg) = self.queue.get() + self.crawl_month(listid, listname, year, month, maxmsg) + self.queue.task_done() + + def crawl_month(self, listid, listname, year, month, maxmsg): + currentmsg = maxmsg + while True: + currentmsg += 1 + try: + if not self.crawl_single_message(listid, listname, year, month, currentmsg): + break + except Exception, e: + log("Exception when crawling %s/%s/%s/%s - %s" % ( + listname, year, month, currentmsg, e)) + # Continue on to try the next message + + def crawl_single_message(self, listid, listname, year, month, msgnum): + curs = self.conn.cursor() + h = httplib.HTTPConnection(host="archives.postgresql.org", + port=80, + strict=True, + timeout=10) + url = "/%s/%04d-%02d/msg%05d.php" % ( + listname, + year, + month, + msgnum) + h.putrequest("GET", url) + h.putheader("User-agent", "pgsearch/0.2") + h.putheader("Connection", "close") + h.endheaders() + resp = h.getresponse() + txt = resp.read() + h.close() + + if resp.status == 404: + # Past the end of the month + return False + elif resp.status != 200: + raise Exception("%s/%s/%s/%s returned status %s" % (listname, year, month, msgnum, resp.status)) + + # Else we have the message! + p = ArchivesParser() + if not p.parse(txt): + log("Failed to parse %s/%s/%s/%s" % (listname, year, month, msgnum)) + # We return true to move on to the next message anyway!
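For reference, crawl_month() above walks message numbers sequentially and relies on the archives answering 404 once it runs past the end of a month; message 42 of pgsql-hackers for July 2011, for example, lives at /pgsql-hackers/2011-07/msg00042.php. The probing contract reduced to a standalone sketch (plain HTTP against archives.postgresql.org, as in the code above):

    import httplib

    def archive_message_exists(listname, year, month, msgnum):
        # True if the archives have this message; False once we are past the
        # end of the month (HTTP 404), mirroring crawl_single_message() above.
        h = httplib.HTTPConnection("archives.postgresql.org", 80, strict=True, timeout=10)
        h.request("GET", "/%s/%04d-%02d/msg%05d.php" % (listname, year, month, msgnum),
                  headers={"User-agent": "pgsearch/0.2", "Connection": "close"})
        resp = h.getresponse()
        resp.read()
        h.close()
        return resp.status != 404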
+ return True + curs.execute("INSERT INTO messages (list, year, month, msgnum, date, subject, author, txt, fti) VALUES (%(listid)s, %(year)s, %(month)s, %(msgnum)s, %(date)s, %(subject)s, %(author)s, %(txt)s, setweight(to_tsvector('pg', %(subject)s), 'A') || to_tsvector('pg', %(txt)s))", { + 'listid': listid, + 'year': year, + 'month': month, + 'msgnum': msgnum, + 'date': p.date, + 'subject': p.subject[:127], + 'author': p.author[:127], + 'txt': p.body, + }) + with self.counterlock: + self.counter += 1 + + return True diff --git a/tools/search/crawler/lib/basecrawler.py b/tools/search/crawler/lib/basecrawler.py new file mode 100644 index 00000000..915d73a2 --- /dev/null +++ b/tools/search/crawler/lib/basecrawler.py @@ -0,0 +1,250 @@ +import datetime +import httplib +import time +from email.utils import formatdate, parsedate +import urlparse + +from Queue import Queue +import threading + +from lib.log import log +from lib.parsers import GenericHtmlParser, lossy_unicode + +class BaseSiteCrawler(object): + def __init__(self, hostname, dbconn, siteid, serverip=None): + self.hostname = hostname + self.dbconn = dbconn + self.siteid = siteid + self.serverip = serverip + self.pages_crawled = {} + self.pages_new = 0 + self.pages_updated = 0 + self.pages_deleted = 0 + self.status_interval = 5 + + curs = dbconn.cursor() + curs.execute("SELECT suburl, lastscanned FROM webpages WHERE site=%(id)s AND lastscanned IS NOT NULL", {'id': siteid}) + self.scantimes = dict(curs.fetchall()) + self.queue = Queue() + self.counterlock = threading.RLock() + self.stopevent = threading.Event() + + def crawl(self): + self.init_crawl() + + # Fire off worker threads + for x in range(5): + t = threading.Thread(name="Indexer %s" % x, + target = lambda: self.crawl_from_queue()) + t.daemon = True + t.start() + + t = threading.Thread(name="statusthread", target = lambda: self.status_thread()) + t.daemon = True + t.start() + + # XXX: need to find a way to deal with all threads crashed and + # not done here yet! + self.queue.join() + self.stopevent.set() + + # Remove all pages that we didn't crawl + curs = self.dbconn.cursor() + curs.execute("DELETE FROM webpages WHERE site=%(site)s AND NOT suburl=ANY(%(urls)s)", { + 'site': self.siteid, + 'urls': self.pages_crawled.keys(), + }) + if curs.rowcount: + log("Deleted %s pages no longer accessible" % curs.rowcount) + self.pages_deleted += curs.rowcount + + self.dbconn.commit() + log("Considered %s pages, wrote %s updated and %s new, deleted %s." 
% (len(self.pages_crawled), self.pages_updated, self.pages_new, self.pages_deleted)) + + def status_thread(self): + starttime = time.time() + while not self.stopevent.is_set(): + self.stopevent.wait(self.status_interval) + nowtime = time.time() + with self.counterlock: + log("Considered %s pages, wrote %s upd, %s new, %s del (%s threads, %s in queue, %.1f pages/sec)" % ( + len(self.pages_crawled), + self.pages_updated, + self.pages_new, + self.pages_deleted, + threading.active_count() - 2, + self.queue.qsize(), + len(self.pages_crawled) / (nowtime - starttime), + )) + + def crawl_from_queue(self): + while not self.stopevent.is_set(): + (url, relprio) = self.queue.get() + try: + self.crawl_page(url, relprio) + except Exception, e: + log("Exception crawling '%s': %s" % (url, e)) + self.queue.task_done() + + def exclude_url(self, url): + return False + + def crawl_page(self, url, relprio): + if self.pages_crawled.has_key(url) or self.pages_crawled.has_key(url+"/"): + return + + if self.exclude_url(url): + return + + self.pages_crawled[url] = 1 + (result, pagedata, lastmod) = self.fetch_page(url) + + if result == 0: + if pagedata == None: + # Result ok but no data, means that the page was not modified. + # Thus we can happily consider ourselves done here. + return + else: + # Page failed to load or was a redirect, so remove from database + curs = self.dbconn.cursor() + curs.execute("DELETE FROM webpages WHERE site=%(id)s AND suburl=%(url)s", { + 'id': self.siteid, + 'url': url, + }) + with self.counterlock: + self.pages_deleted += curs.rowcount + + if result == 1: + # Page was a redirect, so crawl into that page if we haven't + # already done so. + self.queue_url(pagedata) + return + + # Try to convert pagedata to a unicode string + pagedata = lossy_unicode(pagedata) + try: + self.page = self.parse_html(pagedata) + except Exception, e: + log("Failed to parse HTML for %s" % url) + log(e) + return + + self.save_page(url, lastmod, relprio) + self.post_process_page(url) + + def save_page(self, url, lastmod, relprio): + if relprio == 0.0: + relprio = 0.5 + params = { + 'title': self.page.title, + 'txt': self.page.gettext(), + 'lastmod': lastmod, + 'site': self.siteid, + 'url': url, + 'relprio': relprio, + } + curs = self.dbconn.cursor() + curs.execute("UPDATE webpages SET title=%(title)s, txt=%(txt)s, fti=to_tsvector(%(txt)s), lastscanned=%(lastmod)s, relprio=%(relprio)s WHERE site=%(site)s AND suburl=%(url)s", params) + if curs.rowcount != 1: + curs.execute("INSERT INTO webpages (site, suburl, title, txt, fti, lastscanned, relprio) VALUES (%(site)s, %(url)s, %(title)s, %(txt)s, to_tsvector(%(txt)s), %(lastmod)s, %(relprio)s)", params) + with self.counterlock: + self.pages_new += 1 + else: + with self.counterlock: + self.pages_updated += 1 + + ACCEPTED_CONTENTTYPES = ("text/html", "text/plain", ) + def accept_contenttype(self, contenttype): + # Split apart if there is a "; charset=" in it + if contenttype.find(";"): + contenttype = contenttype.split(';',2)[0] + return contenttype in self.ACCEPTED_CONTENTTYPES + + def fetch_page(self, url): + try: + # Unfortunatley, persistent connections seem quite unreliable, + # so create a new one for each page. 
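The fetch_page() method that follows is essentially a conditional GET: it sends If-Modified-Since based on the stored lastscanned timestamp and treats a 304 response as nothing to do. The same handshake reduced to a standalone sketch (Python 2 httplib, host and URL supplied by the caller):

    import time
    import httplib
    from email.utils import formatdate

    def fetch_if_modified(host, url, last_scanned=None):
        # Returns the page body, or None if the server says it is unchanged.
        h = httplib.HTTPConnection(host, 80, strict=True, timeout=10)
        headers = {"User-agent": "pgsearch/0.2", "Connection": "close"}
        if last_scanned:
            # last_scanned is a datetime, as stored in webpages.lastscanned
            headers["If-Modified-Since"] = formatdate(time.mktime(last_scanned.timetuple()))
        h.request("GET", url, headers=headers)
        resp = h.getresponse()
        try:
            if resp.status == 304:
                return None
            return resp.read()
        finally:
            h.close()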
+ h = httplib.HTTPConnection(host=self.serverip and self.serverip or self.hostname, + port=80, + strict=True, + timeout=10) + h.putrequest("GET", url) + h.putheader("User-agent","pgsearch/0.2") + if self.serverip: + h.putheader("Host", self.hostname) + h.putheader("Connection","close") + if self.scantimes.has_key(url): + h.putheader("If-Modified-Since", formatdate(time.mktime(self.scantimes[url].timetuple()))) + h.endheaders() + resp = h.getresponse() + + if resp.status == 200: + if not self.accept_contenttype(resp.getheader("content-type")): + # Content-type we're not interested in + return (2, None, None) + return (0, resp.read(), self.get_date(resp.getheader("last-modified"))) + elif resp.status == 304: + # Not modified, so no need to reprocess, but also don't + # give an error message for it... + return (0, None, None) + elif resp.status == 301: + # A redirect... So try again with the redirected-to URL + # We send this through our link resolver to deal with both + # absolute and relative URLs + if resp.getheader('location', '') == '': + log("Url %s returned empty redirect" % url) + return (2, None, None) + + for tgt in self.resolve_links([resp.getheader('location', '')], url): + return (1, tgt, None) + # No redirect at all found, becaue it was invalid? + return (2, None, None) + else: + #print "Url %s returned status %s" % (url, resp.status) + pass + except Exception, e: + log("Exception when loading url %s: %s" % (url, e)) + return (2, None, None) + + def get_date(self, date): + d = parsedate(date) + if d: + return datetime.datetime.fromtimestamp(time.mktime(d)) + return datetime.datetime.now() + + def parse_html(self, page): + if page == None: + return None + + p = GenericHtmlParser() + p.feed(page) + return p + + def resolve_links(self, links, pageurl): + for x in links: + p = urlparse.urlsplit(x) + if p.scheme == "http": + if p.netloc != self.hostname: + # Remote link + continue + # Turn this into a host-relative url + p = ('', '', p.path, p.query, '') + + if p[4] != "" or p[3] != "": + # Remove fragments (part of the url past #) + p = (p[0], p[1], p[2], '', '') + + if p[0] == "": + if p[2] == "": + # Nothing in the path, so it's a pure fragment url + continue + + if p[2][0] == "/": + # Absolute link on this host, so just return it + yield urlparse.urlunsplit(p) + else: + # Relative link + yield urlparse.urljoin(pageurl, urlparse.urlunsplit(p)) + else: + # Ignore unknown url schemes like mailto + pass diff --git a/tools/search/crawler/lib/genericsite.py b/tools/search/crawler/lib/genericsite.py new file mode 100644 index 00000000..6aa5780e --- /dev/null +++ b/tools/search/crawler/lib/genericsite.py @@ -0,0 +1,50 @@ +import re + +from basecrawler import BaseSiteCrawler +from parsers import RobotsParser + +class GenericSiteCrawler(BaseSiteCrawler): + def __init__(self, hostname, dbconn, siteid): + super(GenericSiteCrawler, self).__init__(hostname, dbconn, siteid) + + def init_crawl(self): + # Load robots.txt + self.robots = RobotsParser("http://%s/robots.txt" % self.hostname) + + # We need to seed the crawler with every URL we've already seen, since + # we don't recrawl the contents if they haven't changed. 
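resolve_links() above normalizes everything to host-relative or page-relative URLs and drops fragments, foreign hosts and non-http schemes. A small illustration of the expected behaviour, run from tools/search/crawler (the stub class only exists to satisfy self.hostname; the outputs shown in comments are what the logic above should produce):

    from lib.basecrawler import BaseSiteCrawler

    class _FakeCrawler(BaseSiteCrawler):
        def __init__(self):
            # BaseSiteCrawler.__init__ needs a database connection, which is
            # irrelevant here: resolve_links() only reads self.hostname.
            self.hostname = "www.postgresql.org"

    links = ["http://www.postgresql.org/docs/",   # same host        -> "/docs/"
             "http://example.org/elsewhere",      # remote host      -> dropped
             "press.html",                        # relative link    -> "/about/press.html"
             "#top",                              # pure fragment    -> dropped
             "mailto:someone@example.org"]        # non-http scheme  -> dropped
    print list(_FakeCrawler().resolve_links(links, "/about/"))
    # Expected: ['/docs/', '/about/press.html']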
+ allpages = self.scantimes.keys() + + # Figure out if there are any excludes to deal with (beyond the + # robots.txt ones) + curs = self.dbconn.cursor() + curs.execute("SELECT suburlre FROM site_excludes WHERE site=%(site)s", { + 'site': self.siteid, + }) + self.extra_excludes = [re.compile(x) for x, in curs.fetchall()] + + # We *always* crawl the root page, of course + self.queue.put(("/", 0.5)) + + # Now do all the other pages + for x in allpages: + self.queue.put((x, 0.5)) + + def exclude_url(self, url): + if self.robots and self.robots.block_url(url): + return True + for r in self.extra_excludes: + if r.search(url): + return True + return False + + def queue_url(self, url): + self.queue.put((url.strip(), 0.5)) + + def post_process_page(self, url): + for l in self.resolve_links(self.page.links, url): + if self.pages_crawled.has_key(l) or self.pages_crawled.has_key(l+"/"): + continue + if self.exclude_url(l): + continue + self.queue_url(l) diff --git a/tools/search/crawler/lib/log.py b/tools/search/crawler/lib/log.py new file mode 100644 index 00000000..8c147899 --- /dev/null +++ b/tools/search/crawler/lib/log.py @@ -0,0 +1,6 @@ +# Yes, this is trivial, but we might want to put something +# more here in the future :) +import datetime +def log(msg): + print "%s: %s" % (datetime.datetime.now(), msg) + diff --git a/tools/search/crawler/lib/parsers.py b/tools/search/crawler/lib/parsers.py new file mode 100644 index 00000000..87aeb84d --- /dev/null +++ b/tools/search/crawler/lib/parsers.py @@ -0,0 +1,172 @@ +import re +import string +import urllib +from StringIO import StringIO +import codecs +import dateutil.parser +from datetime import timedelta + +from HTMLParser import HTMLParser + +from lib.log import log + +class GenericHtmlParser(HTMLParser): + def __init__(self): + HTMLParser.__init__(self) + self.lasttag = None + self.title = "" + self.pagedata = StringIO() + self.links = [] + self.inbody = False + + def handle_starttag(self, tag, attrs): + self.lasttag = tag + if tag == "body": + self.inbody = True + if tag == "a": + for a,v in attrs: + if a == "href": + self.links.append(v) + + def handle_endtag(self, tag): + if tag == "body": + self.inbody = False + + DATA_IGNORE_TAGS = ("script",) + def handle_data(self, data): + d = data.strip() + if len(d) < 2: + return + + if self.lasttag == "title": + self.title += d + return + + # Never store text found in the HEAD + if not self.inbody: + return + + # Ignore specific tags, like SCRIPT + if self.lasttag in self.DATA_IGNORE_TAGS: + return + + self.pagedata.write(d) + self.pagedata.write("\n") + + def gettext(self): + self.pagedata.seek(0) + return self.pagedata.read() + + +class ArchivesParser(object): + rematcher = re.compile(".*.*.*(.*)", re.DOTALL) + hp = HTMLParser() + def __init__(self): + self.subject = None + self.author = None + self.date = None + self.body = None + + def parse(self, contents): + contents = lossy_unicode(contents) + match = self.rematcher.search(contents) + if not match: + return False + self.subject = self.hp.unescape(match.group(1)) + self.author = self.almost_rot13(self.hp.unescape(match.group(2))) + if not self.parse_date(self.hp.unescape(match.group(3))): + return False + self.body = self.hp.unescape(match.group(4)) + return True + + _date_multi_re = re.compile(' \((\w+\s\w+|)\)$') + _date_trailing_envelope = re.compile('\s+\(envelope.*\)$') + def parse_date(self, d): + # For some reason, we have dates that look like this: + # http://archives.postgresql.org/pgsql-bugs/1999-05/msg00018.php + # Looks like an 
mhonarc bug, but let's just remove that trailing + # stuff here to be sure... + if self._date_trailing_envelope.search(d): + d = self._date_trailing_envelope.sub('', d) + + # We have a number of dates in the format + # " +0200 (MET DST)" + # or similar. The problem coming from the space within the + # parenthesis, or if the contents of the parenthesis is + # completely empty + if self._date_multi_re.search(d): + d = self._date_multi_re.sub('', d) + # Isn't it wonderful with a string with a trailing quote but no + # leading quote? MUA's are weird... + if d.endswith('"') and not d.startswith('"'): + d = d[:-1] + + # We also have "known incorrect timezone specs". + if d.endswith('MST7MDT'): + d = d[:-4] + elif d.endswith('METDST'): + d = d[:-3] + elif d.endswith('"MET'): + d = d[:-4] + "MET" + + try: + self.date = dateutil.parser.parse(d) + except ValueError, e: + log("Failed to parse date '%s'" % d) + return False + + if self.date.utcoffset(): + # We have some messages with completely incorrect utc offsets, + # so we need to reject those too + if self.date.utcoffset() > timedelta(hours=12) or self.date.utcoffset() < timedelta(hours=-12): + log("Failed to parse date %s', timezone offset out of range." % d) + return False + + return True + + # Semi-hacked rot13, because the one used by mhonarc is broken. + # So we copy the brokenness here. + # This code is from MHonArc/ewhutil.pl, mrot13() + _arot13_trans = dict(zip(map(ord, + u'@ABCDEFGHIJKLMNOPQRSTUVWXYZ[abcdefghijklmnopqrstuvwxyz'), + u'NOPQRSTUVWXYZ[@ABCDEFGHIJKLMnopqrstuvwxyzabcdefghijklm')) + def almost_rot13(self, s): + return unicode(s).translate(self._arot13_trans) + +class RobotsParser(object): + def __init__(self, url): + try: + u = urllib.urlopen(url) + txt = u.read() + u.close() + self.disallows = [] + activeagent = False + for l in txt.splitlines(): + if l.lower().startswith("user-agent: ") and len(l) > 12: + if l[12] == "*" or l[12:20] == "pgsearch": + activeagent = True + else: + activeagent = False + if activeagent and l.lower().startswith("disallow: "): + self.disallows.append(l[10:]) + except Exception, e: + self.disallows = [] + + def block_url(self, url): + # Assumes url comes in as relative + for d in self.disallows: + if url.startswith(d): + return True + return False + + +# Convert a string to unicode, try utf8 first, then latin1, then give +# up and do a best-effort utf8. 
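One caveat about ArchivesParser above: the rematcher pattern as it appears in this rendering of the patch has lost its HTML-comment delimiters (it shows a single capture group, yet group(1) through group(4) are used). It presumably matches MHonArc's metadata comments in the generated archive pages; a plausible reconstruction, given only to make the parsing logic readable and not necessarily the exact original pattern:

    import re

    # Assumed reconstruction: subject, rot13'd author, date and body are pulled
    # out of MHonArc's standard comment markers in the archive HTML.
    rematcher = re.compile(
        "<!--X-Subject: ([^\n]*) -->"
        ".*<!--X-From-R13: ([^\n]*) -->"
        ".*<!--X-Date: ([^\n]*) -->"
        ".*<!--X-Body-of-Message-->(.*)<!--X-Body-of-Message-End-->",
        re.DOTALL)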
+def lossy_unicode(s): + try: + return unicode(s, 'utf8') + except UnicodeDecodeError: + try: + return unicode(s, 'latin1') + except UnicodeDecodeError: + return unicode(s, 'utf8', 'replace') diff --git a/tools/search/crawler/lib/sitemapsite.py b/tools/search/crawler/lib/sitemapsite.py new file mode 100644 index 00000000..9dbc6703 --- /dev/null +++ b/tools/search/crawler/lib/sitemapsite.py @@ -0,0 +1,94 @@ +import urllib +import xml.parsers.expat +import dateutil.parser + +from lib.log import log +from lib.basecrawler import BaseSiteCrawler + +class SitemapParser(object): + def __init__(self): + self.parser = xml.parsers.expat.ParserCreate() + self.currenturl = "" + self.currentprio = 0 + self.currentlastmod = None + self.geturl = False + self.getprio = False + self.getlastmod = False + self.currstr = "" + self.urls = [] + + def parse(self, f): + self.parser.StartElementHandler = lambda name,attrs: self.processelement(name,attrs) + self.parser.EndElementHandler = lambda name: self.processendelement(name) + self.parser.CharacterDataHandler = lambda data: self.processcharacterdata(data) + + self.parser.ParseFile(f) + + def processelement(self, name, attrs): + if name == "url": + self.currenturl = "" + self.currentprio = 0 + self.currentlastmod = None + elif name == "loc": + self.geturl = True + self.currstr = "" + elif name == "priority": + self.getprio = True + self.currstr = "" + elif name == "lastmod": + self.getlastmod = True + self.currstr = "" + + def processendelement(self, name): + if name == "loc": + self.geturl = False + self.currenturl = self.currstr + elif name == "priority": + self.getprio = False + self.currentprio = float(self.currstr) + elif name == "lastmod": + self.getlastmod = False + self.currentlastmod = dateutil.parser.parse(self.currstr) + elif name == "url": + self.urls.append((self.currenturl, self.currentprio, self.currentlastmod)) + + def processcharacterdata(self, data): + if self.geturl or self.getprio or self.getlastmod: + self.currstr += data + +class SitemapSiteCrawler(BaseSiteCrawler): + def __init__(self, hostname, dbconn, siteid, serverip): + super(SitemapSiteCrawler, self).__init__(hostname, dbconn, siteid, serverip) + + def init_crawl(self): + # We need to seed the crawler with every URL we've already seen, since + # we don't recrawl the contents if they haven't changed. + allpages = self.scantimes.keys() + + # Fetch the sitemap. We ignore robots.txt in this case, and + # assume it's always under /sitemap.xml + u = urllib.urlopen("http://%s/sitemap.xml" % self.hostname) + p = SitemapParser() + p.parse(u) + u.close() + + for url, prio, lastmod in p.urls: + url = url[len(self.hostname)+7:] + if lastmod: + if self.scantimes.has_key(url): + if lastmod < self.scantimes[url]: + # Not modified since last scan, so don't reload + # Stick it in the list of pages we've scanned though, + # to make sure we don't remove it... 
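SitemapParser above is a thin expat wrapper that collects (url, priority, lastmod) tuples; SitemapSiteCrawler then strips the leading http://hostname part and queues whatever is new or changed. A small self-contained illustration of the parser (sitemap content invented):

    from StringIO import StringIO
    from lib.sitemapsite import SitemapParser

    # A two-entry sitemap, just to show the shape of SitemapParser.urls.
    xml = StringIO("""<?xml version="1.0" encoding="UTF-8"?>
    <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
      <url><loc>http://www.postgresql.org/</loc><priority>1.0</priority></url>
      <url><loc>http://www.postgresql.org/about/</loc><priority>0.5</priority>
           <lastmod>2011-07-01</lastmod></url>
    </urlset>""")

    p = SitemapParser()
    p.parse(xml)
    for url, prio, lastmod in p.urls:
        # Prints the full URL, its priority and the lastmod (None if absent)
        print url, prio, lastmod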
+ self.pages_crawled[url] = 1 + continue + self.queue.put((url, prio)) + + log("About to crawl %s pages from sitemap" % self.queue.qsize()) + + # Stub functions used when crawling, ignored here + def queue_url(self, url): + pass + + def post_process_page(self, url): + pass diff --git a/tools/search/crawler/lib/threadwrapper.py b/tools/search/crawler/lib/threadwrapper.py new file mode 100644 index 00000000..4b39ac9e --- /dev/null +++ b/tools/search/crawler/lib/threadwrapper.py @@ -0,0 +1,22 @@ +from multiprocessing import Process + +# Wrap a method call in a different process, so that we can process +# keyboard interrupts and actually terminate it if we have to. +# python threading makes it often impossible to Ctlr-C it otherwise.. +# +# NOTE! Database connections and similar objects must be instantiated +# in the subprocess, and not in the master, to be fully safe! +def threadwrapper(func, *args): + p = Process(target=func, args=args) + p.start() + + # Wait for the child to exit, or if an interrupt signal is delivered, + # forcibly terminate the child. + try: + p.join() + except KeyboardInterrupt, e: + print "Keyboard interrupt, terminating child process!" + p.terminate() + except Exception, e: + print "Exception %s, terminating child process!" % e + p.terminate() diff --git a/tools/search/crawler/listcrawler.py b/tools/search/crawler/listcrawler.py new file mode 100755 index 00000000..e528ab5c --- /dev/null +++ b/tools/search/crawler/listcrawler.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from lib.log import log +from lib.archives import MultiListCrawler +from lib.threadwrapper import threadwrapper +from ConfigParser import ConfigParser +from optparse import OptionParser +import psycopg2 +import sys +import time + +def doit(opt): + cp = ConfigParser() + cp.read("search.ini") + psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) + conn = psycopg2.connect(cp.get("search","db")) + + curs = conn.cursor() + + if opt.list: + curs.execute("SELECT id,name FROM lists WHERE name=%(name)s", { + 'name': opt.list, + }) + else: + curs.execute("SELECT id,name FROM lists WHERE active ORDER BY id") + + listinfo = [(id,name) for id,name in curs.fetchall()] + c = MultiListCrawler(listinfo, conn, opt.status_interval, opt.commit_interval) + n = c.crawl(opt.full, opt.month) + conn.commit() + + log("Indexed %s messages" % n) + time.sleep(1) + +if __name__=="__main__": + parser = OptionParser() + parser.add_option("-l", "--list", dest='list', help="Crawl only this list") + parser.add_option("-m", "--month", dest='month', help="Crawl only this month") + parser.add_option("-f", "--full", dest='full', action="store_true", help="Make a full crawl") + parser.add_option("-t", "--status-interval", dest='status_interval', help="Seconds between status updates") + parser.add_option("-c", "--commit-interval", dest='commit_interval', help="Messages between each commit") + + (opt, args) = parser.parse_args() + + if opt.full and opt.month: + print "Can't use both full and specific month!" 
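threadwrapper() above runs the indexing job in a multiprocessing child so that Ctrl-C in the parent can terminate it; as the comment in that file says, database connections must be opened inside the wrapped function rather than in the parent. A minimal usage sketch along the lines of listcrawler.py and webcrawler.py (connection string invented):

    import psycopg2
    from lib.threadwrapper import threadwrapper

    def index_everything(connstr):
        # The connection is created here, in the child process, on purpose.
        conn = psycopg2.connect(connstr)
        # ... do the actual crawling/indexing with conn ...
        conn.commit()

    if __name__ == "__main__":
        threadwrapper(index_everything, "dbname=search")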
+ sys.exit(1) + + # assign default values + opt.status_interval = opt.status_interval and int(opt.status_interval) or 30 + opt.commit_interval = opt.commit_interval and int(opt.commit_interval) or 500 + + threadwrapper(doit, opt) diff --git a/tools/search/crawler/listsync.py b/tools/search/crawler/listsync.py new file mode 100755 index 00000000..09f9e205 --- /dev/null +++ b/tools/search/crawler/listsync.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from lib.log import log +from ConfigParser import ConfigParser +import psycopg2 +import urllib +import simplejson as json + +if __name__=="__main__": + cp = ConfigParser() + cp.read("search.ini") + psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) + conn = psycopg2.connect(cp.get("search","db")) + curs = conn.cursor() + + u = urllib.urlopen("http://%s/community/lists/listinfo/" % cp.get("search", "web")) + obj = json.load(u) + u.close() + + # We don't care about the groups here, just the lists! + curs.execute("SELECT id, name, active FROM lists") + lists = curs.fetchall() + for id, name, active in lists: + thislist = [x for x in obj['lists'] if x['id'] == id] + if len(thislist) == 0: + log("List %s should be removed, do that manually!" % name) + else: + # Compare contents of list + l = thislist[0] + if l['name'] != name: + log("Renaming list %s -> %s" % (name, l['name'])) + curs.execute("UPDATE lists SET name=%(name)s WHERE id=%(id)s", l) + + if thislist[0]['active'] != active: + log("Changing active flag for %s to %s" % (l['name'], l['active'])) + curs.execute("UPDATE lists SET active=%(active)s WHERE id=%(id)s", l) + for l in obj['lists']: + thislist = [x for x in lists if x[0] == l['id']] + if len(thislist) == 0: + log("Adding list %s" % l['name']) + curs.execute("INSERT INTO lists (id, name, active, pagecount) VALUES (%(id)s, %(name)s, %(active)s, 0)", + l) + + conn.commit() diff --git a/tools/search/crawler/search.ini.sample b/tools/search/crawler/search.ini.sample new file mode 100644 index 00000000..a3e2741e --- /dev/null +++ b/tools/search/crawler/search.ini.sample @@ -0,0 +1,4 @@ +[search] +db=dbname=search +web=www.postgresql.org +frontendip=1.2.3.4 \ No newline at end of file diff --git a/tools/search/crawler/webcrawler.py b/tools/search/crawler/webcrawler.py new file mode 100755 index 00000000..c0967421 --- /dev/null +++ b/tools/search/crawler/webcrawler.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from lib.log import log +from lib.genericsite import GenericSiteCrawler +from lib.sitemapsite import SitemapSiteCrawler +from lib.threadwrapper import threadwrapper + +from ConfigParser import ConfigParser +import psycopg2 +import time + +def doit(): + psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) + conn = psycopg2.connect(cp.get("search","db")) + + curs = conn.cursor() + + # Start by indexing the main website + log("Starting indexing of main website") + SitemapSiteCrawler("www.postgresql.org", conn, 1, cp.get("search", "frontendip")).crawl() + conn.commit() + + # Skip id=1, which is the main site.. 
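For reference, listsync.py above reads only the lists array of the JSON served at /community/lists/listinfo/, and within each entry only the id, name and active fields. The shape it expects looks roughly like this (values invented for the example):

    # Hypothetical excerpt of http://www.postgresql.org/community/lists/listinfo/
    # showing only the keys listsync.py actually reads.
    example = {
        "lists": [
            {"id": 1, "name": "pgsql-announce", "active": True},
            {"id": 2, "name": "pgsql-hackers", "active": True},
        ],
        # The feed also carries list groups, which listsync.py ignores.
    }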
+ curs.execute("SELECT id, hostname FROM sites WHERE id>1") + for siteid, hostname in curs.fetchall(): + log("Starting indexing of %s" % hostname) + GenericSiteCrawler(hostname, conn, siteid).crawl() + conn.commit() + + curs.execute("WITH t AS (SELECT site,count(*) AS c FROM webpages GROUP BY site) UPDATE sites SET pagecount=t.c FROM t WHERE id=t.site") + conn.commit() + + time.sleep(1) + + +if __name__=="__main__": + cp = ConfigParser() + cp.read("search.ini") + + threadwrapper(doit) diff --git a/tools/search/sql/README b/tools/search/sql/README new file mode 100644 index 00000000..9f08a3a0 --- /dev/null +++ b/tools/search/sql/README @@ -0,0 +1,20 @@ +Script load order and description: + +1) Load schema.sql + Creates all tables + +2) Load tsearch.sql + Configures full text indexing + +3) Load functions.sql + Creates PL/pgsql functions + +4) Load data.sql + Loads sites, exclusions and lists. It's either this or restore a backup + of those tables. + +-> recommended to perform initial indexing here, for performance reasons + +5) indexes.sql + Create fulltext indexes and date index + diff --git a/tools/search/sql/data.sql b/tools/search/sql/data.sql new file mode 100644 index 00000000..5e12e9d2 --- /dev/null +++ b/tools/search/sql/data.sql @@ -0,0 +1,20 @@ +INSERT INTO sites (id, hostname, description, pagecount) + VALUES (1, 'www.postgresql.org', 'Main PostgreSQL Website', 0); + +INSERT INTO sites (id, hostname, description, pagecount) + VALUES (2, 'www.pgadmin.org','pgAdmin III', 0); + +INSERT INTO sites (id, hostname, description, pagecount) + VALUES (3, 'jdbc.postgresql.org','JDBC driver', 0); + + +INSERT INTO site_excludes VALUES (2,'^/archives'); +INSERT INTO site_excludes VALUES (2,'^/docs/dev'); +INSERT INTO site_excludes VALUES (2,'^/docs/1.4'); +INSERT INTO site_excludes VALUES (2,'^/docs/[^/]+/pg'); +INSERT INTO site_excludes VALUES (2,'^/snapshots'); +INSERT INTO site_excludes VALUES (3,'^/development'); +INSERT INTO site_excludes VALUES (3,'^/\.\./'); +INSERT INTO site_excludes VALUES (3,'\.tar\.'); +INSERT INTO site_excludes VALUES (3,'\.jar'); +INSERT INTO site_excludes VALUES (3,'\.tgz'); diff --git a/tools/search/sql/functions.sql b/tools/search/sql/functions.sql new file mode 100644 index 00000000..7cf263a1 --- /dev/null +++ b/tools/search/sql/functions.sql @@ -0,0 +1,109 @@ +CREATE OR REPLACE FUNCTION archives_search(query text, _lists int, firstdate timestamptz, lastdate timestamptz, startofs int, hitsperpage int, sort char) +RETURNS TABLE (listname text, year int, month int, msgnum int, date timestamptz, subject text, author text, headline text, rank float) +AS $$ +DECLARE + tsq tsquery; + qry text; + hits int; + hit RECORD; + curs refcursor; + pagecount int; + listary int[]; +BEGIN + tsq := plainto_tsquery(query); + IF numnode(tsq) = 0 THEN + RETURN QUERY SELECT NULL::text, 0, 0, NULL::int, NULL::timestamptz, NULL::text, NULL::text, NULL::text, NULL:: float; + RETURN; + END IF; + + hits := 0; + + IF _lists IS NULL THEN + SELECT INTO pagecount sum(lists.pagecount) FROM lists; + IF sort = 'd' THEN + OPEN curs FOR SELECT m.list,m.year,m.month,m.msgnum,ts_rank_cd(m.fti,tsq) FROM messages m WHERE m.fti @@ tsq AND m.date>COALESCE(firstdate,'1900-01-01') ORDER BY m.date DESC LIMIT 1000; + ELSE + OPEN curs FOR SELECT m.list,m.year,m.month,m.msgnum,ts_rank_cd(m.fti,tsq) FROM messages m WHERE m.fti @@ tsq AND m.date>COALESCE(firstdate,'1900-01-01') ORDER BY ts_rank_cd(m.fti,tsq) DESC LIMIT 1000; + END IF; + ELSE + IF _lists < 0 THEN + SELECT INTO listary ARRAY(SELECT id FROM lists 
WHERE grp=-_lists); + ELSE + listary = ARRAY[_lists]; + END IF; + SELECT INTO pagecount sum(lists.pagecount) FROM lists WHERE id=ANY(listary); + IF sort = 'd' THEN + OPEN curs FOR SELECT m.list,m.year,m.month,m.msgnum,ts_rank_cd(m.fti,tsq) FROM messages m WHERE (m.list=ANY(listary)) AND m.fti @@ tsq AND m.date>COALESCE(firstdate,'1900-01-01') ORDER BY m.date DESC LIMIT 1000; + ELSE + OPEN curs FOR SELECT m.list,m.year,m.month,m.msgnum,ts_rank_cd(m.fti,tsq) FROM messages m WHERE (m.list=ANY(listary)) AND m.fti @@ tsq AND m.date>COALESCE(firstdate,'1900-01-01') ORDER BY ts_rank_cd(m.fti,tsq) DESC LIMIT 1000; + END IF; + END IF; + LOOP + FETCH curs INTO hit; + IF NOT FOUND THEN + EXIT; + END IF; + hits := hits+1; + IF (hits < startofs+1) OR (hits > startofs + hitsperpage) THEN + CONTINUE; + END IF; + RETURN QUERY SELECT lists.name::text, hit.year, hit.month, hit.msgnum, messages.date, messages.subject::text, messages.author::text, ts_headline(messages.txt,tsq,'StartSel="[[[[[[",StopSel="]]]]]]"'), hit.ts_rank_cd::float FROM messages INNER JOIN lists ON messages.list=lists.id WHERE messages.list=hit.list AND messages.year=hit.year AND messages.month=hit.month AND messages.msgnum=hit.msgnum; + END LOOP; + + listname := NULL; msgnum := NULL; date := NULL; subject := NULL; author := NULL; headline := NULL; rank := NULL; + year=hits; + month=pagecount; + RETURN NEXT; +END; +$$ +LANGUAGE 'plpgsql'; +ALTER FUNCTION archives_search(text, int, timestamptz, timestamptz, int, int, char) SET default_text_search_config = 'public.pg'; + + +CREATE OR REPLACE FUNCTION site_search(query text, startofs int, hitsperpage int, allsites bool, _suburl text) +RETURNS TABLE (siteid int, baseurl text, suburl text, title text, headline text, rank float) +AS $$ +DECLARE + tsq tsquery; + qry text; + hits int; + hit RECORD; + curs refcursor; + pagecount int; +BEGIN + tsq := plainto_tsquery(query); + IF numnode(tsq) = 0 THEN + siteid = 0;baseurl=NULL;suburl=NULL;title=NULL;headline=NULL;rank=0; + RETURN NEXT; + RETURN; + END IF; + + hits := 0; + + IF allsites THEN + SELECT INTO pagecount sum(sites.pagecount) FROM sites; + OPEN curs FOR SELECT sites.id AS siteid, sites.baseurl, webpages.suburl, ts_rank_cd(fti,tsq) FROM webpages INNER JOIN sites ON webpages.site=sites.id WHERE fti @@ tsq ORDER BY ts_rank_cd(fti,tsq) DESC LIMIT 1000; + ELSE + SELECT INTO pagecount sites.pagecount FROM sites WHERE id=1; + IF _suburl IS NULL THEN + OPEN curs FOR SELECT sites.id AS siteid, sites.baseurl, webpages.suburl, ts_rank_cd(fti,tsq) FROM webpages INNER JOIN sites ON webpages.site=sites.id WHERE fti @@ tsq AND site=1 ORDER BY ts_rank_cd(fti,tsq) DESC LIMIT 1000; + ELSE + OPEN curs FOR SELECT sites.id AS siteid, sites.baseurl, webpages.suburl, ts_rank_cd(fti,tsq) FROM webpages INNER JOIN sites ON webpages.site=sites.id WHERE fti @@ tsq AND site=1 AND suburl LIKE _suburl||'%' ORDER BY ts_rank_cd(fti,tsq) DESC LIMIT 1000; + END IF; + END IF; + LOOP + FETCH curs INTO hit; + IF NOT FOUND THEN + EXIT; + END IF; + hits := hits+1; + IF (hits < startofs+1) OR (hits > startofs+hitsperpage) THEN + CONTINUE; + END IF; + RETURN QUERY SELECT hit.siteid, hit.baseurl::text, hit.suburl::text, webpages.title::text, ts_headline(webpages.txt,tsq,'StartSel="[[[[[[",StopSel="]]]]]]"'), hit.ts_rank_cd::float FROM webpages WHERE webpages.site=hit.siteid AND webpages.suburl=hit.suburl; + END LOOP; + RETURN QUERY SELECT pagecount, NULL::text, NULL::text, NULL::text, NULL::text, pagecount::float; +END; +$$ +LANGUAGE 'plpgsql'; +ALTER FUNCTION site_search(text, int, 
int, bool, text) SET default_text_search_config = 'public.pg'; \ No newline at end of file diff --git a/tools/search/sql/indexes.sql b/tools/search/sql/indexes.sql new file mode 100644 index 00000000..c5be4f0b --- /dev/null +++ b/tools/search/sql/indexes.sql @@ -0,0 +1,10 @@ +DROP INDEX IF EXISTS messages_date_idx; +CREATE INDEX messages_date_idx ON messages(date); + +DROP INDEX IF EXISTS webpages_fti_idx; +CREATE INDEX webpages_fti_idx ON webpages USING gin(fti); +ANALYZE webpages; + +DROP INDEX IF EXISTS messages_fti_idx; +CREATE INDEX messages_fti_idx ON messages USING gin(fti); +ANALYZE messages; diff --git a/tools/search/sql/pg_dict.syn b/tools/search/sql/pg_dict.syn new file mode 100644 index 00000000..b200527f --- /dev/null +++ b/tools/search/sql/pg_dict.syn @@ -0,0 +1,5 @@ +postgres postgres +postgresql postgres +pgsql postgres +pg postgres +postgre postgres diff --git a/tools/search/sql/schema.sql b/tools/search/sql/schema.sql new file mode 100644 index 00000000..e88abf5e --- /dev/null +++ b/tools/search/sql/schema.sql @@ -0,0 +1,45 @@ +CREATE TABLE lists ( + id int NOT NULL PRIMARY KEY, + name varchar(64) NOT NULL, + active bool NOT NULL, + pagecount int NOT NULL +); + +CREATE TABLE messages ( + list int NOT NULL REFERENCES lists(id) ON DELETE CASCADE, + year int NOT NULL, + month int NOT NULL, + msgnum int NOT NULL, + date timestamptz NOT NULL, + subject varchar(128) NOT NULL, + author varchar(128) NOT NULL, + txt text NOT NULL, + fti tsvector NOT NULL +); +ALTER TABLE messages ADD CONSTRAINT pk_messages PRIMARY KEY (list,year,month,msgnum); + + +CREATE TABLE sites ( + id int NOT NULL PRIMARY KEY, + hostname text NOT NULL UNIQUE, + description text NOT NULL, + pagecount int NOT NULL +); + +CREATE TABLE webpages ( + site int NOT NULL REFERENCES sites(id) ON DELETE CASCADE, + suburl varchar(512) NOT NULL, + title varchar(128) NOT NULL, + relprio float NOT NULL DEFAULT 0.5, + lastscanned timestamptz NULL, + txt text NOT NULL, + fti tsvector NOT NULL +); +ALTER TABLE webpages ADD CONSTRAINT pk_webpages PRIMARY KEY (site, suburl); + +CREATE TABLE site_excludes ( + site int NOT NULL REFERENCES sites(id) ON DELETE CASCADE, + suburlre varchar(512) NOT NULL +); +ALTER TABLE site_excludes ADD CONSTRAINT pk_site_excludes PRIMARY KEY (site,suburlre); + diff --git a/tools/search/sql/tsearch.sql b/tools/search/sql/tsearch.sql new file mode 100644 index 00000000..6db3a555 --- /dev/null +++ b/tools/search/sql/tsearch.sql @@ -0,0 +1,33 @@ +-- Creates configuration 'pg' + +BEGIN; + +-- create our configuration to work from +CREATE TEXT SEARCH CONFIGURATION pg (COPY = pg_catalog.english ); + +-- create english ispell dictionary +CREATE TEXT SEARCH DICTIONARY english_ispell ( + TEMPLATE = ispell, + DictFile = en_us, + AffFile = en_us, + StopWords = english +); +-- create our dictionary +CREATE TEXT SEARCH DICTIONARY pg_dict ( + TEMPLATE = synonym, + SYNONYMS = pg_dict +); + +-- activate the dictionaries +ALTER TEXT SEARCH CONFIGURATION pg + ALTER MAPPING FOR asciiword, asciihword, hword_asciipart, + word, hword, hword_part + WITH pg_dict, english_ispell, english_stem; + +-- parts we don't want to index at all +ALTER TEXT SEARCH CONFIGURATION pg + DROP MAPPING FOR email, url, url_path, sfloat, float; + +-- All done + +COMMIT;
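Putting the pieces together, the two search functions in functions.sql are meant to be called with a plain SELECT from the web frontend; the last row they return is a sentinel, carrying the hit count and page count in the year and month columns for archives_search, and the page count in the siteid and rank columns for site_search. A sketch of such a call from Python (database name and query invented, and the 'pg' text search configuration from tsearch.sql assumed to be installed):

    import psycopg2

    conn = psycopg2.connect("dbname=search")
    curs = conn.cursor()
    # Search all lists (_lists = NULL) for "vacuum", first 20 hits, ranked by relevance.
    curs.execute("SELECT * FROM archives_search(%(q)s, NULL, NULL, NULL, %(ofs)s, %(n)s, 'r')", {
        'q': 'vacuum',
        'ofs': 0,
        'n': 20,
    })
    rows = curs.fetchall()
    hits, pagecount = rows[-1][1], rows[-1][2]   # sentinel row: year=hits, month=pagecount
    for listname, year, month, msgnum, date, subject, author, headline, rank in rows[:-1]:
        print "%s/%04d-%02d/msg%05d.php: %s" % (listname, year, month, msgnum, subject)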