Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions aikido_zen/vulnerabilities/sql_injection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def detect_sql_injection(query, user_input, dialect):
query_l = query.lower()
userinput_l = user_input.lower()
if should_return_early(query_l, userinput_l):
return False
return 0

internals_lib = ctypes.CDLL(get_binary_path())
internals_lib.detect_sql_injection.argtypes = [
Expand Down Expand Up @@ -52,17 +52,12 @@ def detect_sql_injection(query, user_input, dialect):
logger.debug(
"Unable to check for SQL Injection, an error occurred in the library"
)
return False
return 0

# This means that the library failed to tokenize the SQL query
if c_int_res == 3:
logger.debug("Unable to check for SQL Injection, SQL tokenization failed")
return False

return c_int_res == 1
return c_int_res
except Exception as e:
logger.debug("Exception in SQL algo: %s", e)
return False
return 0


def should_return_early(query, user_input):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@
This will check the context of the request for SQL Injections
"""

import os

from aikido_zen.helpers.extract_strings_from_context import extract_strings_from_context
from aikido_zen.vulnerabilities.sql_injection import detect_sql_injection


def should_block_invalid_sql_queries():
env_val = os.environ.get("AIKIDO_BLOCK_INVALID_SQL")
if env_val is None:
return True
return env_val.lower() in ["true", "1"]


def context_contains_sql_injection(sql, operation, context, dialect):
"""
This will check the context of the request for SQL Injections
Expand All @@ -14,7 +23,22 @@ def context_contains_sql_injection(sql, operation, context, dialect):
# Only supports SQL queries that are strings, return otherwise.
return {}
for user_input, path, source in extract_strings_from_context(context):
if detect_sql_injection(sql, user_input, dialect):
result = detect_sql_injection(sql, user_input, dialect)

if result == 1:
return {
"operation": operation,
"kind": "sql_injection",
"source": source,
"pathToPayload": path,
"metadata": {
"sql": sql,
"dialect": dialect,
},
"payload": user_input,
}

if result == 3 and should_block_invalid_sql_queries():
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two branches build and return nearly identical SQL injection result dicts (operation, kind, source, pathToPayload, metadata.sql, metadata.dialect, payload). Consolidate into a single helper to avoid duplicated response construction.

Details

✨ AI Reasoning
​The new code adds two conditional branches that both construct and return a very similar dictionary describing a detected SQL injection. Both branches set the same keys: operation, kind, source, pathToPayload, metadata with sql and dialect, and payload. The second branch adds only one extra metadata field failedToTokenize. This repeats nearly identical business-logic for building the result object in two places, increasing maintenance burden: any future change to the shape of the result would need to be applied in both branches and could lead to inconsistent responses.

🔧 How do I fix it?
Delete extra code. Extract repeated code sequences into reusable functions or methods. Use loops or data structures to eliminate repetitive patterns.

Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info

return {
"operation": operation,
"kind": "sql_injection",
Expand All @@ -23,6 +47,7 @@ def context_contains_sql_injection(sql, operation, context, dialect):
"metadata": {
"sql": sql,
"dialect": dialect,
"failedToTokenize": "true",
},
"payload": user_input,
}
Expand Down
28 changes: 19 additions & 9 deletions aikido_zen/vulnerabilities/sql_injection/init_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,25 @@ def is_sql_injection(sql, input, dialect="all"):
if dialect == "all" or dialect == current:
result = detect_sql_injection(sql, input, current)
assert (
result == True
result == 1
), f"Expected SQL injection for SQL: {sql} and input: {input} in {current} dialect"


def is_invalid_sql(sql, input, dialect="all"):
for current in DIALECTS:
if dialect == "all" or dialect == current:
result = detect_sql_injection(sql, input, current)
assert (
result == 3
), f"Expected failed tokenization for SQL: {sql} and input: {input} in {current} dialect"


def is_not_sql_injection(sql, input, dialect="all"):
for current in DIALECTS:
if dialect == "all" or dialect == current:
result = detect_sql_injection(sql, input, current)
assert (
result == False
result == 0
), f"Expected no SQL injection for SQL: {sql} and input: {input} in {current} dialect"


Expand Down Expand Up @@ -171,9 +180,6 @@ def test_is_not_injection():


def test_allow_escape_sequences():
# Invalid queries :
is_not_sql_injection("SELECT * FROM users WHERE id = 'users\\'", "users\\")

is_not_sql_injection("SELECT * FROM users WHERE id = 'users\\\\'", "users\\\\")
is_not_sql_injection("SELECT * FROM users WHERE id = '\nusers'", "\nusers")
is_not_sql_injection("SELECT * FROM users WHERE id = '\rusers'", "\rusers")
Expand Down Expand Up @@ -212,10 +218,6 @@ def test_check_string_safely_escaped():
is_not_sql_injection(
'SELECT * FROM comments WHERE comment = "I`m writting you"', "I`m writting you"
)
# Invalid query (strings don't terminate)
is_not_sql_injection(
"SELECT * FROM comments WHERE comment = 'I'm writting you'", "I'm writting you"
)
# Positive example of same query :
is_sql_injection(
"SELECT * FROM comments WHERE comment = 'I'm writting you--'",
Expand Down Expand Up @@ -393,6 +395,14 @@ def test_lowercased_input_sql_injection():
"""


def test_block_invalid_sql_queries():
# These are invalid queries (e.g. unterminated strings) that fail tokenization
is_invalid_sql("SELECT * FROM users WHERE id = 'users\\'", "users\\", "mysql")
is_invalid_sql(
"SELECT * FROM comments WHERE comment = 'I'm writting you'", "I'm writting you"
)


def test_function_calls_as_sql_injections():
is_sql_injection("foobar()", "foobar()")
is_sql_injection("foobar(1234567)", "foobar(1234567)")
Expand Down
11 changes: 11 additions & 0 deletions docs/invalid-sql-queries.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Blocking invalid SQL queries

Zen blocks SQL queries that it can't tokenize when they contain user input. This prevents attackers from bypassing SQL injection detection with malformed queries. For example, ClickHouse ignores invalid SQL after `;`, and SQLite runs queries before an unclosed `/*` comment.

This is on by default. In blocking mode, these queries are blocked. In detection-only mode, they are reported but still executed.

If you see false positives (legitimate queries being blocked), disable it with:

```
AIKIDO_BLOCK_INVALID_SQL=false python app.py
```
Loading