297 lines
8.5 KiB
Python
Executable File
297 lines
8.5 KiB
Python
Executable File
import sys
|
|
from hashlib import sha256
|
|
from mimetypes import guess_extension
|
|
from pathlib import Path
|
|
from os import path, chdir
|
|
import re
|
|
import urllib3
|
|
import validators
|
|
|
|
from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, render_template
|
|
from flask_migrate import Migrate
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from jinja2 import ChoiceLoader, FileSystemLoader
|
|
from jinja2.exceptions import *
|
|
from magic import Magic
|
|
from short_url import UrlEncoder
|
|
from werkzeug.exceptions import HTTPException
|
|
from urllib.parse import urlparse
|
|
|
|
chdir(path.dirname(path.abspath(__file__)))
|
|
app = Flask(__name__, instance_relative_config=True)
|
|
|
|
app.config.from_pyfile('config.py')
|
|
app.jinja_loader = ChoiceLoader([
|
|
FileSystemLoader(str(Path(app.instance_path) / 'templates')),
|
|
app.jinja_loader
|
|
])
|
|
max_content_len = app.config["MAX_CONTENT_LENGTH"]
|
|
min_days = app.config['MIN_DAYS']
|
|
max_days = app.config['MAX_DAYS']
|
|
urllib3.disable_warnings()
|
|
|
|
FLASK_MAX_INTEGER = 9223372036854775807
|
|
|
|
|
|
class MisdirectedRequest(HTTPException):
|
|
code = 421
|
|
|
|
|
|
try:
|
|
mimedetect = Magic(mime=True, mime_encoding=False)
|
|
except:
|
|
print('''Error: You have installed the wrong version of the 'magic' module.
|
|
Please install python-magic.''')
|
|
sys.exit(1)
|
|
|
|
db = SQLAlchemy(app)
|
|
migrate = Migrate(app, db)
|
|
|
|
url_encoder = UrlEncoder(alphabet=app.config['URL_ALPHABET'], block_size=16)
|
|
|
|
|
|
class URL(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
url = db.Column(db.UnicodeText, unique=True)
|
|
|
|
def __init__(self, url):
|
|
self.url = url
|
|
|
|
def getname(self):
|
|
return url_encoder.enbase(self.id, 1)
|
|
|
|
def geturl(self):
|
|
return url_for('file_handler', path=self.getname(), _external=True) + '\n'
|
|
|
|
def get(url):
|
|
found_url = URL.query.filter_by(url=url).first()
|
|
|
|
if not found_url:
|
|
found_url = URL(url)
|
|
db.session.add(found_url)
|
|
db.session.commit()
|
|
return found_url
|
|
return found_url
|
|
|
|
|
|
class File(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
sha256 = db.Column(db.LargeBinary(32))
|
|
ext = db.Column(db.UnicodeText)
|
|
mime = db.Column(db.UnicodeText)
|
|
removed = db.Column(db.Boolean, default=False)
|
|
|
|
def __init__(self, sha256, ext, mime):
|
|
self.sha256 = sha256
|
|
self.ext = ext
|
|
self.mime = mime
|
|
|
|
def getname(self):
|
|
return u'{0}{1}'.format(url_encoder.enbase(self.id, 1), self.ext)
|
|
|
|
def geturl(self):
|
|
n = self.getname()
|
|
return url_for('file_handler', path=n, _external=True) + '\n'
|
|
|
|
def store(file_data, type):
|
|
def get_data():
|
|
if type == 'file':
|
|
return file_data.stream.read()
|
|
elif type == 'data':
|
|
return file_data.stream
|
|
|
|
data = get_data()
|
|
hash = sha256(data)
|
|
hexdigest_hash = hash.hexdigest()
|
|
bin_hash = hash.digest()
|
|
|
|
def get_mime():
|
|
guess = mimedetect.from_buffer(data)
|
|
app.logger.debug(f'MIME - specified: "{file_data.content_type}" - detected: "{guess}"')
|
|
|
|
if not file_data.content_type or not '/' in file_data.content_type \
|
|
or file_data.content_type == 'application/octet-stream':
|
|
mime = guess
|
|
else:
|
|
mime = file_data.content_type
|
|
|
|
if mime.startswith('text/') and not 'charset' in mime:
|
|
mime += '; charset=utf-8'
|
|
|
|
return mime
|
|
|
|
def get_ext(mime):
|
|
ext = ''.join(Path(file_data.filename).suffixes[-2:])
|
|
if mime.startswith('text/'):
|
|
mime = mime[:mime.find(';')]
|
|
|
|
if not ext:
|
|
if mime in app.config['FHOST_EXT_OVERRIDE']:
|
|
ext = app.config['FHOST_EXT_OVERRIDE'][mime]
|
|
else:
|
|
ext = guess_extension(mime)
|
|
if not ext:
|
|
return '.bin'
|
|
return ext[-app.config['FHOST_MAX_EXT_LENGTH']:]
|
|
|
|
file_obj = File.query.filter_by(sha256=bin_hash).first()
|
|
|
|
if file_obj:
|
|
if not file_obj.removed:
|
|
return file_obj, len(data)
|
|
file_obj.removed = False
|
|
else:
|
|
mime = get_mime()
|
|
ext = get_ext(mime)
|
|
file_obj = File(bin_hash, ext, mime)
|
|
|
|
storage = Path(app.config['FHOST_STORAGE_PATH'])
|
|
storage.mkdir(parents=True, exist_ok=True)
|
|
file_path = storage / hexdigest_hash
|
|
|
|
if type == 'file':
|
|
file_data.stream.seek(0)
|
|
file_data.save(file_path)
|
|
elif type == 'data':
|
|
file_path.touch()
|
|
file_path.write_bytes(data)
|
|
|
|
db.session.add(file_obj)
|
|
db.session.commit()
|
|
return file_obj, len(data)
|
|
|
|
|
|
def is_fhost_url(url):
|
|
return url.startswith(url_for('.fhost', _external=True).rstrip('/'))
|
|
|
|
|
|
def shorten(url):
|
|
if len(url) > app.config['MAX_URL_LENGTH']:
|
|
abort(414)
|
|
|
|
if not validators.url(url) or is_fhost_url(url):
|
|
abort(400)
|
|
|
|
url_obj = URL.get(url)
|
|
return url_obj.geturl()
|
|
|
|
|
|
VERBOSE_FORMAT = \
|
|
'''File address: {}
|
|
B32 file address: {}
|
|
Expires in: {} days
|
|
'''
|
|
|
|
get_max_age = lambda file_size: int(min_days + (-max_days + min_days) * (file_size / max_content_len - 1) ** 3)
|
|
|
|
|
|
def store_file(f, type, verbose=False):
|
|
file_obj, file_size = File.store(f, type)
|
|
if verbose:
|
|
max_age = get_max_age(file_size)
|
|
url = file_obj.geturl().strip()
|
|
b32_address = urlparse(url)._replace(netloc=app.config['B32_ADDRESS']).geturl()
|
|
return VERBOSE_FORMAT.format(url, b32_address, max_age)
|
|
return file_obj.geturl()
|
|
|
|
|
|
@app.route('/<path:path>', methods=('GET',))
|
|
def file_handler(path):
|
|
path = Path(path.split('/', 1)[0])
|
|
is_file = ''.join(path.suffixes[-2:])
|
|
name = path.name[:-len(is_file) or None]
|
|
if not alphabet_regex.fullmatch(name):
|
|
abort(404)
|
|
id = url_encoder.debase(name)
|
|
if id > FLASK_MAX_INTEGER:
|
|
abort(404)
|
|
|
|
if is_file:
|
|
file_obj = File.query.get(id)
|
|
if file_obj and file_obj.ext == is_file:
|
|
if file_obj.removed:
|
|
abort(410)
|
|
hexdigest_hash = file_obj.sha256.hex()
|
|
fpath = Path(app.config['FHOST_STORAGE_PATH']) / hexdigest_hash
|
|
|
|
if not fpath.is_file():
|
|
abort(404)
|
|
|
|
if app.config['FHOST_USE_X_ACCEL_REDIRECT']:
|
|
response = make_response()
|
|
response.headers['Content-Type'] = file_obj.mime
|
|
response.headers['Content-Length'] = fpath.stat().st_size
|
|
response.headers['X-Accel-Redirect'] = '/' + str(fpath)
|
|
return response
|
|
else:
|
|
return send_from_directory(app.config['FHOST_STORAGE_PATH'], hexdigest_hash, mimetype=file_obj.mime)
|
|
else:
|
|
url_obj = URL.query.get(id)
|
|
if url_obj:
|
|
return redirect(url_obj.url)
|
|
|
|
abort(404)
|
|
|
|
|
|
@app.route('/<filename>', methods=('PUT',))
|
|
def upload_handler(filename):
|
|
data_length = len(request.data)
|
|
|
|
if data_length <= app.config['MAX_CONTENT_LENGTH']:
|
|
def urlfile(**kwargs):
|
|
return type('', (), kwargs)()
|
|
|
|
f = urlfile(stream=request.data, content_type='', filename=filename)
|
|
return store_file(f, 'data')
|
|
else:
|
|
abort(413)
|
|
|
|
|
|
@app.route('/', methods=('GET', 'POST', 'PUT'))
|
|
def fhost():
|
|
if request.method == 'POST':
|
|
if 'file' in request.files:
|
|
return store_file(request.files['file'], 'file', 'v' in request.values)
|
|
elif 'shorten' in request.form:
|
|
return shorten(request.form['shorten'])
|
|
abort(400)
|
|
elif request.method == 'PUT':
|
|
message = request.stream.read(app.config['MAX_CONTENT_LENGTH'])
|
|
|
|
def urlfile(**kwargs):
|
|
return type('', (), kwargs)()
|
|
|
|
f = urlfile(stream=message, content_type='text/plain; charset=utf-8', filename='.txt')
|
|
return store_file(f, 'data')
|
|
else:
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/robots.txt')
|
|
def robots():
|
|
return '''User-agent: *
|
|
Disallow: /
|
|
'''
|
|
|
|
|
|
@app.errorhandler(400)
|
|
@app.errorhandler(403)
|
|
@app.errorhandler(404)
|
|
@app.errorhandler(410)
|
|
@app.errorhandler(411)
|
|
@app.errorhandler(413)
|
|
@app.errorhandler(MisdirectedRequest)
|
|
def error_handler(error):
|
|
try:
|
|
return render_template(f'{error.code}.html', id=id), error.code
|
|
except TemplateNotFound:
|
|
return f'Error: {error.code}\n', error.code
|
|
|
|
|
|
alphabet_regex = re.compile(r'[' + app.config['URL_ALPHABET'] + r']{,11}')
|
|
|
|
if __name__ == '__main__':
|
|
if not path.isfile('fhost.db'):
|
|
db.create_all()
|
|
app.run(host='0.0.0.0')
|