""" Copyright (C) 2000-2001 Brandon Long VirtualBrowser - a controllable/scriptable web agent """ VERSION = "0.10" import sys, string, re import timeoutsocket import urllib import httplib import Cookie class VirtualBrowser: def __init__ (self, auth_callback = None): self._user_agent = "VirtualBrowser/%s" % VERSION self._current_url = None self._host = "localhost" self._last_url = None self._cookies = {} self._auth_cache = {} self._languages = ["en"] self._types = ["text/html", "text/plain", "image/jpeg", "image/gif"] # We open URLs we don't understand with urllib self._fallback_opener = urllib.URLopener() # HTTP traffic timesout in 120 seconds self._timeout = 120 # Callback function provided by user to get user/pass auth self._auth_callback = auth_callback def timeout_ (self, o): try: self._timeout = int (o) except ValueError: pass return self def fetchpage (self, url): if self._last_url: url = urllib.basejoin (self._last_url, url) self._current_url = url type, rest = urllib.splittype (url) if not type: type = "http" host, path = urllib.splithost (rest) if not host: host = self._host else: self._host = host host, port = urllib.splitport (host) if type == "http": errcode, errmsg, page, headers = self.fetch_http (host, port, path) self._last_url = self._current_url return errcode, errmsg, page, headers else: return self._fallback_opener.retrieve (url) def fetch_http (self, host, port, path, realm = None, method = "GET", body = None, ctype = None): if not port: port = 80 else: try: port = int (port) except ValueError: port = 80 cookie = self.getCookie (host, path) h = httplib.HTTP (host, port) h.sock.set_timeout(self._timeout) if method: h.putrequest (method, path) else: h.putrequest ("GET", path) h.putheader ("Host", host) h.putheader ("User-Agent", self._user_agent) h.putheader ("Accept", string.join (self._types, ", ")) h.putheader ("Accept-Language", string.join (self._languages, ", ")) if self._last_url: h.putheader ("Referer", self._last_url) if cookie: h.putheader ("Cookie", cookie) if realm: auth = self.getAuth (host, realm) h.putheader ("Authorization", "Basic %s" % auth) if body and ctype: h.putheader ("Content-Type", ctype) h.putheader ("Content-Length", str(len(body))) h.endheaders () if body: h.send(body) errcode, errmsg, headers = h.getreply() f = h.getfile() page = f.read() f.close() if errcode == -1: return errcode, errmsg, page, headers if headers.has_key ("set-cookie"): # This isn't actually "headers" its a MimeMessage (rfc822.Message) # And, when you have more than one instance of a header, you have # to (sigh) ask for all matching, then decode them yourself for line in headers.getallmatchingheaders ("set-cookie"): header, value = string.split (line, ':', 1) m = Cookie.Morsel (string.strip(value)) self.addCookie (m, host) if errcode == 302: self._last_url = self._current_url newurl = headers['location'] return self.fetchpage(newurl) if errcode == 401: if headers.has_key('www-authenticate'): auth_header = headers['www-authenticate'] match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', auth_header) if match: scheme, realm = match.groups() headers['virtualbrowser-auth-scheme'] = scheme headers['virtualbrowser-auth-realm'] = realm if string.lower(scheme) == 'basic': # Only attempt retry if we have an authorization to attempt if self.getAuth (host, realm): return self.fetch_http (host, port, path, realm) return errcode, errmsg, page, headers def do_post (self, url, post_dict, ctype="application/x-www-form-urlencoded"): if self._last_url: url = urllib.basejoin (self._last_url, url) self._current_url = url type, rest = urllib.splittype (url) if not type: type = "http" host, path = urllib.splithost (rest) if not host: host = self._host else: self._host = host host, port = urllib.splitport (host) data = urllib.urlencode(post_dict) errcode, errmsg, page, headers = self.fetch_http (host, port, path, method = "POST", body = data, ctype = ctype) self._last_url = self._current_url return errcode, errmsg, page, headers def getAuth (self, host, realm): key = realm + '@' + string.lower (host) if self._auth_cache.has_key(key): return self._auth_cache[key] if self._auth_callback: user, password = self._auth_callback return self.addAuth (host, realm, user, password) return None def addAuth (self, host, realm, user, password): import base64 key = realm + '@' + string.lower (host) up = "%s:%s" % (user, password) auth = string.strip (base64.encodestring(up)) self._auth_cache[key] = auth return auth def getCookie (self, host, path): cookie_str = [] hlen = len (host) for domain, cpath in self._cookies.keys(): dlen = len (domain) if (host[hlen-dlen:] == domain) and (path[:len(cpath)] == cpath): for morsel in self._cookies[(domain, cpath)]: cookie_str.append ("%s=%s" % (morsel.key, morsel.value)) if cookie_str == []: return None return string.join (cookie_str, '; ') def addCookie (self, morsel, host): try: domain = morsel['domain'] except KeyError: domain = host try: path = morsel['path'] except KeyError: path = '/' uniq = (domain, path) if self._cookies.has_key (uniq): for sm in self._cookies[uniq]: if sm.key == morsel.key: self._cookies[uniq].remove(sm) self._cookies[uniq].append (morsel) else: self._cookies[uniq] = [morsel]