Protect against CSRF

This commit is contained in:
Gabriel Corona 2018-10-02 22:03:29 +02:00
parent cee5eb562a
commit 7abd3f641b
3 changed files with 133 additions and 1 deletions

View file

@ -12,6 +12,7 @@ from gevent.queue import Queue
from geventwebsocket import WebSocketError from geventwebsocket import WebSocketError
from bottle import run, request, response, Bottle, HTTPResponse from bottle import run, request, response, Bottle, HTTPResponse
from bottle import get, post, install, abort, delete, put
from moulinette import msignals, m18n, DATA_DIR from moulinette import msignals, m18n, DATA_DIR
from moulinette.core import MoulinetteError, clean_session from moulinette.core import MoulinetteError, clean_session
@ -27,6 +28,35 @@ logger = log.getLogger('moulinette.interface.api')
# API helpers ---------------------------------------------------------- # API helpers ----------------------------------------------------------
CSRF_TYPES = set(["text/plain",
"application/x-www-form-urlencoded",
"multipart/form-data"])
def is_csrf():
"""Checks is this is a CSRF request."""
if request.method != "POST":
return False
if request.content_type is None:
return True
content_type = request.content_type.lower().split(';')[0]
if content_type not in CSRF_TYPES:
return False
return request.headers.get("X-Requested-With") is None
# Protection against CSRF
def filter_csrf(callback):
def wrapper(*args, **kwargs):
if is_csrf():
abort(403, "CSRF protection")
else:
return callback(*args, **kwargs)
return wrapper
class LogQueues(dict): class LogQueues(dict):
"""Map of session id to queue.""" """Map of session id to queue."""
pass pass
@ -723,6 +753,7 @@ class Interface(BaseInterface):
return callback return callback
# Install plugins # Install plugins
app.install(filter_csrf)
app.install(apiheader) app.install(apiheader)
app.install(api18n) app.install(api18n)
app.install(_ActionsMapPlugin(actionsmap, use_websocket, log_queues)) app.install(_ActionsMapPlugin(actionsmap, use_websocket, log_queues))

View file

@ -30,5 +30,6 @@ setup(name='Moulinette',
'moulinette.interfaces', 'moulinette.interfaces',
'moulinette.utils', 'moulinette.utils',
], ],
data_files=[(LOCALES_DIR, locale_files)] data_files=[(LOCALES_DIR, locale_files)],
tests_require=["pytest", "webtest"],
) )

100
tests/test_api.py Normal file
View file

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
from webtest import TestApp as WebTestApp
from bottle import Bottle
from moulinette.interfaces.api import filter_csrf
URLENCODED = 'application/x-www-form-urlencoded'
FORMDATA = 'multipart/form-data'
TEXT = 'text/plain'
TYPES = [URLENCODED, FORMDATA, TEXT]
SAFE_METHODS = ["HEAD", "GET", "PUT", "DELETE"]
app = Bottle(autojson=True)
app.install(filter_csrf)
@app.get('/')
def get_hello():
return "Hello World!\n"
@app.post('/')
def post_hello():
return "OK\n"
@app.put('/')
def put_hello():
return "OK\n"
@app.delete('/')
def delete_hello():
return "OK\n"
webtest = WebTestApp(app)
def test_get():
r = webtest.get("/")
assert r.status_code == 200
def test_csrf_post():
r = webtest.post("/", "test", expect_errors=True)
assert r.status_code == 403
def test_post_json():
r = webtest.post("/", "test",
headers=[("Content-Type", "application/json")])
assert r.status_code == 200
def test_csrf_post_text():
r = webtest.post("/", "test",
headers=[("Content-Type", "text/plain")],
expect_errors=True)
assert r.status_code == 403
def test_csrf_post_urlencoded():
r = webtest.post("/", "test",
headers=[("Content-Type",
"application/x-www-form-urlencoded")],
expect_errors=True)
assert r.status_code == 403
def test_csrf_post_form():
r = webtest.post("/", "test",
headers=[("Content-Type", "multipart/form-data")],
expect_errors=True)
assert r.status_code == 403
def test_ok_post_text():
r = webtest.post("/", "test",
headers=[("Content-Type", "text/plain"),
("X-Requested-With", "XMLHttpRequest")])
assert r.status_code == 200
def test_ok_post_urlencoded():
r = webtest.post("/", "test",
headers=[("Content-Type",
"application/x-www-form-urlencoded"),
("X-Requested-With", "XMLHttpRequest")])
assert r.status_code == 200
def test_ok_post_form():
r = webtest.post("/", "test",
headers=[("Content-Type", "multipart/form-data"),
("X-Requested-With", "XMLHttpRequest")])
assert r.status_code == 200