Generate tests for core_user

Add 2 functions to core_user.py
Update README.md with new functions
Add settings.json of vscode to execute tests (Python Test Explorer)
This commit is contained in:
fboissadier
2025-01-25 01:26:57 +01:00
parent 2f85cf9e80
commit beb7ae2866
5 changed files with 346 additions and 10 deletions

11
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,11 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
"./tests",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}

View File

@ -266,13 +266,14 @@ This wrapper cover the following APIs for now:
| SYNO.ActiveBackup.Version |
| SYNO.ActiveBackup.Log |
| Core User |
|-------------------------------|
| SYNO.Core.User |
| SYNO.Core.User.Group |
| SYNO.Core.User.PasswordPolicy |
| SYNO.Core.User.PasswordExpiry |
| SYNO.Core.User.PasswordConfirm|
| Core User |
|--------------------------------|
| SYNO.Core.User |
| SYNO.Core.User.Group |
| SYNO.Core.User.PasswordPolicy |
| SYNO.Core.User.PasswordExpiry |
| SYNO.Core.User.PasswordConfirm |
| SYNO.Core.User.UsernamePolicy |
| Snapshot Replication |
|--------------------------|
@ -591,7 +592,8 @@ DS info with below functions:
### core_user (DSM User Settings)
| Functions | Description |
|----------------------------|-----------------------------------------------------------------|
| `get_users()` | Retrieve groups information |
| `get_users()` | Retrieve users information |
| `get_user()` | Retrieve user information |
| `create_user()` | Create a new user |
| `modify_user()` | Modify a user |
| `delete_user()` | Delete a user |
@ -602,6 +604,7 @@ DS info with below functions:
| `get_password_expiry()` | Get the password expiry |
| `set_password_expiry()` | Set the password expiry |
| `password_confirm()` | Confirm password match with current logged user |
| `get_username_policy()` | Get the username policy (List of username that are not usable). |
### Virtualization
| Functions |

View File

@ -104,6 +104,58 @@ class User(base_api.BaseApi):
return self.request_data(api_name, api_path, req_param)
def get_user(self, name: str, additional: list[str] = []) -> dict[str, object] | str:
"""Retrieve a user information.
Args:
name (str):
The name of the user.
additional (list[str], optional):
Additional fields to retrieve. Defaults to `[]`.
All fields known are: `["description","email","expired","cannot_chg_passwd","passwd_never_expire","password_last_change","is_password_pending"]`.
Returns:
dict|str:
A dictionary containing the user information, or a string in case of an error.
Example return:
```
{
"api": "SYNO.Core.User",
"data": {
"users": [
{
"cannot_chg_passwd": false,
"description": "",
"email": "",
"expired": "normal",
"is_password_pending": false,
"name": "test_api",
"passwd_never_expire": true,
"password_last_change": 19789,
"uid": 1027
}
]
},
"method": "get",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "get",
"type": "local",
"version": info['minVersion'],
"name": name,
"additional": json.dumps(additional)
}
return self.request_data(api_name, api_path, req_param)
def create_user(
self, name: str, password: str, description: str = "", email: str = "", expire: str = "never", cannot_chg_passwd: bool = False,
passwd_never_expire: bool = True, notify_by_email: bool = False, send_password: bool = False
@ -594,4 +646,32 @@ class User(base_api.BaseApi):
else:
req_param.update(self.session.encrypt_params({"password": password}))
return self.request_data(api_name, api_path, req_param, method="post")
return self.request_data(api_name, api_path, req_param, method="post")
def get_username_policy(self) -> dict[str, object] | str:
"""Get the username policy (List of username that are not usable).
Returns:
dict|str:
A dictionary containing the username policy information, or a string in case of an error.
Example return:
```
{
"api": "SYNO.Core.User.UsernamePolicy",
"data": ["root", "rootuser", "rootusr", "admin", "administrator", "adm", "adminuser", "adminusr", "user",…],
"method": "get",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.UsernamePolicy"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "list",
"version": info['minVersion']
}
return self.request_data(api_name, api_path, req_param)

233
tests/test_core_user.py Normal file
View File

@ -0,0 +1,233 @@
import datetime
import json
from unittest import TestCase
import unittest
from synology_api.core_user import User
from synology_api.exceptions import CoreSysInfoError
import os, pathlib, random, string
def parse_config(config_path) -> dict[str, str]:
with open(config_path, 'r') as config_file:
config_data = json.load(config_file)
return config_data
def generate_test_passwords(password_policy, username):
strong_policy = password_policy.get('strong_password', {})
correct = generate_correct_password(strong_policy, username)
incorrect = generate_incorrect_password(strong_policy, username)
return correct, incorrect
def generate_correct_password(policy, username):
min_length = policy.get('min_length', 8) if policy.get('min_length_enable', False) else 8
included_numeric = policy.get('included_numeric_char', False)
included_special = policy.get('included_special_char', False)
mixed_case = policy.get('mixed_case', False)
exclude_username = policy.get('exclude_username', False)
# Characters to be included
required_chars = []
if included_numeric:
required_chars.append(random.choice(string.digits))
if mixed_case:
required_chars.append(random.choice(string.ascii_uppercase))
required_chars.append(random.choice(string.ascii_lowercase))
else:
required_chars.append(random.choice(string.ascii_letters))
if included_special:
required_chars.append(random.choice(string.punctuation))
# Determine remaining length needed
remaining_length = max(min_length - len(required_chars), 0)
# Generate allowed characters for remaining
allowed_chars = []
if included_special:
allowed_chars += list(string.punctuation)
allowed_chars += list(string.ascii_letters)
if included_numeric:
allowed_chars += list(string.digits)
# Generate remaining characters
remaining_chars = [random.choice(allowed_chars) for _ in range(remaining_length)]
all_chars = required_chars + remaining_chars
random.shuffle(all_chars)
password = ''.join(all_chars)
# Ensure password does not contain username
if exclude_username and username.lower() in password.lower():
return generate_correct_password(policy, username)
return password
def generate_incorrect_password(policy, username):
min_length = policy.get('min_length', 8)
min_length_enable = policy.get('min_length_enable', False)
included_numeric = policy.get('included_numeric_char', False)
included_special = policy.get('included_special_char', False)
mixed_case = policy.get('mixed_case', False)
exclude_username = policy.get('exclude_username', False)
# Try to violate min_length first if enabled
if min_length_enable:
# Generate password with min_length -1, try to meet other conditions
new_policy = policy.copy()
new_policy['min_length_enable'] = False # Temporarily disable to generate
password = generate_correct_password(new_policy, username)
password = password[:max(min_length -1, 1)]
if len(password) < min_length:
return password
# Next, violate included_numeric if enabled
if included_numeric:
# Generate password without numeric
new_policy = policy.copy()
new_policy['included_numeric_char'] = False
password = generate_correct_password(new_policy, username)
if not any(c.isdigit() for c in password):
return password
# Next, violate mixed_case if enabled
if mixed_case:
# Generate all lowercase
new_policy = policy.copy()
new_policy['mixed_case'] = False
password = generate_correct_password(new_policy, username).lower()
if password.islower():
return password
# Next, violate exclude_username if enabled
if exclude_username:
# Include username in password
password = generate_correct_password(policy, username)
insert_pos = random.randint(0, len(password))
password = password[:insert_pos] + username + password[insert_pos:]
return password
# Fallback: violate min_length even if not enabled
password = generate_correct_password(policy, username)
return password[:-1]
def generate_username():
# Generate 4-6 random lowercase letters
letters = ''.join(random.choice(string.ascii_lowercase) for _ in range(random.randint(4, 6)))
# Generate 2-4 random digits
numbers = ''.join(random.choice(string.digits) for _ in range(random.randint(2, 4)))
# Combine letters and numbers
return letters + numbers
class TestCoreUser(TestCase):
config: dict[str, str]
def setUp(self):
self.config = parse_config(
os.path.realpath(
os.path.join(
pathlib.Path(__file__).parent.resolve(),
'./resources/config-test.json'
)
)
)
self.user = User(
ip_address=self.config["synology_ip"],
port=self.config["synology_port"],
username=self.config["synology_user"],
password=self.config["synology_password"],
secure=bool(self.config["synology_secure"]),
cert_verify=False,
dsm_version=int(self.config["dsm_version"]),
debug=True,
otp_code=self.config["otp_code"]
)
def api_response_base_assert(self, response: dict, success: bool = True):
self.assertIsNotNone(response)
if success:
self.assertIsNotNone(response['data'])
self.assertTrue(response['success'])
else:
self.assertIsNone(response['data'])
self.assertIsNotNone(response['error'])
self.assertFalse(response['success'])
def test_core_user(self):
self.assertIsNotNone(self.user)
self.assertIsNotNone(self.user.session)
self.assertIsNotNone(self.user.session.sid)
self.assertIsNot(self.user.session.sid, '')
password_policy_response = self.user.get_password_policy()
password_policy = password_policy_response['data']
username_policy_response = self.user.get_username_policy()
self.api_response_base_assert(username_policy_response)
test_username = generate_username()
correct_pass, incorrect_pass = generate_test_passwords(password_policy, test_username)
# Test create user with correct password
create_response = self.user.create_user(
name=test_username,
password=correct_pass,
email=f"{test_username}@test.com"
)
self.api_response_base_assert(create_response)
# Test get user
get_user_response = self.user.get_user(name=test_username)
self.api_response_base_assert(get_user_response)
self.assertIsNotNone(get_user_response['data']['users'][0]['name'])
self.assertEqual(get_user_response['data']['users'][0]['name'], test_username)
self.assertIsNotNone(get_user_response['data']['users'][0]['name'])
self.assertIsNotNone(get_user_response['data']['users'][0]['uid'])
self.assertNotEqual(get_user_response['data']['users'][0]['uid'], 0)
# Test update user
new_test_username = generate_username()
update_response = self.user.modify_user(name=test_username, new_name=new_test_username)
self.api_response_base_assert(update_response)
# Delete user
delete_response = self.user.delete_user(name=new_test_username)
self.api_response_base_assert(delete_response)
# Test create user with incorrect password
try:
create_response = self.user.create_user(
name=test_username,
password=incorrect_pass,
email=f"{test_username}@test.com",
)
except Exception as e:
self.assertIsNotNone(e)
self.assertIsInstance(e, CoreSysInfoError, "Exception has to be CoreSysInfoError")
# Test create user with incorrect username
try:
create_response = self.user.create_user(
name='admin',
password=correct_pass
)
except Exception as e:
self.assertIsNotNone(e)
self.assertIsInstance(e, CoreSysInfoError, "Exception has to be CoreSysInfoError")
if __name__ == '__main__':
unittest.main()

View File

@ -4,6 +4,7 @@ from unittest import TestCase
import unittest
from synology_api.filestation import FileStation
from synology_api.surveillancestation import SurveillanceStation
import os, pathlib
def parse_config(config_path) -> dict[str, str]:
@ -16,7 +17,15 @@ class TestSynoApi(TestCase):
config: dict[str, str]
def setUp(self):
self.config = parse_config('./resources/config-test.json')
self.config = parse_config(
os.path.realpath(
os.path.join(
pathlib.Path(__file__).parent.resolve(),
'./resources/config-test.json'
)
)
)
def test_syno_filestation_login(self):
fs = FileStation(ip_address=self.config["synology_ip"], port=self.config["synology_port"],