diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..e9e6a80 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +{ + "python.testing.unittestArgs": [ + "-v", + "-s", + "./tests", + "-p", + "test_*.py" + ], + "python.testing.pytestEnabled": false, + "python.testing.unittestEnabled": true +} \ No newline at end of file diff --git a/README.md b/README.md index f52d011..845aac5 100644 --- a/README.md +++ b/README.md @@ -266,13 +266,14 @@ This wrapper cover the following APIs for now: | SYNO.ActiveBackup.Version | | SYNO.ActiveBackup.Log | -| Core User | -|-------------------------------| -| SYNO.Core.User | -| SYNO.Core.User.Group | -| SYNO.Core.User.PasswordPolicy | -| SYNO.Core.User.PasswordExpiry | -| SYNO.Core.User.PasswordConfirm| +| Core User | +|--------------------------------| +| SYNO.Core.User | +| SYNO.Core.User.Group | +| SYNO.Core.User.PasswordPolicy | +| SYNO.Core.User.PasswordExpiry | +| SYNO.Core.User.PasswordConfirm | +| SYNO.Core.User.UsernamePolicy | | Snapshot Replication | |--------------------------| @@ -591,7 +592,8 @@ DS info with below functions: ### core_user (DSM User Settings) | Functions | Description | |----------------------------|-----------------------------------------------------------------| -| `get_users()` | Retrieve groups information | +| `get_users()` | Retrieve users information | +| `get_user()` | Retrieve user information | | `create_user()` | Create a new user | | `modify_user()` | Modify a user | | `delete_user()` | Delete a user | @@ -602,6 +604,7 @@ DS info with below functions: | `get_password_expiry()` | Get the password expiry | | `set_password_expiry()` | Set the password expiry | | `password_confirm()` | Confirm password match with current logged user | +| `get_username_policy()` | Get the username policy (List of username that are not usable). | ### Virtualization | Functions | diff --git a/synology_api/core_user.py b/synology_api/core_user.py index 19fbaa2..610c4c4 100644 --- a/synology_api/core_user.py +++ b/synology_api/core_user.py @@ -104,6 +104,58 @@ class User(base_api.BaseApi): return self.request_data(api_name, api_path, req_param) + def get_user(self, name: str, additional: list[str] = []) -> dict[str, object] | str: + """Retrieve a user information. + + Args: + name (str): + The name of the user. + additional (list[str], optional): + Additional fields to retrieve. Defaults to `[]`. + All fields known are: `["description","email","expired","cannot_chg_passwd","passwd_never_expire","password_last_change","is_password_pending"]`. + + Returns: + dict|str: + A dictionary containing the user information, or a string in case of an error. + + Example return: + ``` + { + "api": "SYNO.Core.User", + "data": { + "users": [ + { + "cannot_chg_passwd": false, + "description": "", + "email": "", + "expired": "normal", + "is_password_pending": false, + "name": "test_api", + "passwd_never_expire": true, + "password_last_change": 19789, + "uid": 1027 + } + ] + }, + "method": "get", + "success": true, + "version": 1 + } + ``` + """ + api_name = "SYNO.Core.User" + info = self.core_list[api_name] + api_path = info["path"] + req_param = { + "method": "get", + "type": "local", + "version": info['minVersion'], + "name": name, + "additional": json.dumps(additional) + } + + return self.request_data(api_name, api_path, req_param) + def create_user( self, name: str, password: str, description: str = "", email: str = "", expire: str = "never", cannot_chg_passwd: bool = False, passwd_never_expire: bool = True, notify_by_email: bool = False, send_password: bool = False @@ -594,4 +646,32 @@ class User(base_api.BaseApi): else: req_param.update(self.session.encrypt_params({"password": password})) - return self.request_data(api_name, api_path, req_param, method="post") \ No newline at end of file + return self.request_data(api_name, api_path, req_param, method="post") + + + def get_username_policy(self) -> dict[str, object] | str: + """Get the username policy (List of username that are not usable). + + Returns: + dict|str: + A dictionary containing the username policy information, or a string in case of an error. + + Example return: + ``` + { + "api": "SYNO.Core.User.UsernamePolicy", + "data": ["root", "rootuser", "rootusr", "admin", "administrator", "adm", "adminuser", "adminusr", "user",…], + "method": "get", + "success": true, + "version": 1 + } + ``` + """ + api_name = "SYNO.Core.User.UsernamePolicy" + info = self.core_list[api_name] + api_path = info["path"] + req_param = { + "method": "list", + "version": info['minVersion'] + } + return self.request_data(api_name, api_path, req_param) \ No newline at end of file diff --git a/tests/test_core_user.py b/tests/test_core_user.py new file mode 100644 index 0000000..2ac03be --- /dev/null +++ b/tests/test_core_user.py @@ -0,0 +1,233 @@ +import datetime +import json +from unittest import TestCase +import unittest +from synology_api.core_user import User +from synology_api.exceptions import CoreSysInfoError +import os, pathlib, random, string + + +def parse_config(config_path) -> dict[str, str]: + with open(config_path, 'r') as config_file: + config_data = json.load(config_file) + return config_data + +def generate_test_passwords(password_policy, username): + strong_policy = password_policy.get('strong_password', {}) + + correct = generate_correct_password(strong_policy, username) + incorrect = generate_incorrect_password(strong_policy, username) + + return correct, incorrect + +def generate_correct_password(policy, username): + min_length = policy.get('min_length', 8) if policy.get('min_length_enable', False) else 8 + included_numeric = policy.get('included_numeric_char', False) + included_special = policy.get('included_special_char', False) + mixed_case = policy.get('mixed_case', False) + exclude_username = policy.get('exclude_username', False) + + # Characters to be included + required_chars = [] + if included_numeric: + required_chars.append(random.choice(string.digits)) + if mixed_case: + required_chars.append(random.choice(string.ascii_uppercase)) + required_chars.append(random.choice(string.ascii_lowercase)) + else: + required_chars.append(random.choice(string.ascii_letters)) + if included_special: + required_chars.append(random.choice(string.punctuation)) + + # Determine remaining length needed + remaining_length = max(min_length - len(required_chars), 0) + + # Generate allowed characters for remaining + allowed_chars = [] + if included_special: + allowed_chars += list(string.punctuation) + allowed_chars += list(string.ascii_letters) + if included_numeric: + allowed_chars += list(string.digits) + + # Generate remaining characters + remaining_chars = [random.choice(allowed_chars) for _ in range(remaining_length)] + all_chars = required_chars + remaining_chars + random.shuffle(all_chars) + password = ''.join(all_chars) + + # Ensure password does not contain username + if exclude_username and username.lower() in password.lower(): + return generate_correct_password(policy, username) + + return password + +def generate_incorrect_password(policy, username): + min_length = policy.get('min_length', 8) + min_length_enable = policy.get('min_length_enable', False) + included_numeric = policy.get('included_numeric_char', False) + included_special = policy.get('included_special_char', False) + mixed_case = policy.get('mixed_case', False) + exclude_username = policy.get('exclude_username', False) + + # Try to violate min_length first if enabled + if min_length_enable: + # Generate password with min_length -1, try to meet other conditions + new_policy = policy.copy() + new_policy['min_length_enable'] = False # Temporarily disable to generate + password = generate_correct_password(new_policy, username) + password = password[:max(min_length -1, 1)] + if len(password) < min_length: + return password + + # Next, violate included_numeric if enabled + if included_numeric: + # Generate password without numeric + new_policy = policy.copy() + new_policy['included_numeric_char'] = False + password = generate_correct_password(new_policy, username) + if not any(c.isdigit() for c in password): + return password + + # Next, violate mixed_case if enabled + if mixed_case: + # Generate all lowercase + new_policy = policy.copy() + new_policy['mixed_case'] = False + password = generate_correct_password(new_policy, username).lower() + if password.islower(): + return password + + # Next, violate exclude_username if enabled + if exclude_username: + # Include username in password + password = generate_correct_password(policy, username) + insert_pos = random.randint(0, len(password)) + password = password[:insert_pos] + username + password[insert_pos:] + return password + + # Fallback: violate min_length even if not enabled + password = generate_correct_password(policy, username) + return password[:-1] + +def generate_username(): + # Generate 4-6 random lowercase letters + letters = ''.join(random.choice(string.ascii_lowercase) for _ in range(random.randint(4, 6))) + + # Generate 2-4 random digits + numbers = ''.join(random.choice(string.digits) for _ in range(random.randint(2, 4))) + + # Combine letters and numbers + return letters + numbers + +class TestCoreUser(TestCase): + config: dict[str, str] + + def setUp(self): + self.config = parse_config( + os.path.realpath( + os.path.join( + pathlib.Path(__file__).parent.resolve(), + './resources/config-test.json' + ) + ) + ) + self.user = User( + ip_address=self.config["synology_ip"], + port=self.config["synology_port"], + username=self.config["synology_user"], + password=self.config["synology_password"], + secure=bool(self.config["synology_secure"]), + cert_verify=False, + dsm_version=int(self.config["dsm_version"]), + debug=True, + otp_code=self.config["otp_code"] + ) + + def api_response_base_assert(self, response: dict, success: bool = True): + self.assertIsNotNone(response) + if success: + self.assertIsNotNone(response['data']) + self.assertTrue(response['success']) + else: + self.assertIsNone(response['data']) + self.assertIsNotNone(response['error']) + self.assertFalse(response['success']) + + + def test_core_user(self): + + self.assertIsNotNone(self.user) + self.assertIsNotNone(self.user.session) + self.assertIsNotNone(self.user.session.sid) + self.assertIsNot(self.user.session.sid, '') + + password_policy_response = self.user.get_password_policy() + + password_policy = password_policy_response['data'] + + username_policy_response = self.user.get_username_policy() + self.api_response_base_assert(username_policy_response) + + test_username = generate_username() + + + correct_pass, incorrect_pass = generate_test_passwords(password_policy, test_username) + + # Test create user with correct password + create_response = self.user.create_user( + name=test_username, + password=correct_pass, + email=f"{test_username}@test.com" + ) + + self.api_response_base_assert(create_response) + + # Test get user + get_user_response = self.user.get_user(name=test_username) + + self.api_response_base_assert(get_user_response) + self.assertIsNotNone(get_user_response['data']['users'][0]['name']) + self.assertEqual(get_user_response['data']['users'][0]['name'], test_username) + self.assertIsNotNone(get_user_response['data']['users'][0]['name']) + + self.assertIsNotNone(get_user_response['data']['users'][0]['uid']) + self.assertNotEqual(get_user_response['data']['users'][0]['uid'], 0) + + # Test update user + new_test_username = generate_username() + update_response = self.user.modify_user(name=test_username, new_name=new_test_username) + + self.api_response_base_assert(update_response) + + # Delete user + delete_response = self.user.delete_user(name=new_test_username) + self.api_response_base_assert(delete_response) + + # Test create user with incorrect password + try: + create_response = self.user.create_user( + name=test_username, + password=incorrect_pass, + email=f"{test_username}@test.com", + ) + except Exception as e: + self.assertIsNotNone(e) + self.assertIsInstance(e, CoreSysInfoError, "Exception has to be CoreSysInfoError") + + # Test create user with incorrect username + try: + create_response = self.user.create_user( + name='admin', + password=correct_pass + ) + + except Exception as e: + self.assertIsNotNone(e) + self.assertIsInstance(e, CoreSysInfoError, "Exception has to be CoreSysInfoError") + + + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_syno_api.py b/tests/test_syno_api.py index 914ed91..c49718f 100644 --- a/tests/test_syno_api.py +++ b/tests/test_syno_api.py @@ -4,6 +4,7 @@ from unittest import TestCase import unittest from synology_api.filestation import FileStation from synology_api.surveillancestation import SurveillanceStation +import os, pathlib def parse_config(config_path) -> dict[str, str]: @@ -16,7 +17,15 @@ class TestSynoApi(TestCase): config: dict[str, str] def setUp(self): - self.config = parse_config('./resources/config-test.json') + self.config = parse_config( + os.path.realpath( + os.path.join( + pathlib.Path(__file__).parent.resolve(), + './resources/config-test.json' + ) + ) + ) + def test_syno_filestation_login(self): fs = FileStation(ip_address=self.config["synology_ip"], port=self.config["synology_port"],