Files
synology-api/synology_api/auth.py

942 lines
34 KiB
Python

"""Provides authentication and API request handling for Synology DSM, including session management, encryption utilities, and error handling for various Synology services."""
from __future__ import annotations
from random import randint
from typing import Optional
import requests
import json
from .error_codes import error_codes, CODE_SUCCESS, download_station_error_codes, file_station_error_codes
from .error_codes import auth_error_codes, virtualization_error_codes
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from .exceptions import CoreError
from .exceptions import SynoConnectionError, HTTPError, JSONDecodeError, LoginError, LogoutError, DownloadStationError
from .exceptions import FileStationError, AudioStationError, ActiveBackupError, ActiveBackupMicrosoftError, VirtualizationError, BackupError
from .exceptions import CertificateError, CloudSyncError, DHCPServerError, DirectoryServerError, DockerError, DriveAdminError
from .exceptions import LogCenterError, NoteStationError, OAUTHError, PhotosError, SecurityAdvisorError, TaskSchedulerError, EventSchedulerError
from .exceptions import UniversalSearchError, USBCopyError, VPNError, CoreSysInfoError, UndefinedError
import hashlib
from os import urandom
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
import base64
import hashlib
import urllib
USE_EXCEPTIONS: bool = True
class Authentication:
"""
Handles authentication and API requests for Synology DSM.
Parameters
----------
ip_address : str
The IP address of the Synology device.
port : str
The port to connect to.
username : str
The username for authentication.
password : str
The password for authentication.
secure : bool, optional
Whether to use HTTPS (default is False).
cert_verify : bool, optional
Whether to verify SSL certificates (default is False).
dsm_version : int, optional
DSM API version (default is 7).
debug : bool, optional
Enable debug output (default is True).
otp_code : str, optional
One-time password for 2FA.
device_id : str, optional
Device ID for device binding.
device_name : str, optional
Device name for device binding.
"""
def __init__(self,
ip_address: str,
port: str,
username: str,
password: str,
secure: bool = False,
cert_verify: bool = False,
dsm_version: int = 7,
debug: bool = True,
otp_code: Optional[str] = None,
device_id: Optional[str] = None,
device_name: Optional[str] = None
) -> None:
"""
Initialize the Authentication object for Synology DSM.
Parameters
----------
ip_address : str
The IP address of the Synology device.
port : str
The port to connect to.
username : str
The username for authentication.
password : str
The password for authentication.
secure : bool, optional
Whether to use HTTPS (default is False).
cert_verify : bool, optional
Whether to verify SSL certificates (default is False).
dsm_version : int, optional
DSM API version (default is 7).
debug : bool, optional
Enable debug output (default is True).
otp_code : str, optional
One-time password for 2FA (default is None).
device_id : str, optional
Device ID for device binding (default is None).
device_name : str, optional
Device name for device binding (default is None).
Returns
-------
None
Just setter, no return values.
"""
self._ip_address: str = ip_address
self._port: str = port
self._username: str = username
self._password: str = password
self._secure: bool = secure
self._sid: Optional[str] = None
self._syno_token: Optional[str] = None
self._session_expire: bool = True
self._verify: bool = cert_verify
self._version: int = dsm_version
self._debug: bool = debug
self._otp_code: Optional[str] = otp_code
self._device_id: Optional[str] = device_id
self._device_name: Optional[str] = device_name
if self._verify is False:
disable_warnings(InsecureRequestWarning)
schema = 'https' if secure else 'http'
self._base_url = '%s://%s:%s/webapi/' % (
schema, self._ip_address, self._port)
self.full_api_list = {}
self.app_api_list = {}
def verify_cert_enabled(self) -> bool:
"""
Check if SSL certificate verification is enabled.
Returns
-------
bool
True if certificate verification is enabled, False otherwise.
"""
return self._verify
def login(self) -> None:
"""
Log in to the Synology DSM and obtain a session ID and token.
Raises
------
SynoConnectionError
If a connection error occurs.
HTTPError
If an HTTP error occurs.
JSONDecodeError
If the response cannot be decoded as JSON.
LoginError
If login fails due to an API error.
"""
login_api = 'auth.cgi'
params = {'api': "SYNO.API.Auth", 'version': self._version,
'method': 'login', 'enable_syno_token': 'yes', 'client': 'browser'}
params_enc = {
'account': self._username,
'enable_device_token': 'no',
'logintype': 'local',
'otp_code': '',
'rememberme': 0,
'passwd': self._password,
'session': 'webui', # Hardcoded for handle non administrator users API usage
'format': 'cookie'
}
if self._secure:
params.update(params_enc)
else:
encrypted_params = self.encrypt_params(params_enc)
params.update(encrypted_params)
if self._otp_code:
params['otp_code'] = self._otp_code
if self._device_id is not None and self._device_name is not None:
params['device_id'] = self._device_id
params['device_name'] = self._device_name
if self._device_id is not None and self._device_name is None or self._device_id is None and self._device_name is not None:
print("device_id and device_name must be set together")
if not self._session_expire and self._sid is not None:
self._session_expire = False
if self._debug is True:
print('User already logged in')
else:
# Check request for error:
session_request_json: dict[str, object] = {}
if USE_EXCEPTIONS:
try:
session_request = requests.post(
self._base_url + login_api, data=params, verify=self._verify)
session_request.raise_for_status()
session_request_json = session_request.json()
except requests.exceptions.ConnectionError as e:
raise SynoConnectionError(error_message=e.args[0])
except requests.exceptions.HTTPError as e:
raise HTTPError(error_message=str(e.args))
except requests.exceptions.JSONDecodeError as e:
raise JSONDecodeError(error_message=str(e.args))
else:
# Will raise its own errors:
session_request = requests.post(
self._base_url + login_api, data=params, verify=self._verify)
session_request_json = session_request.json()
# Check dsm response for error:
error_code = self._get_error_code(session_request_json)
if not error_code:
self._sid = session_request_json['data']['sid']
self._syno_token = session_request_json['data']['synotoken']
self._session_expire = False
if self._debug is True:
print('User logged in, new session started!')
else:
self._sid = None
if self._debug is True:
print('Login failed: ' +
self._get_error_message(error_code, 'Auth'))
if USE_EXCEPTIONS:
raise LoginError(error_code=error_code)
return
def logout(self) -> None:
"""
Log out from the Synology DSM and invalidate the session.
Raises
------
SynoConnectionError
If a connection error occurs.
HTTPError
If an HTTP error occurs.
JSONDecodeError
If the response cannot be decoded as JSON.
LogoutError
If logout fails due to an API error.
"""
logout_api = 'auth.cgi?api=SYNO.API.Auth'
param = {'version': self._version,
'method': 'logout', 'session': 'webui'}
if USE_EXCEPTIONS:
try:
response = requests.get(
self._base_url + logout_api, param, verify=self._verify)
response.raise_for_status()
response_json = response.json()
error_code = self._get_error_code(response_json)
except requests.exceptions.ConnectionError as e:
raise SynoConnectionError(error_message=e.args[0])
except requests.exceptions.HTTPError as e:
raise HTTPError(error_message=str(e.args))
except requests.exceptions.JSONDecodeError as e:
raise JSONDecodeError(error_message=str(e.args))
else:
response = requests.get(
self._base_url + logout_api, param, verify=self._verify)
error_code = self._get_error_code(response.json())
self._session_expire = True
self._sid = None
if self._debug is True:
if not error_code:
print('Successfully logged out.')
else:
print('Logout failed: ' +
self._get_error_message(error_code, 'Auth'))
if USE_EXCEPTIONS and error_code:
raise LogoutError(error_code=error_code)
return
def get_api_list(self, app: Optional[str] = None) -> None:
"""
Retrieve the list of available APIs from the Synology DSM.
Parameters
----------
app : str, optional
Filter APIs by application name.
Raises
------
SynoConnectionError
If a connection error occurs.
HTTPError
If an HTTP error occurs.
JSONDecodeError
If the response cannot be decoded as JSON.
"""
query_path = 'query.cgi?api=SYNO.API.Info'
list_query = {'version': '1', 'method': 'query', 'query': 'all'}
if USE_EXCEPTIONS:
# Check request for error, and raise our own error.:
try:
response = requests.get(
self._base_url + query_path, list_query, verify=self._verify)
response.raise_for_status()
response_json = response.json()
except requests.exceptions.ConnectionError as e:
raise SynoConnectionError(error_message=e.args[0])
except requests.exceptions.HTTPError as e:
raise HTTPError(error_message=str(e.args))
except requests.JSONDecodeError as e:
raise JSONDecodeError(error_message=str(e.args))
else:
# Will raise its own errors:
response_json = requests.get(
self._base_url + query_path, list_query, verify=self._verify).json()
if app is not None:
for key in response_json['data']:
if app.lower() in key.lower():
self.app_api_list[key] = response_json['data'][key]
else:
self.full_api_list = response_json['data']
return
def show_api_name_list(self) -> None:
"""Print the list of available API names."""
prev_key = ''
for key in self.full_api_list:
if key != prev_key:
print(key)
prev_key = key
return
def show_json_response_type(self) -> None:
"""Print API names that return JSON data."""
for key in self.full_api_list:
for sub_key in self.full_api_list[key]:
if sub_key == 'requestFormat':
if self.full_api_list[key]['requestFormat'] == 'JSON':
print(key + ' Returns JSON data')
return
def search_by_app(self, app: str) -> None:
"""
Search and print API names containing the specified application name.
Parameters
----------
app : str
Application name to search for.
"""
print_check = 0
for key in self.full_api_list:
if app.lower() in key.lower():
print(key)
print_check += 1
continue
if print_check == 0:
print('Not Found')
return
def _random_AES_passphrase(self, length):
"""
Generate a random passphrase for AES encryption.
Parameters
----------
length : int
Length of the passphrase.
Returns
-------
bytes
Randomly generated passphrase.
"""
available = ('0123456789'
'abcdefghijklmnopqrstuvwxyz'
'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'~!@#$%^&*()_+-/')
key = b''
while length > 0:
key += available[randint(0, len(available) - 1)].encode('utf-8')
length -= 1
return key
def _get_enc_info(self):
"""
Retrieve encryption information from the Synology API.
Returns
-------
dict
Encryption information including public key and cipher details.
"""
api_name = 'SYNO.API.Encryption'
req_params = {
"method": "getinfo",
"version": 1,
"format": "module"
}
response = self.request_data(api_name, "encryption.cgi", req_params)
return response["data"]
def _encrypt_RSA(self, modulus, passphrase, text):
"""
Encrypt text using RSA public key encryption.
Parameters
----------
modulus : int
RSA modulus.
passphrase : int
RSA public exponent.
text : str or bytes
Text to encrypt.
Returns
-------
bytes
Encrypted ciphertext.
"""
public_numbers = rsa.RSAPublicNumbers(passphrase, modulus)
public_key = public_numbers.public_key(default_backend())
if isinstance(text, str):
text = text.encode('utf-8')
ciphertext = public_key.encrypt(
text,
padding.PKCS1v15()
)
return ciphertext
def _encrypt_AES(self, passphrase, text):
"""
Encrypt text using AES encryption.
Parameters
----------
passphrase : bytes
AES passphrase.
text : str
Text to encrypt.
Returns
-------
bytes
Encrypted ciphertext.
"""
cipher = AESCipher(passphrase)
return cipher.encrypt(text)
def encrypt_params(self, params):
"""
Encrypt login parameters using RSA and AES.
Parameters
----------
params : dict
Parameters to encrypt.
Returns
-------
dict
Encrypted parameters suitable for login.
"""
enc_info = self._get_enc_info()
public_key = enc_info["public_key"]
cipher_key = enc_info["cipherkey"]
cipher_token = enc_info["ciphertoken"]
server_time = enc_info["server_time"]
random_passphrase = self._random_AES_passphrase(501)
params[cipher_token] = server_time
encrypted_passphrase = self._encrypt_RSA(int(public_key, 16),
int("10001", 16),
random_passphrase)
encrypted_params = self._encrypt_AES(random_passphrase,
urllib.parse.urlencode(params))
enc_params = {
"rsa": base64.b64encode(encrypted_passphrase).decode("utf-8"),
"aes": base64.b64encode(encrypted_params).decode("utf-8")
}
return {cipher_key: json.dumps(enc_params)}
def request_multi_datas(self,
compound: dict[object] = None,
method: Optional[str] = None,
# "sequential" or "parallel"
mode: Optional[str] = "sequential",
response_json: bool = True
) -> dict[str, object] | str | list | requests.Response: # 'post' or 'get'
"""
Send multiple requests to the Synology API, either sequentially or in parallel.
Parameters
----------
compound : dict[object], optional
A JSON structure containing multiple requests to be executed.
Example:
compound = [
{
"api": "SYNO.Core.User",
"method": "list",
"version": self.core_list["SYNO.Core.User"]
}
].
method : str, optional
The HTTP method to use ('get' or 'post'). Defaults to 'get' if not specified.
mode : str, optional
The execution mode for the requests, either "sequential" or "parallel".
Defaults to "sequential".
response_json : bool, optional
Whether to return the response as JSON. If False, returns the raw response object.
Returns
-------
dict[str, object] or str or list or requests.Response
The response from the API, either as a JSON-decoded object, string, list, or the raw response.
Raises
------
SynoConnectionError
If a connection error occurs.
HTTPError
If an HTTP error occurs.
"""
api_name = 'hotfix' # fix for docs_parser.py issue
api_path = self.full_api_list['SYNO.Entry.Request']['path']
api_version = self.full_api_list['SYNO.Entry.Request']['maxVersion']
url = f"{self._base_url}{api_path}"
req_param = {
"api": "SYNO.Entry.Request",
"method": "request",
"version": f"{api_version}",
"mode": mode,
"stop_when_error": "true",
"_sid": self._sid,
"compound": json.dumps(compound)
}
if method is None:
method = 'get'
# Request need some headers to work properly
# X-SYNO-TOKEN is the token that we get when we login
# We get it from the self._syno_token variable and by param 'enable_syno_token':'yes' in the login request
if method == 'get':
response = requests.get(url, req_param, verify=self._verify, headers={
"X-SYNO-TOKEN": self._syno_token})
elif method == 'post':
response = requests.post(url, req_param, verify=self._verify, headers={
"X-SYNO-TOKEN": self._syno_token})
if response_json is True:
return response.json()
else:
return response
def request_data(self,
api_name: str,
api_path: str,
req_param: dict[str, object],
method: Optional[str] = None,
response_json: bool = True
) -> dict[str, object] | str | list | requests.Response: # 'post' or 'get'
"""
Send a request to the Synology API and handle errors based on the API name.
Parameters
----------
api_name : str
The name of the Synology API to call.
api_path : str
The path to the API endpoint.
req_param : dict[str, object]
The parameters to include in the request.
method : str, optional
The HTTP method to use ('get' or 'post'). Defaults to 'get' if not specified.
response_json : bool, optional
Whether to return the response as JSON. If False, returns the raw response object.
Returns
-------
dict[str, object] or str or list or requests.Response
The response from the API, either as a JSON-decoded object, string, list, or the raw response.
Raises
------
SynoConnectionError
If a connection error occurs.
HTTPError
If an HTTP error occurs.
DownloadStationError, FileStationError, AudioStationError, ActiveBackupError, ActiveBackupMicrosoftError, VirtualizationError, BackupError, CloudSyncError, CertificateError, DHCPServerError, DirectoryServerError, DockerError, DriveAdminError, LogCenterError, NoteStationError, OAUTHError, PhotosError, SecurityAdvisorError, TaskSchedulerError, EventSchedulerError, UniversalSearchError, USBCopyError, VPNError, CoreError, CoreSysInfoError, UndefinedError
If the API returns an error code specific to the API being called.
"""
# Convert all boolean in string in lowercase because Synology API is waiting for "true" or "false"
for k, v in req_param.items():
if isinstance(v, bool):
req_param[k] = str(v).lower()
if method is None:
method = 'get'
req_param['_sid'] = self._sid
url = ('%s%s' % (self._base_url, api_path)) + '?api=' + api_name
# Do request and check for error:
response: Optional[requests.Response] = None
if USE_EXCEPTIONS:
# Catch and raise our own errors:
try:
if method == 'get':
response = requests.get(url, req_param, verify=self._verify, headers={
"X-SYNO-TOKEN": self._syno_token})
elif method == 'post':
response = requests.post(url, req_param, verify=self._verify, headers={
"X-SYNO-TOKEN": self._syno_token})
except requests.exceptions.ConnectionError as e:
raise SynoConnectionError(error_message=e.args[0])
except requests.exceptions.HTTPError as e:
raise HTTPError(error_message=str(e.args))
else:
# Will raise its own error:
if method == 'get':
response = requests.get(url, req_param, verify=self._verify, headers={
"X-SYNO-TOKEN": self._syno_token})
elif method == 'post':
response = requests.post(url, req_param, verify=self._verify, headers={
"X-SYNO-TOKEN": self._syno_token})
# Check for error response from dsm:
error_code = 0
if USE_EXCEPTIONS:
# Catch a JSON Decode error:
try:
error_code = self._get_error_code(response.json())
except requests.exceptions.JSONDecodeError:
pass
else:
# Will raise its own error:
error_code = self._get_error_code(response.json())
if error_code:
if self._debug is True:
print('Data request failed: ' +
self._get_error_message(error_code, api_name))
if USE_EXCEPTIONS:
# Download station error:
if api_name.find('DownloadStation') > -1:
raise DownloadStationError(error_code=error_code)
# File station error:
elif api_name.find('FileStation') > -1:
raise FileStationError(error_code=error_code)
# Audio station error:
elif api_name.find('AudioStation') > -1:
raise AudioStationError(error_code=error_code)
# ABM (ActiveBackupOffice365) error:
elif api_name.find('ActiveBackupOffice365') > -1:
raise ActiveBackupMicrosoftError(error_code=error_code)
# Active backup error:
elif api_name.find('ActiveBackup') > -1:
raise ActiveBackupError(error_code=error_code)
# Virtualization error:
elif api_name.find('Virtualization') > -1:
raise VirtualizationError(error_code=error_code)
# Syno backup error:
elif api_name.find('SYNO.Backup') > -1:
raise BackupError(error_code=error_code)
# CloudSync error:
elif api_name.find('CloudSync') > -1:
raise CloudSyncError(error_code=error_code)
# Core certificate error:
elif api_name.find('Core.Certificate') > -1:
raise CertificateError(error_code=error_code)
# DHCP Server error:
elif api_name.find('DHCPServer') > -1 or api_name == 'SYNO.Core.TFTP':
raise DHCPServerError(error_code=error_code)
# Active Directory error:
elif api_name.find('ActiveDirectory') > -1 or api_name in ('SYNO.Auth.ForgotPwd', 'SYNO.Entry.Request'):
raise DirectoryServerError(error_code=error_code)
# Docker Error:
elif api_name.find('Docker') > -1:
raise DockerError(error_code=error_code)
# Synology drive admin error:
elif api_name.find('SynologyDrive') > -1 or api_name == 'SYNO.C2FS.Share':
raise DriveAdminError(error_code=error_code)
# Log center error:
elif api_name.find('LogCenter') > -1:
raise LogCenterError(error_code=error_code)
# Note station error:
elif api_name.find('NoteStation') > -1:
raise NoteStationError(error_code=error_code)
# OAUTH error:
elif api_name.find('SYNO.OAUTH') > -1:
raise OAUTHError(error_code=error_code)
# Photo station error:
elif api_name.find('SYNO.Foto') > -1:
raise PhotosError(error_code=error_code)
# Security advisor error:
elif api_name.find('SecurityAdvisor') > -1:
raise SecurityAdvisorError(error_code=error_code)
# Task Scheduler error:
elif api_name.find('SYNO.Core.TaskScheduler') > -1:
raise TaskSchedulerError(error_code=error_code)
# Event Scheduler error:
elif api_name.find('SYNO.Core.EventScheduler') > -1:
raise EventSchedulerError(error_code=error_code)
# Universal search error:
elif api_name.find('SYNO.Finder') > -1:
raise UniversalSearchError(error_code=error_code)
# USB Copy error:
elif api_name.find('SYNO.USBCopy') > -1:
raise USBCopyError(error_code=error_code)
# VPN Server error:
elif api_name.find('VPNServer') > -1:
raise VPNError(error_code=error_code)
# Core:
elif api_name.find('SYNO.Core') > -1:
raise CoreError(error_code=error_code)
# Core Sys Info:
elif api_name.find('SYNO.Storage') > -1:
raise CoreSysInfoError(error_code=error_code)
elif api_name.find('SYNO.ResourceMonitor') > -1:
raise CoreSysInfoError(error_code=error_code)
elif (api_name in ('SYNO.Backup.Service.NetworkBackup', 'SYNO.Finder.FileIndexing.Status',
'SYNO.S2S.Server.Pair')):
raise CoreSysInfoError(error_code=error_code)
# Unhandled API:
else:
raise UndefinedError(
error_code=error_code, api_name=api_name)
if response_json is True:
return response.json()
else:
return response
@staticmethod
def _get_error_code(response: dict[str, object]) -> int:
"""
Extract the error code from an API response.
Parameters
----------
response : dict
The API response.
Returns
-------
int
Error code, or 0 if successful.
"""
if response.get('success'):
code = CODE_SUCCESS
else:
code = response.get('error').get('code')
return code
@staticmethod
def _get_error_message(code: int, api_name: str) -> str:
"""
Get a human-readable error message for a given error code and API.
Parameters
----------
code : int
Error code.
api_name : str
Name of the API.
Returns
-------
str
Error message.
"""
if code in error_codes.keys():
message = error_codes[code]
elif api_name == 'Auth':
message = auth_error_codes.get(code, "<Undefined.Auth.Error>")
elif api_name.find('DownloadStation') > -1:
message = download_station_error_codes.get(
code, "<Undefined.DownloadStation.Error>")
elif api_name.find('Virtualization') > -1:
message = virtualization_error_codes.get(
code, "<Undefined.Virtualization.Error>")
elif api_name.find('FileStation') > -1:
message = file_station_error_codes.get(
code, "<Undefined.FileStation.Error>")
else:
message = "<Undefined.%s.Error>" % api_name
return 'Error {} - {}'.format(code, message)
@property
def sid(self) -> Optional[str]:
"""
Get the current session ID.
Returns
-------
str or None
Session ID if logged in, else None.
"""
return self._sid
@property
def base_url(self) -> str:
"""
Get the base URL for API requests.
Returns
-------
str
Base URL.
"""
return self._base_url
@property
def syno_token(self) -> str:
"""
Get the Synology token for API requests.
Returns
-------
str
Synology token.
"""
return self._syno_token
class AESCipher(object):
"""
Encrypt with OpenSSL-compatible way.
Parameters
----------
password : bytes
The password to derive the key from.
key_length : int, optional
Length of the key (default is 32).
"""
SALT_MAGIC = b'Salted__'
def __init__(self, password, key_length=32):
"""
Initialize the AESCipher object.
Parameters
----------
password : bytes
The password to derive the key from.
key_length : int, optional
Length of the key (default is 32).
"""
self._bs = 16
self._salt = urandom(self._bs - len(self.SALT_MAGIC))
self._key, self._iv = self._derive_key_and_iv(password,
self._salt,
key_length,
self._bs)
def _pad(self, s):
"""
Pad the input string to a multiple of the block size.
Parameters
----------
s : str
String to pad.
Returns
-------
bytes
Padded string as bytes.
"""
bs = self._bs
return (s + (bs - len(s) % bs) * chr(bs - len(s) % bs)).encode('utf-8')
def _derive_key_and_iv(self, password, salt, key_length, iv_length):
"""
Derive the key and IV from the password and salt.
Parameters
----------
password : bytes
Password.
salt : bytes
Salt.
key_length : int
Length of the key.
iv_length : int
Length of the IV.
Returns
-------
tuple
(key, iv).
"""
d = d_i = b''
while len(d) < key_length + iv_length:
md5_str = d_i + password + salt
d_i = hashlib.md5(md5_str).digest()
d += d_i
return d[:key_length], d[key_length:key_length + iv_length]
def encrypt(self, text):
"""
Encrypt the given text using AES CBC mode.
Parameters
----------
text : str
Text to encrypt.
Returns
-------
bytes
Encrypted ciphertext with OpenSSL salt header.
"""
cipher = Cipher(
algorithms.AES(self._key),
modes.CBC(self._iv),
backend=default_backend()
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(self._pad(text)) + encryptor.finalize()
return self.SALT_MAGIC + self._salt + ciphertext