import sys from hashlib import sha256 from mimetypes import guess_extension from pathlib import Path from os import path, chdir import re import urllib3 import validators from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, render_template from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy from jinja2 import ChoiceLoader, FileSystemLoader from jinja2.exceptions import * from magic import Magic from short_url import UrlEncoder from werkzeug.exceptions import HTTPException from urllib.parse import urlparse chdir(path.dirname(path.abspath(__file__))) app = Flask(__name__, instance_relative_config=True) app.config.from_pyfile('config.py') app.jinja_loader = ChoiceLoader([ FileSystemLoader(str(Path(app.instance_path) / 'templates')), app.jinja_loader ]) max_content_len = app.config["MAX_CONTENT_LENGTH"] min_days = app.config['MIN_DAYS'] max_days = app.config['MAX_DAYS'] urllib3.disable_warnings() FLASK_MAX_INTEGER = 9223372036854775807 class MisdirectedRequest(HTTPException): code = 421 try: mimedetect = Magic(mime=True, mime_encoding=False) except: print('''Error: You have installed the wrong version of the 'magic' module. Please install python-magic.''') sys.exit(1) db = SQLAlchemy(app) migrate = Migrate(app, db) url_encoder = UrlEncoder(alphabet=app.config['URL_ALPHABET'], block_size=16) class URL(db.Model): id = db.Column(db.Integer, primary_key=True) url = db.Column(db.UnicodeText, unique=True) def __init__(self, url): self.url = url def getname(self): return url_encoder.enbase(self.id, 1) def geturl(self): return url_for('file_handler', path=self.getname(), _external=True) + '\n' def get(url): found_url = URL.query.filter_by(url=url).first() if not found_url: found_url = URL(url) db.session.add(found_url) db.session.commit() return found_url return found_url class File(db.Model): id = db.Column(db.Integer, primary_key=True) sha256 = db.Column(db.LargeBinary(32)) ext = db.Column(db.UnicodeText) mime = db.Column(db.UnicodeText) removed = db.Column(db.Boolean, default=False) def __init__(self, sha256, ext, mime): self.sha256 = sha256 self.ext = ext self.mime = mime def getname(self): return u'{0}{1}'.format(url_encoder.enbase(self.id, 1), self.ext) def geturl(self): n = self.getname() return url_for('file_handler', path=n, _external=True) + '\n' def store(file_data, type): def get_data(): if type == 'file': return file_data.stream.read() elif type == 'data': return file_data.stream data = get_data() hash = sha256(data) hexdigest_hash = hash.hexdigest() bin_hash = hash.digest() def get_mime(): guess = mimedetect.from_buffer(data) app.logger.debug(f'MIME - specified: "{file_data.content_type}" - detected: "{guess}"') if not file_data.content_type or not '/' in file_data.content_type \ or file_data.content_type == 'application/octet-stream': mime = guess else: mime = file_data.content_type if mime.startswith('text/') and not 'charset' in mime: mime += '; charset=utf-8' return mime def get_ext(mime): ext = ''.join(Path(file_data.filename).suffixes[-2:]) if mime.startswith('text/'): mime = mime[:mime.find(';')] if not ext: if mime in app.config['FHOST_EXT_OVERRIDE']: ext = app.config['FHOST_EXT_OVERRIDE'][mime] else: ext = guess_extension(mime) if not ext: return '.bin' return ext[-app.config['FHOST_MAX_EXT_LENGTH']:] file_obj = File.query.filter_by(sha256=bin_hash).first() if file_obj: if not file_obj.removed: return file_obj, len(data) file_obj.removed = False else: mime = get_mime() ext = get_ext(mime) file_obj = File(bin_hash, ext, mime) storage = Path(app.config['FHOST_STORAGE_PATH']) storage.mkdir(parents=True, exist_ok=True) file_path = storage / hexdigest_hash if type == 'file': file_data.stream.seek(0) file_data.save(file_path) elif type == 'data': file_path.touch() file_path.write_bytes(data) db.session.add(file_obj) db.session.commit() return file_obj, len(data) def is_fhost_url(url): return url.startswith(url_for('.fhost', _external=True).rstrip('/')) def shorten(url): if len(url) > app.config['MAX_URL_LENGTH']: abort(414) if not validators.url(url) or is_fhost_url(url): abort(400) url_obj = URL.get(url) return url_obj.geturl() VERBOSE_FORMAT = \ '''File address: {} B32 file address: {} Expires in: {} days ''' get_max_age = lambda file_size: int(min_days + (-max_days + min_days) * (file_size / max_content_len - 1) ** 3) def store_file(f, type, verbose=False): file_obj, file_size = File.store(f, type) if verbose: max_age = get_max_age(file_size) url = file_obj.geturl().strip() b32_address = urlparse(url)._replace(netloc=app.config['B32_ADDRESS']).geturl() return VERBOSE_FORMAT.format(url, b32_address, max_age) return file_obj.geturl() @app.route('/', methods=('GET',)) def file_handler(path): path = Path(path.split('/', 1)[0]) is_file = ''.join(path.suffixes[-2:]) name = path.name[:-len(is_file) or None] if not alphabet_regex.fullmatch(name): abort(404) id = url_encoder.debase(name) if id > FLASK_MAX_INTEGER: abort(404) if is_file: file_obj = File.query.get(id) if file_obj and file_obj.ext == is_file: if file_obj.removed: abort(410) hexdigest_hash = file_obj.sha256.hex() fpath = Path(app.config['FHOST_STORAGE_PATH']) / hexdigest_hash if not fpath.is_file(): abort(404) if app.config['FHOST_USE_X_ACCEL_REDIRECT']: response = make_response() response.headers['Content-Type'] = file_obj.mime response.headers['Content-Length'] = fpath.stat().st_size response.headers['X-Accel-Redirect'] = '/' + str(fpath) return response else: return send_from_directory(app.config['FHOST_STORAGE_PATH'], hexdigest_hash, mimetype=file_obj.mime) else: url_obj = URL.query.get(id) if url_obj: return redirect(url_obj.url) abort(404) @app.route('/', methods=('PUT',)) def upload_handler(filename): data_length = len(request.data) if data_length <= app.config['MAX_CONTENT_LENGTH']: def urlfile(**kwargs): return type('', (), kwargs)() f = urlfile(stream=request.data, content_type='', filename=filename) return store_file(f, 'data') else: abort(413) @app.route('/', methods=('GET', 'POST', 'PUT')) def fhost(): if request.method == 'POST': if 'file' in request.files: return store_file(request.files['file'], 'file', 'v' in request.values) elif 'shorten' in request.form: return shorten(request.form['shorten']) abort(400) elif request.method == 'PUT': message = request.stream.read(app.config['MAX_CONTENT_LENGTH']) def urlfile(**kwargs): return type('', (), kwargs)() f = urlfile(stream=message, content_type='text/plain; charset=utf-8', filename='.txt') return store_file(f, 'data') else: return render_template('index.html') @app.route('/robots.txt') def robots(): return '''User-agent: * Disallow: / ''' @app.errorhandler(400) @app.errorhandler(403) @app.errorhandler(404) @app.errorhandler(410) @app.errorhandler(411) @app.errorhandler(413) @app.errorhandler(MisdirectedRequest) def error_handler(error): try: return render_template(f'{error.code}.html', id=id), error.code except TemplateNotFound: return f'Error: {error.code}\n', error.code alphabet_regex = re.compile(r'[' + app.config['URL_ALPHABET'] + r']{,11}') if __name__ == '__main__': if not path.isfile('fhost.db'): db.create_all() app.run(host='0.0.0.0')