Files
synology-api/synology_api/filestation.py

901 lines
27 KiB
Python

import os
import time
from datetime import datetime
import requests
import sys
from urllib import parse
from .synology import Synology, api_call
def to_compatible_str(lst: list) -> str:
"""
Convert a list to an API compatible string representation.
"""
# The API only accepts double quoted strings, but str() returns single quoted strings
return str(lst).replace('\'', '\"')
class FileStation(Synology):
app = 'FileStation'
def __init__(self):
super(FileStation, self).__init__()
self._dir_taskid = ''
self._dir_taskid_list = []
self._md5_taskid = ''
self._md5_taskid_list = []
self._search_taskid = ''
self._search_taskid_list = []
self._copy_move_taskid = ''
self._copy_move_taskid_list = []
self._delete_taskid = ''
self._delete_taskid_list = []
self._extract_taskid = ''
self._extract_taskid_list = []
self._compress_taskid = ''
self._compress_taskid_list = []
self.file_station_list = self.app_api_dict
@api_call()
def get_info(self):
r = self.api_request('Info', 'getInfo')
return r
"""
method: get_list_share
kwargs: additional,
offset,
limit,
sort_by,
sort_direction,
only_writable
"""
@api_call()
def get_list_share(self, **kwargs):
param = kwargs
if 'additional' not in param.keys():
param['additional'] = "real_path,size,owner,time"
if type(param['additional']) is list:
additional = kwargs.get('additional')
param['additional'] = ','.join(additional)
return self.api_request('List', 'list_share', param)
"""
method: get_file_list
args: folder_path
kwargs: offset,
limit,
sort_by,
sort_direction,
pattern,
filetype,
goto_path,
additional
"""
@api_call()
def get_file_list(self, folder_path, **kwargs):
param = kwargs
param['folder_path'] = folder_path
if 'filetype' in param:
param['filetype'] = str(param['filetype']).lower()
if 'additional' not in param:
param['additional'] = "real_path,size,owner,time"
if type(param['additional']) is list:
param['additional'] = ','.join(param['additional'])
return self.api_request('List', 'list', param)
@api_call()
def get_file_info(self, path=None, additional=None):
param = {}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
param['path'] = path
elif path is not None:
param['path'] = path
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
param['additional'] = additional
return self.api_request('List', 'getinfo', param)
# TODO all working if specify extension check if correct [pattern, extension]
# it works if you put extension='...'
@api_call()
def _search_start(self, folder_path, **kwargs):
param = kwargs
if 'time' in param.keys():
t = param['time']
is_str = isinstance(t, str)
is_int = isinstance(t, int)
if not is_str and is_int:
raise ValueError(
"Timestamp must be string or Unix timestamp, type is {t}.".format(
t=type(param['time'])))
elif is_int and not is_str:
val = datetime.fromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S')
param['time'] = val
else: # is_str is True, is_int might be True or False
date = time.strptime(t, "%Y-%m-%d %H:%M:%S")
timestamp = time.mktime(date)
param['time'] = timestamp
param['folder_path'] = '"{p}"'.format(p=param['folder_path'])
if 'filetype' in param.keys():
param['filetype'] = '"{f}"'.format(f=param['filetype'])
return self.api_request('Search', 'start', param)
"""
method: search_start
args: folder_path
kwargs: recursive,
pattern,
extension,
filetype,
size_from,
site_to,
mtime_from,
mtime_to,
crtime_from,
crtime_to,
atime_from,
atime_to,
owner,
group
"""
def search_start(self, folder_path, **kwargs):
response = self._search_start(folder_path, **kwargs)
if response['success']:
self._search_taskid = response['data']['taskid']
self._search_taskid_list.append(response['data']['taskid'])
else:
raise self.SynologyError(
"Could not start search task.\nResponse: {r}".format(
r=response))
return response
"""
method: get_search_list
args: task_id
kwargs: filetype,
limit,
sort_by,
sort_direction,
offset,
additional
"""
@api_call()
def get_search_list(self, task_id, **kwargs):
param = kwargs
param['taskid'] = task_id
if 'filetype' in param.keys():
param['filetype'] = param['filetype']
if 'additional' not in param.keys():
param['additional'] = ['size', 'owner', 'time']
if type(param['additional']) is list:
param['additional'] = str(param['additional'])
return self.api_request('Search', 'list', param)
@api_call()
def _stop_search_task(self, taskid):
param = {}
if taskid is None:
if self._search_taskid is None:
raise self.SynologyError(
"Tried to stop search tasks but none exist.")
else:
taskid = self._search_taskid
param['taskid'] = taskid
return self.api_request('Search', 'stop', param)
def stop_search_task(self, taskid=None):
response = self._stop_search_task(taskid)
if not response['success']:
raise self.SynologyError(
"Could not stop search jobs. Response: {r}".format(r=response))
self._search_taskid_list.remove(taskid)
self._search_taskid = self._search_taskid_list[-1]
return response
"""
method: stop_all_search_task
args: log
Argument 'log' should be an object with a method 'error', such as the
logger from the standard logging library.
Two values are returned: a list of the tasks which were not stopped
(or an empty list), and the response of the first task that couldn't
be stopped (or None if all were successfully stopped).
"""
def stop_all_search_task(self, log=None):
for task in self._search_taskid_list:
try:
response = self.stop_search_task(task)
except self.SynologyError as err:
if log is not None:
try:
log.error(str(err))
except AttributeError as err:
pass
return self._search_taskid_list, response
return self._search_taskid_list, None
"""
method: get_mount_point_list
args:
kwargs: type,
offset,
limit,
sort_by
sort_direction,
additional
"""
@api_call()
def get_mount_point_list(self, **kwargs):
param = kwargs
if 'additional' not in param.keys():
param['additional'] = ['real_path', 'size', 'owner', 'time']
if type(param['additional']) is list:
param['additional'] = str(param['additional'])
return self.api_request('VirtualFolder', 'list', param)
"""
method: get_favorite_list
args:
kwargs: offset,
limit,
sort_by,
status_filter,
additional
"""
@api_call()
def get_favorite_list(self, **kwargs):
param = kwargs
if 'additional' not in param.keys():
param['additional'] = ['real_path', 'size', 'owner', 'time']
if type(param['additional']) is list:
param['additional'] = str(param['additional'])
return self.api_request('Favorite', 'list', param)
@api_call()
def add_a_favorite(self, path, name=None, index=None):
param = {'path': path}
if name is not None:
param['name'] = name
if param['index'] is not None:
param['index'] = index
return self.api_request('Favorite', 'add', param)
@api_call()
def delete_a_favorite(self, path):
return self.api_request('Favorite', 'delete', {'path': path})
@api_call()
def clear_broken_favorite(self):
return self.api_request(self.app, 'Favorite', 'clear_broken')
@api_call()
def edit_favorite_name(self, path, new_name):
return self.api_request('Favorite', 'edit',
{'path': path, 'new_name': new_name})
@api_call()
def replace_all_favorite(self, path, name):
if type(path) is list:
path = ",".join(path)
if type(name) is list:
path = ",".join(name)
return self.api_request('Favorite', 'edit',
{'path': path, 'name': name})
@api_call()
def _start_dir_size_calc(self, path):
if type(path) is list:
path = str(path)
return self.api_request('DirSize', 'start', path)
def start_dir_size_calc(self, path):
response = self._start_dir_size_calc(path)
self._dir_taskid = response['data']['taskid']
self._dir_taskid_list.append(self._dir_taskid)
return response
@api_call()
def _stop_dir_size_calc(self, taskid):
return self.api_request('DirName', 'stop',
{'taskid': '"{t}"'.format(t=taskid)})
def stop_dir_size_calc(self, taskid):
response = self._stop_dir_size_calc(taskid)
if not response['success']:
raise self.SynologyError(
"Cannot stop task {t}.\nResponse: {r}".format(
t=taskid, r=response))
self._dir_taskid_list.remove(taskid)
if self._dir_taskid is taskid:
self._dir_taskid = self._dir_taskid_list[-1]
return response
@api_call()
def get_dir_status(self, taskid=None):
if taskid is None:
if self._dir_taskid is None:
raise self.SynologyError("No DirSize tasks currently running.")
else:
taskid = self._dir_taskid
return self.api_request('DirSize', 'status',
{'taskid': taskid})
@api_call()
def _start_md5_calc(self, file_path):
return self.api_request('MD5', 'start',
{'file_path': file_path})
def start_md5_calc(self, file_path):
response = self._start_md5_calc(file_path)
self._md5_taskid = response['data']['taskid']
self._md5_taskid_list.append(self._md5_taskid)
return response
@api_call()
def get_md5_status(self, taskid=None):
if taskid is None:
if self._md5_taskid is not None:
taskid = self._md5_taskid
else:
raise self.SynologyError("No MD5 tasks currently running.")
return self.api_request('MD5', 'status',
{'taskid': taskid})
@api_call()
def _stop_md5_calc(self, taskid):
self.api_request('MD5', 'stop',
{'taskid': taskid})
def stop_md5_calc(self, taskid):
response = self._stop_md5_calc(taskid)
if response['success']:
self._md5_taskid_list.remove(taskid)
if self._md5_taskid is taskid:
self._md5_taskid = self._md5_taskid_list[-1]
return response
"""
method: check_permissions
args: path, filename
kwargs: overwrite, create_only
"""
@api_call()
def check_permissions(self, path, filename, **kwargs):
param = kwargs
param['path'] = path
param['filename'] = filename
return self.api_request('CheckPermission', 'write', param)
_url_ptrn = "{u}{p}?api={n}&version={m}&method=upload&_sid={s}"
"""
method: upload_file
args: dest_path, file_path, log, create_parets=True, overwrite=False
FileStation.upload_file() works differently from other methods. Instead of
sending a single requests.request(), it opens a requests.session() to
upload the file found at 'file_path' on the client to 'dest_path' on the
NAS. As such, it does not use the usual @api_call()
decorator.
The 'log' argument is an object with an "error" method, such as the logger
from Python's standard library "logging". This is used in case of an error
during the file upload to make reporting on any errors easier. If any
exceptions are caught, the method returns false. Note that it is on the
client to check for any errors regarding the file to be uploaded (such as
anything covered by OSError).
Note: I have not seen any other methods using a session yet, but if it's
common enough it may be worthwhile to create another decorator similar to
@api_call()
, such as @Synology.api_session.
"""
def upload_file(self, dest_path, file_path, log,
create_parents=True, overwrite=False):
api_name = 'SYNO.FileStation.Upload'
info = self.file_station_list[api_name]
api_path = info['path']
filename = os.path.basename(file_path)
session = requests.session()
try:
with open(file_path, 'rb') as payload:
url = self._url_ptrn.format(u=self.base_url, p=api_path,
n=api_name, m=info['minVersion'], s=self._sid)
args = {
'path': dest_path,
'create_parents': create_parents,
'overwrite': overwrite,
}
file_info = {'file': (filename, payload,
'application/octet-stream')}
r = session.post(url, data=args, files=file_info)
return r
except ConnectionError as err:
log.error("Could not connect to Synology.\nError: {e}".format(
e=str(err)))
except requests.Timeout as err:
log.error("Request timed out.\nError: {e}".format(e=str(err)))
except requests.exceptions.RequestException as err:
log.error("Session raised an exception.\nError: {e}".format(
e=str(err)))
return False
@api_call()
def get_shared_link_info(self, link_id):
return self.api_request('Sharing', 'getinfo',
{'id': link_id})
"""
method: get_shared_link_list
kwargs: offset,
limit,
sort_by,
sort_direction,
force_clean
"""
@api_call()
def get_shared_link_list(self, **kwargs):
return self.api_request('Sharing', 'list', kwargs)
"""
method: create_sharing_link
args: path
kwargs: password,
date_expired,
date_available
"""
@api_call()
def create_sharing_link(self, path, **kwargs):
param = kwargs
param['path'] = path
return self.api_request('Sharing', 'create', param)
@api_call()
def delete_shared_link(self, link_id):
return self.api_request('Sharing', 'delete', {'id': link_id})
@api_call()
def clear_invalid_shared_link(self):
return self.api_request('Sharing', 'clear_invalid')
"""
method: edit_shared_link
args: link_id
kwargs: password,
date_expired,
date_available
"""
@api_call()
def edit_shared_link(self, link_id, **kwargs):
param = kwargs
param['id'] = link_id
return self.api_request('Sharing', 'edit', param)
"""
method: create_folder
args: folder_path, name
kwargs: force_parent, additional
"""
@api_call()
def create_folder(self, folder_path, name, **kwargs):
param = kwargs
if type(folder_path) is list:
new_fp = []
for path in folder_path:
path = '"{p}"'.format(p=path)
new_fp.append(path)
folder_path = str(new_fp)
param['folder_path'] = folder_path
if type(name) is list:
new_name = []
for nm in name:
nm = '"{n}"'.format(n=nm)
new_name.append(nm)
name = str(new_name)
param['name'] = name
if 'additional' not in param.keys():
param['additional'] = ['real_path', 'size', 'owner', 'time']
if type(param['additional']) is list:
param['additional'] = ','.join(param['additional'])
return self.api_request('CreateFolder', 'create', param)
"""
method: rename_folder
args: path, name
kwargs: additional, search_taskid
"""
@api_call()
def rename_folder(self, path, name, **kwargs):
param = kwargs
if type(path) is list:
new_path = []
for np in path:
np = '"{n}"'.format(n=np)
new_path.append(np)
path = str(new_path)
param['path'] = path
if type(name) is list:
new_path = []
for np in path:
np = '"{n}"'.format(n=np)
new_path.append(np)
name = str(new_path)
param['name'] = name
if 'additional' not in param.keys():
param['additional'] = ['real_path', 'size', 'owner', 'time']
if type(param['additional']) is list:
param['additional'] = ','.join(param['additional'])
return self.api_request('Rename', 'rename', param)
"""
method: start_copy_move
args: path, dest_folder
kwargs: overwite,
remove_src,
accurate_progress,
search_taskid
"""
@api_call()
def _start_copy_move(self, path, dest_folder, **kwargs):
param = kwargs
if type(path) is list:
path = str(path)
param['path'] = path
if type(dest_folder) is list:
dest_folder = str(dest_folder)
param['name'] = dest_folder
return self.api_request('CopyMove', 'start', param)
def start_copy_move(self, path, dest_folder, **kwargs):
response = self._start_copy_move(path, dest_folder, **kwargs)
if response['success']:
self._copy_move_taskid = response['data']['taskid']
self._dir_taskid_list.append(self._copy_move_taskid)
return response
@api_call()
def get_copy_move_status(self, taskid):
return self.api_request('CopyMove', 'status', {'taskid': taskid})
@api_call()
def _stop_copy_move_task(self, taskid):
return self.api_request('CopyMove', 'stop', taskid)
def stop_copy_move_taks(self, taskid):
response = self._stop_copy_move_task(taskid)
if response['success']:
self._copy_move_taskid_list.remove(taskid)
if self._copy_move_taskid is taskid:
self._copy_move_taskid = self._copy_move_taskid_list[-1]
return response
"""
method: start_delete_task
args: path
kwargs: accurate_progress,
recursive,
search_taskid
"""
@api_call()
def _start_delete_task(self, path, **kwargs):
param = kwargs
if type(path) is list:
path = str(path)
param['path'] = path
return self.api_request('Delete', 'start', param)
def start_delete_task(self, path, **kwargs):
response = self._start_delete_task(path, **kwargs)
if response['success']:
self._delete_taskid = response['data']['taskid']
self._delete_taskid_list.append(self._delete_taskid)
return response
@api_call()
def get_delete_status(self, taskid):
return self.api_request('Delete', 'status', {'taskid': taskid})
@api_call()
def _stop_delete_task(self, taskid):
return self.api_request('Delete', 'stop', {'taskid': taskid})
def stop_delete_task(self, taskid):
response = self._stop_delete_task(taskid)
if response['success']:
self._delete_taskid_list.remove(taskid)
if self._delete_taskid is taskid:
self._delete_taskid = self._delete_taskid_list[-1]
return response
"""
method: delete_blocking_function
args: path
kwargs: recursive,
search_taskid
delete_blocking_function may appear to hang for a while. This is normal.
TODO: possibly add async option?
"""
@api_call()
def delete_blocking_function(self, path, **kwargs):
param = kwargs
if type(path) is list:
path = str(path)
param['path'] = path
return self.api_request('Delete', 'delete', param)
"""
method: start_extract_task
args: file_path, dest_folder
kwargs: overwrite,
keep_dir,
create_subfolder,
codepage,
password,
item_id
"""
@api_call()
def start_extract_task(self, file_path, dest_folder, **kwargs):
param = kwargs
param['file_path'] = file_path
param['dest_folder_path'] = dest_folder
response = self.api_request('Extract', 'start', param)
self._extract_taskid = response['data']['taskid']
self._extract_taskid_list.append(self._extract_taskid)
return response
def get_extract_status(self, taskid):
return self.api_request('Extract', 'status', {'taskid': taskid})
@api_call()
def _stop_extract_task(self, taskid):
return self.api_request('Extract', 'stop', {'taskid': taskid})
def stop_extract_task(self, taskid):
response = self._stop_extract_task(taskid)
if response['success']:
self._extract_taskid_list.remove(taskid)
if self._extract_taskid is taskid:
self._extract_taskid = self._extract_taskid_list[-1]
return response
"""
method: get_archive_file_list
args: file_path
kwargs: offset,
limit,
sort_by,
sort_direction,
codepage,
password,
item_id
"""
@api_call()
def get_archive_file_list(self, file_path, **kwargs):
param = kwargs
param['file_path'] = file_path
return self.api_request('Extract', 'list', param)
def start_file_compression(self, path=None, dest_file_path=None, level=None, mode=None,
compress_format=None, password=None):
api_name = 'SYNO.FileStation.Compress'
info = self.file_station_list[api_name]
api_path = info['path']
param = {'version': info['maxVersion'], 'method': 'start'}
if type(path) is list:
param['path'] = to_compatible_str(path)
elif path is not None:
param['path'] = path
else:
return 'Enter a valid path'
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'compress_format', '_password', '_api_path',
'param', 'path', 'new_path']:
if val is not None:
param[str(key)] = val
if dest_file_path is None:
return 'Enter a valid dest_file_path'
if compress_format is not None:
param['format'] = compress_format
if password is not None:
param['_password'] = password
self._compress_taskid = self.api_request(api_name, api_path, param)['data']['taskid']
return 'You can now check the status of request with get_compress_status() , ' \
'your id is: ' + self._compress_taskid
def get_compress_status(self, taskid=None):
api_name = 'SYNO.FileStation.Compress'
info = self.file_station_list[api_name]
api_path = info['path']
param = {'version': info['maxVersion'], 'method': 'status'}
if taskid is None:
return 'Enter a valid taskid'
else:
param['taskid'] = taskid
return self.api_request(api_name, api_path, param)
def stop_compress_task(self, taskid=None):
api_name = 'SYNO.FileStation.Compress'
info = self.file_station_list[api_name]
api_path = info['path']
param = {'version': info['maxVersion'], 'method': 'stop'}
if taskid is None:
return 'Enter a valid taskid'
else:
param['taskid'] = taskid
return self.api_request(api_name, api_path, param)
def get_list_of_all_background_task(self, offset=None, limit=None, sort_by=None,
sort_direction=None, api_filter=None):
api_name = 'SYNO.FileStation.BackgroundTask'
info = self.file_station_list[api_name]
api_path = info['path']
param = {'version': info['maxVersion'], 'method': 'list'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'param']:
if val is not None:
param[str(key)] = val
if type(api_filter) is list:
param['api_filter'] = to_compatible_str(api_filter)
return self.api_request(api_name, api_path, param)
def get_file(self, path=None, mode=None):
api_name = 'SYNO.FileStation.Download'
info = self.file_station_list[api_name]
api_path = info['path']
if path is None:
return 'Enter a valid path'
if type(path) is list:
path = to_compatible_str(path)
# Giving a list of files / folders to the API returns a .zip file as a response
# Change the filename, because right now it is a long string of file paths
filepath = 'content.zip'
else:
filepath = os.path.basename(path)
session = requests.session()
url = ('%s%s' % (self.url, api_path)) + '?api=%s&version=%s&method=download&path=%s&mode=%s&_sid=%s' % (
api_name, info['maxVersion'], parse.quote_plus(path), mode, self.sid)
if mode is None:
return 'Enter a valid mode (open / download)'
if mode == r'open':
with session.get(url, stream=True) as r:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
sys.stdout.buffer.write(chunk)
if mode == r'download':
with session.get(url, stream=True) as r:
r.raise_for_status()
with open(filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
# TODO SYNO.FileStation.Thumb to be done