Files
synology-api/synology_api/core_user.py

760 lines
24 KiB
Python

"""
Synology Core User API wrapper.
This module provides a Python interface for managing users on Synology NAS devices,
including user creation, modification, deletion, group membership, password policies,
and password expiry.
"""
import json
from typing import Any
from . import base_api
class User(base_api.BaseApi):
"""
Core User API implementation.
Methods
-------
get_users(offset=0, limit=-1, sort_by="name", sort_direction="ASC", additional=[])
Retrieve groups information.
get_user(name, additional=[])
Retrieve user information.
create_user(...)
Create a new user.
modify_user(...)
Modify a user.
delete_user(name)
Delete a user.
affect_groups(name, join_groups=[], leave_groups=[])
Affect or disaffect groups to a user.
affect_groups_status(task_id)
Get the status of a join task.
get_password_policy()
Get the password policy.
set_password_policy(...)
Set the password policy.
get_password_expiry()
Get the password expiry.
set_password_expiry(...)
Set the password expiry.
password_confirm(password)
Confirm password/session.
get_username_policy()
Get the username policy.
Examples
--------
See individual method docstrings for usage examples.
"""
def get_users(
self, offset: int = 0, limit: int = -1, sort_by: str = "name", sort_direction: str = "ASC", additional: list[str] = []
) -> dict[str, object]:
"""
Retrieve groups information.
Parameters
----------
offset : int, optional
The offset of the groups to retrieve. Defaults to `0`.
limit : int, optional
The maximum number of groups to retrieve. Defaults to `-1`.
sort_by : str, optional
Sort by a specific field. Defaults to `"name"`.
sort_direction : str, optional
The sort direction. Defaults to `"ASC"` else `"DESC"`.
additional : list[str], optional
Additional fields to retrieve. Defaults to `[]`.
All fields known are: `["description","email","expired","cannot_chg_passwd","passwd_never_expire","password_last_change", "groups", "2fa_status"]`.
Returns
-------
dict[str, object]
A dictionary containing the groups information.
Examples
--------
```json
{
"data": {
"offset": 0,
"total": 5,
"users": [
{
"description": "System default user",
"email": "",
"expired": "now",
"name": "admin",
"passwd_never_expire": true
},
{
"description": "Guest",
"email": "",
"expired": "now",
"name": "guest",
"passwd_never_expire": true
},
{
"description": "",
"email": "",
"expired": "normal",
"name": "test_api",
"passwd_never_expire": true
},
{
"description": "test description",
"email": "testemail@test.com",
"expired": "normal",
"name": "test_user",
"passwd_never_expire": true
}
]
},
"success": true
}
```
"""
api_name = "SYNO.Core.User"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "list",
"version": info['minVersion'],
"type": "local", # TODO: Test with ldap and parameter "all"
"offset": offset,
"limit": limit,
"sort_by": sort_by,
"sort_direction": sort_direction,
"additional": json.dumps(additional)
}
return self.request_data(api_name, api_path, req_param)
def get_user(self, name: str, additional: list[str] = []) -> dict[str, object]:
"""
Retrieve user information.
Parameters
----------
name : str
The name of the user.
additional : list[str], optional
Additional fields to retrieve. Defaults to `[]`.
All fields known are: `["description","email","expired","cannot_chg_passwd","passwd_never_expire","password_last_change","is_password_pending"]`.
Returns
-------
dict[str, object]
A dictionary containing the user information.
Examples
--------
```json
{
"api": "SYNO.Core.User",
"data": {
"users": [
{
"cannot_chg_passwd": false,
"description": "",
"email": "",
"expired": "normal",
"is_password_pending": false,
"name": "test_api",
"passwd_never_expire": true,
"password_last_change": 19789,
"uid": 1027
}
]
},
"method": "get",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "get",
"type": "local",
"version": info['minVersion'],
"name": name,
"additional": json.dumps(additional)
}
return self.request_data(api_name, api_path, req_param)
def create_user(
self, name: str, password: str, description: str = "", email: str = "", expire: str = "never", cannot_chg_passwd: bool = False,
passwd_never_expire: bool = True, notify_by_email: bool = False, send_password: bool = False
) -> dict[str, object]:
"""
Create a new user.
Parameters
----------
name : str
The name of the user.
password : str
The password of the user.
description : str, optional
The description of the user. Defaults to `""`.
email : str, optional
The email of the user. Defaults to `""`.
expire : str, optional
The expiration date of the user. Defaults to `"never"`.
cannot_chg_passwd : bool, optional
Whether the password can be changed. Defaults to `False`.
passwd_never_expire : bool, optional
Whether the password should never expire. Defaults to `True`.
notify_by_email : bool, optional
Whether to notify by email. Defaults to `False`.
send_password : bool, optional
Whether to send the password. Defaults to `False`.
Returns
-------
dict[str, object]
A dictionary containing the user information.
Examples
--------
```json
{
"data":
{
"name":"toto",
"uid": 1030
},
"success": true
}
```
"""
api_name = "SYNO.Core.User"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "create",
"version": info['minVersion'],
"name": name,
"description": description,
"email": email,
"expired": expire,
"cannot_chg_passwd": cannot_chg_passwd,
"passwd_never_expire": passwd_never_expire,
"notify_by_email": notify_by_email,
"send_password": send_password,
}
param_enc = {
"password": password
}
# Using https
if self.session._secure:
req_param.update(param_enc)
# Using http and self encrypted
else:
encrypted_param = self.session.encrypt_params(param_enc)
req_param.update(encrypted_param)
return self.request_data(api_name, api_path, req_param, method="post")
def modify_user(
self, name: str, new_name: str, password: str = "", description: str = "", email: str = "", expire: str = "never", cannot_chg_passwd: bool = False,
passwd_never_expire: bool = True, notify_by_email: bool = False, send_password: bool = False
) -> dict[str, object]:
"""
Modify a user.
Parameters
----------
name : str
The name of the actual user.
new_name : str
The new name of the user.
password : str, optional
The password of the user. Defaults to `""`.
description : str, optional
The description of the user. Defaults to `""`.
email : str, optional
The email of the user. Defaults to `""`.
expire : str, optional
The expiration date of the user. Defaults to `"never"`.
cannot_chg_passwd : bool, optional
Whether the password can be changed. Defaults to `False`.
passwd_never_expire : bool, optional
Whether the password should never expire. Defaults to `True`.
notify_by_email : bool, optional
Whether to notify by email. Defaults to `False`.
send_password : bool, optional
Whether to send the password. Defaults to `False`.
Returns
-------
dict[str, object]
A dictionary containing the user information.
Examples
--------
```json
{
"data":{
"name": "test_user2",
"password_last_change": 20106,
"uid": 1028
},
"success": true
}
```
"""
api_name = "SYNO.Core.User"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "set",
"version": info['minVersion'],
"name": name,
"new_name": new_name,
"description": description,
"email": email,
"expired": expire,
"cannot_chg_passwd": cannot_chg_passwd,
"passwd_never_expire": passwd_never_expire,
"notify_by_email": notify_by_email,
"send_password": send_password
}
# Using https
if self.session._secure:
req_param.update({"password": password})
# Using http and self encrypted
else:
req_param.update(self.session.encrypt_params(
{"password": password}))
return self.request_data(api_name, api_path, req_param, method="post")
def delete_user(self, name: str) -> dict[str, object]:
"""
Delete a user.
Parameters
----------
name : str
The name of the user to delete.
Returns
-------
dict[str, object]
A dictionary containing the user information.
Examples
--------
```json
{
"data": {
"name": "toto",
"uid": 1030
},
"success": true
}
```
"""
api_name = "SYNO.Core.User"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "delete",
"version": info['minVersion'],
"name": name
}
return self.request_data(api_name, api_path, req_param)
def affect_groups(self, name: str, join_groups: list[str] = [], leave_groups: list[str] = []) -> dict[str, object]:
"""
Affect or disaffect groups to a user.
Tip: This request is asynchronous and will return a task id to check the status of the join task. Use `affect_groups_status` func to check the status of the task.
Parameters
----------
name : str
The name of the user.
join_groups : list[str]
The names of the groups to join.
leave_groups : list[str]
The names of the groups to leave.
Returns
-------
dict[str, object]
A dictionary containing the task id to check the status of the join task. Use `affect_groups_status` func to check the status of the task.
Examples
--------
```json
{
"api": "SYNO.Core.User.Group",
"data": {
"task_id": "@administrators/groupbatch1737238746C6723E33"
},
"method": "join",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.Group"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "join",
"version": info['minVersion'],
"join_group": json.dumps(join_groups),
"leave_group": json.dumps(leave_groups),
"name": name
}
return self.request_data(api_name, api_path, req_param)
def affect_groups_status(self, task_id: str):
"""
Get the status of a join task.
Parameters
----------
task_id : str
The task id of the join task.
Returns
-------
dict[str, object]
A dictionary containing the status of the join task.
Examples
--------
```json
{
"data": {
"auto_remove": false,
"data": {
"name": "test_user2",
"pid": 18126,
"progress": 1,
"total": 1,
"uid": 1028
},
"finish": false,
"info": {
"api": "SYNO.Core.User.Group",
"group": "admin",
"method": "join",
"prefix": "groupbatch",
"version": 1
},
"success": true
},
"success": true
}
```
"""
api_name = "SYNO.Core.User.Group"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "join_status",
"version": info['minVersion'],
"task_id": task_id
}
return self.request_data(api_name, api_path, req_param)
def get_password_policy(self) -> dict[str, object]:
"""
Get the password policy.
Returns
-------
dict[str, object]
A dictionary containing the password policy information.
Examples
--------
```json
{
"api": "SYNO.Core.User.PasswordPolicy",
"data": {
"enable_reset_passwd_by_email": false,
"password_must_change": false,
"strong_password": {
"exclude_username": true,
"history_num": 0,
"included_numeric_char": true,
"included_special_char": false,
"min_length": 8,
"min_length_enable": true,
"mixed_case": true
}
},
"method": "get",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.PasswordPolicy"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "get",
"version": info['minVersion']
}
return self.request_data(api_name, api_path, req_param)
def set_password_policy(
self, enable_reset_passwd_by_email: bool = False, password_must_change: bool = False,
exclude_username: bool = True, included_numeric_char: bool = True, included_special_char: bool = False,
min_length: int = 8, min_length_enable: bool = True, mixed_case: bool = True,
exclude_common_password: bool = False, exclude_history: bool = False
) -> dict[str, object]:
"""
Set the password policy.
Parameters
----------
enable_reset_passwd_by_email : bool, optional
Defaults to `False`.
password_must_change : bool, optional
Defaults to `False`.
exclude_username : bool, optional
Defaults to `True`.
included_numeric_char : bool, optional
Defaults to `True`.
included_special_char : bool, optional
Defaults to `False`.
min_length : int, optional
Defaults to `8`.
min_length_enable : bool, optional
Defaults to `True`.
mixed_case : bool, optional
Defaults to `True`.
exclude_common_password : bool, optional
Defaults to `False`.
exclude_history : bool, optional
Defaults to `False`.
Returns
-------
dict[str, object]
A dictionary indicating the success of the operation.
Examples
--------
```json
{
"api": "SYNO.Core.User.PasswordPolicy",
"data": {},
"method": "set",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.PasswordPolicy"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "set",
"version": info['minVersion'],
"enable_reset_passwd_by_email": enable_reset_passwd_by_email,
"password_must_change": password_must_change,
"strong_password": {
"exclude_username": exclude_username,
"included_numeric_char": included_numeric_char,
"included_special_char": included_special_char,
"min_length_enable": min_length_enable,
"min_length": min_length,
"mixed_case": mixed_case,
"exclude_common_password": exclude_common_password,
"exclude_history": exclude_history
}
}
return self.request_data(api_name, api_path, req_param)
def get_password_expiry(self) -> dict[str, object]:
"""
Get the password expiry.
Returns
-------
dict[str, object]
A dictionary containing the password expiry information.
Examples
--------
```json
{
"api": "SYNO.Core.User.PasswordExpiry",
"data": {
"allow_reset_after_expired": true,
"enable_login_prompt": false,
"enable_mail_notification": false,
"mail_notification_days": "",
"min_age_enable": false,
"password_expire_enable": false
},
"method": "get",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.PasswordExpiry"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "get",
"version": info['minVersion']
}
return self.request_data(api_name, api_path, req_param)
def set_password_expiry(
self, password_expire_enable: bool = False, max_age: int = 30, min_age_enable: bool = False, min_age: int = 1,
enable_login_prompt: bool = False, login_prompt_days: int = 1, allow_reset_after_expired: bool = True,
enable_mail_notification: bool = False, never_expired_list: list[str] = []
) -> dict[str, object]:
"""
Set the password expiry.
Parameters
----------
password_expire_enable : bool, optional
Enable password expiry. Defaults to `False`.
max_age : int, optional
Maximum time before password expiry. Defaults to `30`.
min_age_enable : bool, optional
Enable minimum time before password expiry. Defaults to `False`.
min_age : int, optional
Minimum time before password expiry. Defaults to `1`.
enable_login_prompt : bool, optional
Enable login prompt. Defaults to `False`.
login_prompt_days : int, optional
Days before login prompt. Defaults to `1`.
allow_reset_after_expired : bool, optional
Allow reset after password expiry. Defaults to `True`.
enable_mail_notification : bool, optional
Enable mail notification. Defaults to `False`.
never_expired_list : list[str], optional
List of users that should never expire.
Returns
-------
dict[str, object]
A dictionary indicating the success of the operation.
Examples
--------
```json
{
"api": "SYNO.Core.User.PasswordExpiry",
"method": "set",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.PasswordExpiry"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "set",
"version": info["minVersion"],
"password_expire_enable": password_expire_enable,
"max_age": max_age,
"min_age_enable": min_age_enable,
"min_age": min_age,
"enable_login_prompt": enable_login_prompt,
"login_prompt_days": login_prompt_days,
"allow_reset_after_expired": allow_reset_after_expired,
"enable_mail_notification": enable_mail_notification,
"never_expired_list": json.dumps(never_expired_list)
}
return self.request_data(api_name, api_path, req_param)
def password_confirm(self, password: str) -> dict[str, object]:
"""
Confirm password/session to ensure the given password matches the auth of the current session.
Note: This is needed by some APIs as a confirmation method, for example, when creating/modifying a scheduled task with root permissions, seldom needed by end users.
Parameters
----------
password : str
The password with which the session was initiated.
Returns
-------
dict[str, object]
A dictionary containing a `SynoConfirmPWToken`, or an error message.
Examples
--------
```json
{
"data": {
"SynoConfirmPWToken": "xxxxx"
},
"success": true
}
```
"""
api_name = 'SYNO.Core.User.PasswordConfirm'
info = self.core_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'auth'}
# Using https
if self.session._secure:
req_param.update({"password": password})
# Using http and self encrypted
else:
req_param.update(self.session.encrypt_params(
{"password": password}))
return self.request_data(api_name, api_path, req_param, method="post")
def get_username_policy(self) -> dict[str, object]:
"""
Get the username policy (list of usernames that are not usable).
Returns
-------
dict[str, object]
A dictionary containing the username policy information.
Examples
--------
```json
{
"api": "SYNO.Core.User.UsernamePolicy",
"data": ["root", "rootuser", "rootusr", "admin", "administrator", "adm", "adminuser", "adminusr", "user",…],
"method": "get",
"success": true,
"version": 1
}
```
"""
api_name = "SYNO.Core.User.UsernamePolicy"
info = self.core_list[api_name]
api_path = info["path"]
req_param = {
"method": "list",
"version": info['minVersion']
}
return self.request_data(api_name, api_path, req_param)