Source code for pyhttp.requests

import html
import io
import mimetypes
import os
import socket
import subprocess
import sys
from http import HTTPStatus
from urllib import parse


class Request(object):
    """
    A simple class to encapsulate reading and parsing a raw HTTP request
    string. It takes a client socket connection and stores the http
    command, resource path requested, and http version. It also stores a
    headers dictionary.

    For POST requests the last non-empty payload line (everything before a
    ``?`` in it) is stored in ``query_params``.

    :param conn: A client socket connection
    :raises RequestParseError: if the request is empty, cannot be decoded,
        or its first line does not have exactly three fields
    """

    POST = 'POST'
    GET = 'GET'

    def __init__(self, conn):
        # Only a single 1024-byte read is performed, so anything past
        # that is not seen by the parser.
        raw = conn.recv(1024)
        self.method = ''
        self.path = ''
        self.http_ver = ''
        self.query_params = ''
        self.headers = dict()
        try:
            req_str = raw.decode()
        except UnicodeDecodeError as err:
            # Report undecodable input the same way as any other
            # malformed request so callers only need one except clause.
            raise RequestParseError(raw) from err
        self._parse_request(req_str)

    def _parse_request(self, req_str):
        """
        Parse the request line (``METHOD PATH VERSION``) and then the
        header fields / payload that follow it.

        :param req_str: The decoded request text
        :type req_str: str
        :raises RequestParseError: if the first line does not consist of
            exactly three whitespace separated fields
        """
        lines = req_str.splitlines()
        parts = lines[0].split() if lines else []
        if len(parts) != 3:
            raise RequestParseError(req_str)
        self.method, self.path, self.http_ver = parts
        self._parse_header_fields(req_str)

    def _parse_header_fields(self, req_str):
        """
        Fill ``self.headers`` from the lines after the request line and,
        for POST requests, ``self.query_params`` from the payload.

        Header fields end at the first blank line. Each field is split at
        its first colon only, so values that themselves contain a colon
        are kept intact and a missing space after the colon is accepted.

        :param req_str: The decoded request text
        :type req_str: str
        """
        in_body = False
        for line in req_str.splitlines()[1:]:
            if not in_body:
                if not line:
                    # Blank line: everything after it is payload.
                    in_body = True
                    continue
                name, sep, value = line.partition(':')
                if sep:
                    self.headers[name.strip()] = value.strip()
                    continue
                # A colon-less line before the blank line is not a header
                # field; fall through and treat it as payload.
            if line and self.method == Request.POST:
                self.query_params = line.split('?')[0]
class BaseHttpRequestHandler(object):
    """Base request handler.

    Parses connection into a :class:`Request <Request>` object and
    answers it immediately: constructing the handler runs :meth:`handle`.

    :param conn: A client connection socket
    :param addr: Client address
    :param server: Server instance that accepted the client connection
    :param directory: Base directory to serve files from. Defaults to
        current working directory
    """

    def __init__(self, conn, addr, server, directory=None):
        self.conn = conn
        self.client_addr = addr
        self.server = server
        self.header_buffer = []
        self.wfile = SocketWriter(conn)
        self.http_version = 'HTTP/1.1'
        self.request = None
        if directory is None:
            directory = os.getcwd()
        self.directory = directory
        self.handle()

    def get_path(self, path):
        """
        Takes the path specified in the request header and translates to
        an absolute path on the server's filesystem. Relative path markers
        ('.' and '..') and segments containing a path separator are
        dropped, so the result never climbs above the served directory.

        :param path: path from http request; when ``None`` the path of
            the current request is used
        :return: absolute file path on the server's FS
        :rtype: str
        """
        if path is None:
            path = self.request.path
        # just in case there are query arguments
        # there shouldn't be for basic filesystem ops
        path = path.split('?')[0]
        # remove any url encodings like %xx
        path = parse.unquote(path)
        # join request path to server directory, one safe segment at a time
        fs_path = self.directory
        for part in path.split('/'):
            if not part or part in (os.curdir, os.pardir):
                continue
            if os.path.dirname(part):
                # Segment carries its own separator/drive; joining it
                # could escape the served directory.
                continue
            fs_path = os.path.join(fs_path, part)
        return fs_path

    def list_dir(self, path):
        """
        Takes an absolute path of a directory on the server's filesystem
        and generates HTML which lists its entries in sorted order.
        Sub-directories get a trailing slash. Entry names are
        percent-quoted in the link target and HTML-escaped in the visible
        text.

        :param path: an absolute file system path of a directory
        :type path: str
        :return: HTML string.
        :rtype: str
        """
        title = html.escape(f'Directory listing for {path}')
        buffer = [
            doctype(),
            html_head(title),
            '<body>',
            h1_header(title),
            '<hr>',
            '<ul>\n',
        ]
        for name in sorted(os.listdir(path)):
            ref_name = display = name
            if os.path.isdir(os.path.join(path, name)):
                ref_name = ref_name + '/'
                display = display + '/'
            href = parse.quote(ref_name)
            buffer.append('<li>\n')
            buffer.append(
                f'<a href="{href}">{html.escape(display)}</a>\n</li>\n')
        buffer.append('</ul>\n')
        buffer.append('</body>\n</html>')
        return ''.join(buffer)

    def send_response(self, http_status):
        """
        Writes the http response first line into the header buffer.

        :param http_status: An :class:`http.HTTPStatus` member, or a value
            that converts to one (e.g. ``404``). Values that do not map to
            a known status are ignored.
        """
        if not isinstance(http_status, HTTPStatus):
            try:
                http_status = HTTPStatus(http_status)
            except ValueError:
                return
        # Use the reason phrase ("Not Found"), not the enum member name
        # ("NOT_FOUND").
        line = (f'{self.http_version} {http_status.value} '
                f'{http_status.phrase}\r\n')
        self.header_buffer.append(line)

    def send_header(self, field, value):
        """
        Writes a field: value pair to http response

        :param field: Http response field
        :param value: Http response value
        """
        self.header_buffer.append(f'{field}: {value}\r\n')

    def end_header(self):
        """
        Writes the blank line in http response to signal start of
        response content
        """
        self.header_buffer.append('\r\n')

    def flush_header(self):
        """
        Writes the header buffer to the socket and clears header buffer
        """
        header_bytes = ''.join(self.header_buffer)
        self.wfile.write(header_bytes.encode('utf-8'))
        self.header_buffer = []

    def _send_body(self, http_status, content_type, body):
        """Send a complete response: status line, headers, then *body*.

        :param http_status: Status for the response line
        :param content_type: Value for the Content-Type header
        :param body: Response payload
        :type body: bytes
        """
        self.send_response(http_status)
        self.send_header('Content-Type', content_type)
        self.send_header('Connection', 'close')
        self.send_header('Content-Length', len(body))
        self.end_header()
        self.flush_header()
        self.wfile.write(body)

    def _send_not_found(self):
        """Send the plain-text 404 response."""
        self._send_body(HTTPStatus.NOT_FOUND, 'text/plain',
                        b"I can't handle that...")

    def handle(self):
        """
        Dispatches appropriate method to handle a request. Could be
        serving a static file, directory listing, or running a CGI
        script. The client connection is always closed afterwards, also
        when the request could not be parsed or a handler raised.
        """
        try:
            try:
                self.request = Request(self.conn)
            # For some reason we keep getting empty requests
            # from the browser. In that case we'll just return
            # them to the root of the directory
            except RequestParseError:
                self.send_response(HTTPStatus.MOVED_PERMANENTLY)
                self.send_header('Location', '/')
                self.end_header()
                self.flush_header()
                return None
            path = self.get_path(self.request.path)
            if os.path.isdir(path):
                self.do_directory(path)
            # Check if it's a file
            elif os.path.isfile(path):
                self.do_file(path)
            else:
                self._send_not_found()
        finally:
            self.finish()

    def do_directory(self, path):
        """
        Responds with an HTML listing of a directory.

        :param path: An absolute filesystem path of a directory
        :type path: str
        """
        html_contents = self.list_dir(path)
        if html_contents is not None:
            self._send_body(HTTPStatus.OK, 'text/html',
                            html_contents.encode('utf-8'))

    def do_file(self, path):
        """
        Handles the response for all file types. Determines whether to
        serve file contents or if the path is a CGI script that should be
        executed. A script is only executed when it sits directly inside
        a directory named exactly ``cgi-bin`` and has a CGI extension.
        A file that cannot be opened is answered with 404.

        :param path: An absolute filesystem path
        :type path: str
        """
        cgi_exts = ('.py', '.cgi')
        cgi_dir = 'cgi-bin'
        _base_path, ext = os.path.splitext(path)
        parent_dir = os.path.basename(os.path.dirname(path))
        # First, see if we need to run a cgi script. The directory name
        # must match exactly; a substring test would also match 'bin',
        # 'cgi' or an empty parent name.
        if parent_dir == cgi_dir and ext in cgi_exts:
            self.run_cgi(path)
            return
        # Otherwise, treat like regular file
        try:
            with open(path, 'rb') as f:
                body = f.read()
        except OSError:
            self._send_not_found()
            return
        content_type = mimetypes.types_map.get(ext.lower(), 'text/plain')
        # Content-Length comes from the bytes actually read, so it always
        # matches what is sent.
        self._send_body(HTTPStatus.OK, content_type, body)

    def run_cgi(self, path):
        """
        Executes a CGI script in a separate process and writes the output
        into the response buffer. When the request carries parameters of
        the form ``field=value`` the url-decoded value is passed to the
        script as its single argument. If the script produces no output
        (or could not be started, or the parameter value is empty) a 500
        page is sent instead.

        :param path: An absolute filesystem path
        :type path: str
        """
        param_str = self.request.query_params
        # Run the script with the interpreter running this server; a bare
        # 'python' is not guaranteed to be on PATH.
        cmd = [sys.executable or 'python', path]
        runnable = True
        if param_str:
            # partition never raises, unlike unpacking split('=') when
            # the payload holds zero or several '=' characters.
            _field, _sep, calc_str = param_str.partition('=')
            if calc_str:
                cmd.append(parse.unquote_plus(calc_str))
            else:
                runnable = False
        out = b''
        if runnable:
            try:
                proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
                out, _err = proc.communicate()
            except OSError:
                # Interpreter or script could not be started.
                out = b''
        if out:
            status = HTTPStatus.OK
        else:
            status = HTTPStatus.INTERNAL_SERVER_ERROR
            out = b'''
            <html>
            <body>
            <h2>I'm not sure what went wrong....</h2>
            </body>
            </html>
            '''
        self._send_body(status, 'text/html', out)

    def finish(self):
        """Closes the writer wrapping the client connection."""
        self.wfile.close()
class SocketWriter(object):
    """
    A simple wrapper around a socket to make interacting with client
    connections similar to doing file writes. Doesn't do much, but modeled
    on Python's SocketWriter used by the standard library's Http server.
    This one doesn't inherit from BufferedIOBase but just expects the
    caller sends appropriately encoded bytes to send to client connection.

    :param sock: Open socket connection to client
    """

    def __init__(self, sock):
        self._sock = sock

    def write(self, b):
        """
        Write an encoded byte string to the SocketWriter's client
        connection. This is unbuffered.

        :param b: An encoded byte string. We're currently using utf-8
            encodings
        :type b: bytes
        :return: Number of bytes written
        """
        self._sock.sendall(b)
        return len(b)

    def close(self):
        """
        Shutdown and close the client connection. The socket is closed
        even when the shutdown fails, e.g. because the peer has already
        disconnected.
        """
        try:
            self._sock.shutdown(socket.SHUT_WR)
        except OSError:
            # Nothing left to shut down; still release the descriptor.
            pass
        finally:
            self._sock.close()


class RequestError(Exception):
    """
    Base error for problems with a client request.

    :param request: The offending request; kept on ``self.request``
    :param msg: Optional message. Defaults to a text that includes the
        request.
    """

    def __init__(self, request, msg=None):
        if msg is None:
            msg = f'An error occurred with request:\n{request}'
        super(RequestError, self).__init__(msg)
        self.request = request


class RequestParseError(RequestError):
    """
    Raised when a raw request cannot be parsed.

    :param req: The request that failed to parse
    :param msg: Optional message, see :class:`RequestError`
    """

    def __init__(self, req, msg=None):
        super(RequestParseError, self).__init__(req, msg)


def doctype():
    """Return the HTML5 doctype declaration followed by a newline."""
    return "<!DOCTYPE html>\n"


def html_head(title=''):
    """
    Return the opening ``<html>`` tag and a ``<head>`` section.

    :param title: Text placed inside ``<title>``. It is inserted verbatim,
        so the caller is responsible for HTML-escaping it.
    :return: HTML string
    :rtype: str
    """
    return f'''
    <html>
    <head>
    <meta charset="UTF-8" content="text/html">
    <title>{title}</title>
    </head>
    '''


def h1_header(txt):
    """
    Wrap *txt* in an ``<h1>`` element.

    :param txt: Heading text. It is inserted verbatim, so the caller is
        responsible for HTML-escaping it.
    :return: HTML string
    :rtype: str
    """
    return f'<h1>{txt}</h1>'