# 0xFF/fhost.py
#
# 297 lines
# 8.5 KiB
# Python
# Executable File

import sys
from hashlib import sha256
from mimetypes import guess_extension
from pathlib import Path
from os import path, chdir
import re
import urllib3
import validators
from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, render_template
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from jinja2 import ChoiceLoader, FileSystemLoader
from jinja2.exceptions import *
from magic import Magic
from short_url import UrlEncoder
from werkzeug.exceptions import HTTPException
from urllib.parse import urlparse
# Work from the directory that contains this file, so relative paths such
# as 'fhost.db' do not depend on where the process was started.
chdir(path.dirname(path.abspath(__file__)))

app = Flask(__name__, instance_relative_config=True)
app.config.from_pyfile('config.py')

# Templates under <instance>/templates are searched first; the loader Flask
# configured by default remains as the fallback.
app.jinja_loader = ChoiceLoader([
    FileSystemLoader(str(Path(app.instance_path, 'templates'))),
    app.jinja_loader,
])

# Limits read once from the configuration; get_max_age() uses all three.
max_content_len = app.config['MAX_CONTENT_LENGTH']
min_days = app.config['MIN_DAYS']
max_days = app.config['MAX_DAYS']

urllib3.disable_warnings()

# Upper bound (2**63 - 1) for ids decoded from a request path; larger values
# are rejected by file_handler() before any database lookup.
FLASK_MAX_INTEGER = 9_223_372_036_854_775_807
class MisdirectedRequest(HTTPException):
    """HTTP exception carrying status code 421."""

    code = 421
# Build the MIME detector once at import time.  Any ordinary failure of the
# constructor is reported as an installation problem (the message assumes a
# different package providing a `magic` module is installed) and the
# process exits with status 1.
try:
    mimedetect = Magic(mime=True, mime_encoding=False)
except Exception:
    # `except Exception` instead of a bare `except`, so KeyboardInterrupt
    # and SystemExit are not misreported as a broken installation.
    print('''Error: You have installed the wrong version of the 'magic' module.
Please install python-magic.''')
    sys.exit(1)
# Database handle and migration support, both bound to the Flask app.
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Encoder that maps integer row ids to the short names used in URLs (and
# back), built from the alphabet given in the configuration.
url_encoder = UrlEncoder(
    alphabet=app.config['URL_ALPHABET'],
    block_size=16,
)
class URL(db.Model):
    """A shortened URL; its row id determines the short name."""

    id = db.Column(db.Integer, primary_key=True)
    url = db.Column(db.UnicodeText, unique=True)

    def __init__(self, url):
        self.url = url

    def getname(self):
        """Return the short name: the row id encoded with ``url_encoder``."""
        return url_encoder.enbase(self.id, 1)

    def geturl(self):
        """Return the absolute short URL for this row, followed by a newline."""
        return url_for('file_handler', path=self.getname(), _external=True) + '\n'

    @staticmethod
    def get(url):
        """Return the row storing *url*, creating and committing it if absent.

        Declared as a static method because it takes no ``self``; without
        the decorator it only worked when called on the class itself
        (``URL.get(...)``), which remains supported.
        """
        found_url = URL.query.filter_by(url=url).first()
        if not found_url:
            found_url = URL(url)
            db.session.add(found_url)
            db.session.commit()
        return found_url
class File(db.Model):
    """A stored upload; the file on disk is named after its SHA-256 hex digest."""

    id = db.Column(db.Integer, primary_key=True)
    sha256 = db.Column(db.LargeBinary(32))
    ext = db.Column(db.UnicodeText)
    mime = db.Column(db.UnicodeText)
    removed = db.Column(db.Boolean, default=False)

    def __init__(self, sha256, ext, mime):
        self.sha256 = sha256
        self.ext = ext
        self.mime = mime

    def getname(self):
        """Return the public name: the encoded row id plus the stored extension."""
        return u'{0}{1}'.format(url_encoder.enbase(self.id, 1), self.ext)

    def geturl(self):
        """Return the absolute URL of this file, followed by a newline."""
        n = self.getname()
        return url_for('file_handler', path=n, _external=True) + '\n'

    @staticmethod
    def store(file_data, type):
        """Store an upload and return ``(file_obj, size_in_bytes)``.

        Args:
            file_data: object exposing ``stream``, ``content_type`` and
                ``filename``.
            type: ``'file'`` when ``file_data.stream`` must be read to get
                the content, ``'data'`` when ``file_data.stream`` already is
                the content.

        Content whose SHA-256 matches an existing, non-removed row is not
        written again; the existing row is returned.  A row flagged as
        removed is re-enabled and its content rewritten.

        Raises:
            ValueError: if *type* is neither ``'file'`` nor ``'data'``.
        """
        if type == 'file':
            data = file_data.stream.read()
        elif type == 'data':
            data = file_data.stream
        else:
            # Previously this fell through and failed inside sha256(None).
            raise ValueError(f'unsupported upload type: {type!r}')

        digest = sha256(data)
        bin_hash = digest.digest()

        def get_mime():
            # Prefer the client-supplied type unless it is missing, malformed
            # or the generic octet-stream; otherwise use the detected one.
            guess = mimedetect.from_buffer(data)
            app.logger.debug(f'MIME - specified: "{file_data.content_type}" - detected: "{guess}"')
            claimed = file_data.content_type
            if not claimed or '/' not in claimed or claimed == 'application/octet-stream':
                mime = guess
            else:
                mime = claimed
            if mime.startswith('text/') and 'charset' not in mime:
                mime += '; charset=utf-8'
            return mime

        def get_ext(mime):
            # Keep at most the last two suffixes of the uploaded filename.
            ext = ''.join(Path(file_data.filename or '').suffixes[-2:])
            if mime.startswith('text/'):
                # partition() leaves the value intact when no ';' is present;
                # slicing with find() dropped the last character in that case.
                mime = mime.partition(';')[0]
            if not ext:
                overrides = app.config['FHOST_EXT_OVERRIDE']
                if mime in overrides:
                    ext = overrides[mime]
                else:
                    ext = guess_extension(mime)
            if not ext:
                return '.bin'
            return ext[-app.config['FHOST_MAX_EXT_LENGTH']:]

        file_obj = File.query.filter_by(sha256=bin_hash).first()
        if file_obj:
            if not file_obj.removed:
                return file_obj, len(data)
            file_obj.removed = False
        else:
            mime = get_mime()
            ext = get_ext(mime)
            file_obj = File(bin_hash, ext, mime)

        storage = Path(app.config['FHOST_STORAGE_PATH'])
        storage.mkdir(parents=True, exist_ok=True)
        # The content is already in memory for both upload types, so write it
        # directly instead of rewinding and re-reading the stream.
        (storage / digest.hexdigest()).write_bytes(data)

        db.session.add(file_obj)
        db.session.commit()
        return file_obj, len(data)
def is_fhost_url(url):
    """Return True when *url* starts with this service's own external root URL."""
    own_root = url_for('.fhost', _external=True)
    # The trailing slash is dropped before the prefix comparison.
    return url.startswith(own_root.rstrip('/'))
def shorten(url):
    """Return the short URL for *url*, creating the mapping when needed.

    Aborts with 414 when *url* exceeds the configured MAX_URL_LENGTH, and
    with 400 when it fails validation or points back at this service.
    """
    too_long = len(url) > app.config['MAX_URL_LENGTH']
    if too_long:
        abort(414)

    acceptable = validators.url(url) and not is_fhost_url(url)
    if not acceptable:
        abort(400)

    return URL.get(url).geturl()
# Response template for verbose uploads.  store_file() fills the three
# placeholders with the file URL, the same URL on the B32 address, and the
# retention period in days.
VERBOSE_FORMAT = (
    'File address: {}\n'
    'B32 file address: {}\n'
    'Expires in: {} days\n'
)
def get_max_age(file_size):
    """Return the retention period, in whole days, for a file of *file_size* bytes.

    The value follows a cubic curve over the ratio
    ``file_size / max_content_len``: ``max_days`` for an empty file, falling
    to ``min_days`` for a file of exactly ``max_content_len`` bytes.  The
    result is truncated to an int.
    """
    fill_ratio = file_size / max_content_len
    return int(min_days + (-max_days + min_days) * (fill_ratio - 1) ** 3)
def store_file(f, type, verbose=False):
    """Store the upload *f* and return the text response for it.

    *f* and *type* are handed to ``File.store`` unchanged.  The default
    response is the file URL followed by a newline; with *verbose* it is
    ``VERBOSE_FORMAT`` filled with the URL, the URL rewritten onto the
    configured B32_ADDRESS, and the retention period from ``get_max_age``.
    """
    file_obj, file_size = File.store(f, type)
    if not verbose:
        return file_obj.geturl()

    plain_url = file_obj.geturl().strip()
    # Same URL with only the network location swapped for the B32 address.
    b32_url = urlparse(plain_url)._replace(netloc=app.config['B32_ADDRESS']).geturl()
    return VERBOSE_FORMAT.format(plain_url, b32_url, get_max_age(file_size))
@app.route('/<path:path>', methods=('GET',))
def file_handler(path):
    """Serve a stored file or redirect to a shortened URL.

    Only the first path segment is used.  A name with an extension is looked
    up as a File, a bare name as a URL.  Aborts with 404 for malformed or
    unknown names and for files missing on disk, and with 410 for files
    flagged as removed.
    """
    requested = Path(path.split('/', 1)[0])
    ext = ''.join(requested.suffixes[-2:])
    # Strip the extension; `or None` keeps the whole name when there is none.
    short_name = requested.name[:-len(ext) or None]
    if alphabet_regex.fullmatch(short_name) is None:
        abort(404)

    row_id = url_encoder.debase(short_name)
    if row_id > FLASK_MAX_INTEGER:
        abort(404)

    # No extension: treat the name as a shortened URL.
    if not ext:
        url_obj = URL.query.get(row_id)
        if url_obj:
            return redirect(url_obj.url)
        abort(404)

    file_obj = File.query.get(row_id)
    # The requested extension must match the one stored with the file.
    if not file_obj or file_obj.ext != ext:
        abort(404)
    if file_obj.removed:
        abort(410)

    hexdigest_hash = file_obj.sha256.hex()
    storage_dir = app.config['FHOST_STORAGE_PATH']
    fpath = Path(storage_dir) / hexdigest_hash
    if not fpath.is_file():
        abort(404)

    if not app.config['FHOST_USE_X_ACCEL_REDIRECT']:
        return send_from_directory(storage_dir, hexdigest_hash, mimetype=file_obj.mime)

    # Header-only response; the X-Accel-Redirect header names the file path.
    response = make_response()
    response.headers['Content-Type'] = file_obj.mime
    response.headers['Content-Length'] = fpath.stat().st_size
    response.headers['X-Accel-Redirect'] = '/' + str(fpath)
    return response
@app.route('/<filename>', methods=('PUT',))
def upload_handler(filename):
    """Store the raw body of ``PUT /<filename>`` and return its URL.

    Aborts with 413 when the body is larger than MAX_CONTENT_LENGTH.
    """
    body = request.data
    if len(body) > app.config['MAX_CONTENT_LENGTH']:
        abort(413)

    # Anonymous object carrying the three attributes File.store reads.
    attrs = {'stream': body, 'content_type': '', 'filename': filename}
    upload = type('', (), attrs)()
    return store_file(upload, 'data')
@app.route('/', methods=('GET', 'POST', 'PUT'))
def fhost():
    """Handle requests to the site root.

    POST stores the ``file`` upload (verbose when ``v`` is among the request
    values) or shortens the ``shorten`` form field, and aborts with 400 when
    neither is present.  PUT stores the request body as text.  Any other
    method renders the index page.
    """
    method = request.method

    if method == 'POST':
        if 'file' in request.files:
            return store_file(request.files['file'], 'file', 'v' in request.values)
        if 'shorten' in request.form:
            return shorten(request.form['shorten'])
        abort(400)

    if method == 'PUT':
        # At most MAX_CONTENT_LENGTH bytes are read from the body.
        message = request.stream.read(app.config['MAX_CONTENT_LENGTH'])
        # Anonymous object carrying the three attributes File.store reads.
        attrs = {
            'stream': message,
            'content_type': 'text/plain; charset=utf-8',
            'filename': '.txt',
        }
        upload = type('', (), attrs)()
        return store_file(upload, 'data')

    return render_template('index.html')
@app.route('/robots.txt')
def robots():
    """Serve a robots.txt that disallows every path for all user agents."""
    return 'User-agent: *\nDisallow: /\n'
@app.errorhandler(400)
@app.errorhandler(403)
@app.errorhandler(404)
@app.errorhandler(410)
@app.errorhandler(411)
@app.errorhandler(413)
@app.errorhandler(414)  # raised by shorten() for over-long URLs
@app.errorhandler(MisdirectedRequest)
def error_handler(error):
    """Render ``<code>.html`` for *error*, or a plain-text line if it is missing.

    Returns a ``(body, status)`` tuple whose status is ``error.code``.  When
    no template named after the code exists, the body is
    ``'Error: <code>\\n'``.
    """
    try:
        # NOTE(review): `id` here is the builtin function, since no local or
        # module-level `id` exists; kept as-is in case a template expects
        # the variable to be present. Confirm and drop if unused.
        return render_template(f'{error.code}.html', id=id), error.code
    except TemplateNotFound:
        return f'Error: {error.code}\n', error.code
# Matches names of at most 11 characters drawn from the configured alphabet.
# The alphabet is a literal list of characters (it is also what UrlEncoder
# receives), so it is escaped before being placed inside the character
# class; otherwise characters such as ']', '^', '\' or a '-' between two
# others would change the meaning of the pattern.
alphabet_regex = re.compile(r'[' + re.escape(app.config['URL_ALPHABET']) + r']{,11}')
if __name__ == '__main__':
    # Create the tables on first run, i.e. when 'fhost.db' does not exist
    # in the working directory yet.
    db_missing = not path.isfile('fhost.db')
    if db_missing:
        db.create_all()

    # Start Flask's built-in server, listening on all interfaces.
    app.run(host='0.0.0.0')