import os
import sys
import time
from datetime import datetime
from urllib import parse

import requests

from .synology import Synology, api_call


def to_compatible_str(lst: list) -> str:
    """
    Convert a list to an API compatible string representation.
    """
    # The API only accepts double quoted strings, but str() returns single quoted strings
    return str(lst).replace('\'', '\"')
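
# Illustrative example of the conversion above (kept as a comment so importing
# this module has no side effects); the paths are made up:
#
#   to_compatible_str(['/home/docs', '/home/photos'])
#   -> '["/home/docs", "/home/photos"]'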


class FileStation(Synology):
    app = 'FileStation'

    def __init__(self):
        super(FileStation, self).__init__()

        self._dir_taskid = ''
        self._dir_taskid_list = []
        self._md5_taskid = ''
        self._md5_taskid_list = []
        self._search_taskid = ''
        self._search_taskid_list = []
        self._copy_move_taskid = ''
        self._copy_move_taskid_list = []
        self._delete_taskid = ''
        self._delete_taskid_list = []
        self._extract_taskid = ''
        self._extract_taskid_list = []
        self._compress_taskid = ''
        self._compress_taskid_list = []

        self.file_station_list = self.app_api_dict

    @api_call()
    def get_info(self):
        r = self.api_request('Info', 'getInfo')
        return r
"""
|
|
method: get_list_share
|
|
kwargs: additional,
|
|
offset,
|
|
limit,
|
|
sort_by,
|
|
sort_direction,
|
|
only_writable
|
|
"""
|
|
|
|
@api_call()
|
|
def get_list_share(self, **kwargs):
|
|
param = kwargs
|
|
|
|
if 'additional' not in param.keys():
|
|
param['additional'] = "real_path,size,owner,time"
|
|
|
|
if type(param['additional']) is list:
|
|
additional = kwargs.get('additional')
|
|
param['additional'] = ','.join(additional)
|
|
|
|
return self.api_request('List', 'list_share', param)
|
|
|
|
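
    # Usage sketch (illustrative; assumes `fs` is an authenticated FileStation
    # instance, the 'additional' values mirror the defaults above):
    #
    #   shares = fs.get_list_share(additional=['real_path', 'size'])
    #   print(shares['data'])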
"""
|
|
method: get_file_list
|
|
args: folder_path
|
|
kwargs: offset,
|
|
limit,
|
|
sort_by,
|
|
sort_direction,
|
|
pattern,
|
|
filetype,
|
|
goto_path,
|
|
additional
|
|
"""
|
|
|
|
@api_call()
|
|
def get_file_list(self, folder_path, **kwargs):
|
|
|
|
param = kwargs
|
|
param['folder_path'] = folder_path
|
|
|
|
if 'filetype' in param:
|
|
param['filetype'] = str(param['filetype']).lower()
|
|
|
|
if 'additional' not in param:
|
|
param['additional'] = "real_path,size,owner,time"
|
|
|
|
if type(param['additional']) is list:
|
|
param['additional'] = ','.join(param['additional'])
|
|
|
|
return self.api_request('List', 'list', param)
|
|
|
|
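
    # Usage sketch (illustrative; `fs` is an authenticated FileStation
    # instance, folder and pattern are made up):
    #
    #   files = fs.get_file_list('/home/docs', pattern='*.pdf',
    #                            additional=['size', 'time'])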

    @api_call()
    def get_file_info(self, path=None, additional=None):
        param = {}

        if type(path) is list:
            path = ['"' + x + '"' for x in path]
            path = '[' + ','.join(path) + ']'
            param['path'] = path
        elif path is not None:
            param['path'] = path

        if additional is None:
            additional = ['real_path', 'size', 'owner', 'time']

        if type(additional) is list:
            additional = ','.join(additional)

        param['additional'] = additional

        return self.api_request('List', 'getinfo', param)
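
    # Usage sketch (illustrative; paths are made up). A list of paths is
    # serialized into the bracketed, double-quoted form the API expects:
    #
    #   info = fs.get_file_info(['/home/docs/a.txt', '/home/docs/b.txt'])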

    # TODO all working if specify extension check if correct [pattern, extension]
    # it works if you put extension='...'
    @api_call()
    def _search_start(self, folder_path, **kwargs):
        param = kwargs

        if 'time' in param.keys():
            t = param['time']
            is_str = isinstance(t, str)
            is_int = isinstance(t, int)

            if not is_str and not is_int:
                raise ValueError(
                    "Timestamp must be string or Unix timestamp, type is {t}.".format(
                        t=type(param['time'])))
            elif is_int:
                val = datetime.fromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S')
                param['time'] = val
            else:  # t is a string such as '2020-01-31 12:00:00'
                date = time.strptime(t, "%Y-%m-%d %H:%M:%S")
                timestamp = time.mktime(date)
                param['time'] = timestamp

        param['folder_path'] = '"{p}"'.format(p=folder_path)

        if 'filetype' in param.keys():
            param['filetype'] = '"{f}"'.format(f=param['filetype'])

        return self.api_request('Search', 'start', param)
"""
|
|
method: search_start
|
|
args: folder_path
|
|
kwargs: recursive,
|
|
pattern,
|
|
extension,
|
|
filetype,
|
|
size_from,
|
|
site_to,
|
|
mtime_from,
|
|
mtime_to,
|
|
crtime_from,
|
|
crtime_to,
|
|
atime_from,
|
|
atime_to,
|
|
owner,
|
|
group
|
|
"""
|
|
|
|
def search_start(self, folder_path, **kwargs):
|
|
response = self._search_start(folder_path, **kwargs)
|
|
|
|
if response['success']:
|
|
self._search_taskid = response['data']['taskid']
|
|
self._search_taskid_list.append(response['data']['taskid'])
|
|
else:
|
|
raise self.SynologyError(
|
|
"Could not start search task.\nResponse: {r}".format(
|
|
r=response))
|
|
|
|
return response
|
|
|
|
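
    # Usage sketch (illustrative; folder and pattern are made up). A search is
    # asynchronous: start it, poll the task, then stop it.
    #
    #   resp = fs.search_start('/home/docs', pattern='report*')
    #   taskid = resp['data']['taskid']
    #   results = fs.get_search_list(taskid)
    #   fs.stop_search_task(taskid)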
"""
|
|
method: get_search_list
|
|
args: task_id
|
|
kwargs: filetype,
|
|
limit,
|
|
sort_by,
|
|
sort_direction,
|
|
offset,
|
|
additional
|
|
"""
|
|
|
|
@api_call()
|
|
def get_search_list(self, task_id, **kwargs):
|
|
param = kwargs
|
|
param['taskid'] = task_id
|
|
|
|
if 'filetype' in param.keys():
|
|
param['filetype'] = param['filetype']
|
|
|
|
if 'additional' not in param.keys():
|
|
param['additional'] = ['size', 'owner', 'time']
|
|
|
|
if type(param['additional']) is list:
|
|
param['additional'] = str(param['additional'])
|
|
|
|
return self.api_request('Search', 'list', param)
|
|
|
|

    @api_call()
    def _stop_search_task(self, taskid):
        param = {}

        if taskid is None:
            if self._search_taskid is None:
                raise self.SynologyError(
                    "Tried to stop search tasks but none exist.")
            else:
                taskid = self._search_taskid

        param['taskid'] = taskid

        return self.api_request('Search', 'stop', param)

    def stop_search_task(self, taskid=None):
        if taskid is None:
            taskid = self._search_taskid

        response = self._stop_search_task(taskid)

        if not response['success']:
            raise self.SynologyError(
                "Could not stop search jobs. Response: {r}".format(r=response))

        self._search_taskid_list.remove(taskid)
        self._search_taskid = self._search_taskid_list[-1]

        return response
"""
|
|
method: stop_all_search_task
|
|
args: log
|
|
Argument 'log' should be an object with a method 'error', such as the
|
|
logger from the standard logging library.
|
|
Two values are returned: a list of the tasks which were not stopped
|
|
(or an empty list), and the response of the first task that couldn't
|
|
be stopped (or None if all were successfully stopped).
|
|
"""
|
|
|
|
def stop_all_search_task(self, log=None):
|
|
for task in self._search_taskid_list:
|
|
try:
|
|
response = self.stop_search_task(task)
|
|
except self.SynologyError as err:
|
|
if log is not None:
|
|
try:
|
|
log.error(str(err))
|
|
except AttributeError as err:
|
|
pass
|
|
return self._search_taskid_list, response
|
|
return self._search_taskid_list, None
|
|
|
|
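
    # Usage sketch (illustrative; assumes the standard logging module):
    #
    #   import logging
    #   logger = logging.getLogger('synology')
    #   remaining, last_response = fs.stop_all_search_task(log=logger)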
"""
|
|
method: get_mount_point_list
|
|
args:
|
|
kwargs: type,
|
|
offset,
|
|
limit,
|
|
sort_by
|
|
sort_direction,
|
|
additional
|
|
"""
|
|
|
|
@api_call()
|
|
def get_mount_point_list(self, **kwargs):
|
|
param = kwargs
|
|
|
|
if 'additional' not in param.keys():
|
|
param['additional'] = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(param['additional']) is list:
|
|
param['additional'] = str(param['additional'])
|
|
|
|
return self.api_request('VirtualFolder', 'list', param)
|
|
|
|
"""
|
|
method: get_favorite_list
|
|
args:
|
|
kwargs: offset,
|
|
limit,
|
|
sort_by,
|
|
status_filter,
|
|
additional
|
|
"""
|
|
|
|
@api_call()
|
|
def get_favorite_list(self, **kwargs):
|
|
param = kwargs
|
|
|
|
if 'additional' not in param.keys():
|
|
param['additional'] = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(param['additional']) is list:
|
|
param['additional'] = str(param['additional'])
|
|
|
|
return self.api_request('Favorite', 'list', param)
|
|
|
|

    @api_call()
    def add_a_favorite(self, path, name=None, index=None):
        param = {'path': path}
        if name is not None:
            param['name'] = name
        if index is not None:
            param['index'] = index

        return self.api_request('Favorite', 'add', param)

    @api_call()
    def delete_a_favorite(self, path):
        return self.api_request('Favorite', 'delete', {'path': path})

    @api_call()
    def clear_broken_favorite(self):
        return self.api_request('Favorite', 'clear_broken')

    @api_call()
    def edit_favorite_name(self, path, new_name):
        return self.api_request('Favorite', 'edit',
                                {'path': path, 'new_name': new_name})

    @api_call()
    def replace_all_favorite(self, path, name):
        if type(path) is list:
            path = ",".join(path)

        if type(name) is list:
            name = ",".join(name)

        return self.api_request('Favorite', 'edit',
                                {'path': path, 'name': name})

    @api_call()
    def _start_dir_size_calc(self, path):
        if type(path) is list:
            path = str(path)

        return self.api_request('DirSize', 'start', {'path': path})

    def start_dir_size_calc(self, path):
        response = self._start_dir_size_calc(path)
        self._dir_taskid = response['data']['taskid']
        self._dir_taskid_list.append(self._dir_taskid)

        return response
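
    # Usage sketch (illustrative; the path is made up). Directory size
    # calculation is a background task: start it, then poll its status.
    #
    #   resp = fs.start_dir_size_calc('/home/docs')
    #   status = fs.get_dir_status(resp['data']['taskid'])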

    @api_call()
    def _stop_dir_size_calc(self, taskid):
        return self.api_request('DirSize', 'stop',
                                {'taskid': '"{t}"'.format(t=taskid)})

    def stop_dir_size_calc(self, taskid):
        response = self._stop_dir_size_calc(taskid)
        if not response['success']:
            raise self.SynologyError(
                "Cannot stop task {t}.\nResponse: {r}".format(
                    t=taskid, r=response))
        self._dir_taskid_list.remove(taskid)
        if self._dir_taskid == taskid:
            self._dir_taskid = self._dir_taskid_list[-1]

        return response

    @api_call()
    def get_dir_status(self, taskid=None):
        if taskid is None:
            if self._dir_taskid is None:
                raise self.SynologyError("No DirSize tasks currently running.")
            else:
                taskid = self._dir_taskid

        return self.api_request('DirSize', 'status',
                                {'taskid': taskid})

    @api_call()
    def _start_md5_calc(self, file_path):
        return self.api_request('MD5', 'start',
                                {'file_path': file_path})

    def start_md5_calc(self, file_path):
        response = self._start_md5_calc(file_path)
        self._md5_taskid = response['data']['taskid']
        self._md5_taskid_list.append(self._md5_taskid)
        return response

    @api_call()
    def get_md5_status(self, taskid=None):
        if taskid is None:
            if self._md5_taskid is not None:
                taskid = self._md5_taskid
            else:
                raise self.SynologyError("No MD5 tasks currently running.")

        return self.api_request('MD5', 'status',
                                {'taskid': taskid})

    @api_call()
    def _stop_md5_calc(self, taskid):
        return self.api_request('MD5', 'stop',
                                {'taskid': taskid})

    def stop_md5_calc(self, taskid):
        response = self._stop_md5_calc(taskid)
        if response['success']:
            self._md5_taskid_list.remove(taskid)
            if self._md5_taskid == taskid:
                self._md5_taskid = self._md5_taskid_list[-1]

        return response
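
    # Usage sketch (illustrative; the file path is made up). MD5 calculation
    # runs as a background task on the NAS:
    #
    #   resp = fs.start_md5_calc('/home/docs/archive.iso')
    #   status = fs.get_md5_status(resp['data']['taskid'])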
"""
|
|
method: check_permissions
|
|
args: path, filename
|
|
kwargs: overwrite, create_only
|
|
"""
|
|
|
|
@api_call()
|
|
def check_permissions(self, path, filename, **kwargs):
|
|
param = kwargs
|
|
param['path'] = path
|
|
param['filename'] = filename
|
|
return self.api_request('CheckPermission', 'write', param)
|
|
|
|

    _url_ptrn = "{u}{p}?api={n}&version={m}&method=upload&_sid={s}"

    """
    method: upload_file
    args: dest_path, file_path, log, create_parents=True, overwrite=False

    FileStation.upload_file() works differently from other methods. Instead of
    sending a single requests.request(), it opens a requests.session() to
    upload the file found at 'file_path' on the client to 'dest_path' on the
    NAS. As such, it does not use the usual @api_call() decorator.

    The 'log' argument is an object with an "error" method, such as the logger
    from Python's standard library "logging". This is used in case of an error
    during the file upload to make reporting on any errors easier. If any
    exceptions are caught, the method returns False. Note that it is on the
    client to check for any errors regarding the file to be uploaded (such as
    anything covered by OSError).

    Note: I have not seen any other methods using a session yet, but if it's
    common enough it may be worthwhile to create another decorator similar to
    @api_call(), such as @Synology.api_session.
    """

    def upload_file(self, dest_path, file_path, log,
                    create_parents=True, overwrite=False):
        api_name = 'SYNO.FileStation.Upload'
        info = self.file_station_list[api_name]
        api_path = info['path']
        filename = os.path.basename(file_path)

        session = requests.session()
        try:
            with open(file_path, 'rb') as payload:
                url = self._url_ptrn.format(u=self.base_url, p=api_path,
                                            n=api_name, m=info['minVersion'],
                                            s=self._sid)

                args = {
                    'path': dest_path,
                    'create_parents': create_parents,
                    'overwrite': overwrite,
                }

                file_info = {'file': (filename, payload,
                                      'application/octet-stream')}

                r = session.post(url, data=args, files=file_info)
                return r

        except requests.exceptions.ConnectionError as err:
            log.error("Could not connect to Synology.\nError: {e}".format(
                e=str(err)))

        except requests.Timeout as err:
            log.error("Request timed out.\nError: {e}".format(e=str(err)))

        except requests.exceptions.RequestException as err:
            log.error("Session raised an exception.\nError: {e}".format(
                e=str(err)))

        return False
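
    # Usage sketch (illustrative; paths and the logger are made up):
    #
    #   import logging
    #   logger = logging.getLogger('synology')
    #   result = fs.upload_file('/home/docs', '/tmp/report.pdf', logger,
    #                           overwrite=True)
    #   if result is False:
    #       logger.error('upload failed')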

    @api_call()
    def get_shared_link_info(self, link_id):
        return self.api_request('Sharing', 'getinfo',
                                {'id': link_id})
"""
|
|
method: get_shared_link_list
|
|
kwargs: offset,
|
|
limit,
|
|
sort_by,
|
|
sort_direction,
|
|
force_clean
|
|
"""
|
|
|
|
@api_call()
|
|
def get_shared_link_list(self, **kwargs):
|
|
return self.api_request('Sharing', 'list', kwargs)
|
|
|
|
"""
|
|
method: create_sharing_link
|
|
args: path
|
|
kwargs: password,
|
|
date_expired,
|
|
date_available
|
|
"""
|
|
|
|
@api_call()
|
|
def create_sharing_link(self, path, **kwargs):
|
|
param = kwargs
|
|
param['path'] = path
|
|
return self.api_request('Sharing', 'create', param)
|
|
|
|
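
    # Usage sketch (illustrative; path, password and date are made up):
    #
    #   link = fs.create_sharing_link('/home/docs/report.pdf',
    #                                 password='secret',
    #                                 date_expired='2025-12-31')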

    @api_call()
    def delete_shared_link(self, link_id):
        return self.api_request('Sharing', 'delete', {'id': link_id})

    @api_call()
    def clear_invalid_shared_link(self):
        return self.api_request('Sharing', 'clear_invalid')
"""
|
|
method: edit_shared_link
|
|
args: link_id
|
|
kwargs: password,
|
|
date_expired,
|
|
date_available
|
|
"""
|
|
|
|
@api_call()
|
|
def edit_shared_link(self, link_id, **kwargs):
|
|
param = kwargs
|
|
param['id'] = link_id
|
|
return self.api_request('Sharing', 'edit', param)
|
|
|
|
"""
|
|
method: create_folder
|
|
args: folder_path, name
|
|
kwargs: force_parent, additional
|
|
"""
|
|
|
|
@api_call()
|
|
def create_folder(self, folder_path, name, **kwargs):
|
|
param = kwargs
|
|
|
|
if type(folder_path) is list:
|
|
new_fp = []
|
|
for path in folder_path:
|
|
path = '"{p}"'.format(p=path)
|
|
new_fp.append(path)
|
|
folder_path = str(new_fp)
|
|
param['folder_path'] = folder_path
|
|
|
|
if type(name) is list:
|
|
new_name = []
|
|
for nm in name:
|
|
nm = '"{n}"'.format(n=nm)
|
|
new_name.append(nm)
|
|
name = str(new_name)
|
|
param['name'] = name
|
|
|
|
if 'additional' not in param.keys():
|
|
param['additional'] = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(param['additional']) is list:
|
|
param['additional'] = ','.join(param['additional'])
|
|
|
|
return self.api_request('CreateFolder', 'create', param)
|
|
|
|
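
    # Usage sketch (illustrative; paths and names are made up). With lists,
    # each new name appears to be paired with the corresponding parent path:
    #
    #   fs.create_folder('/home/docs', 'reports', force_parent=True)
    #   fs.create_folder(['/home/docs', '/home/photos'], ['a', 'b'])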
"""
|
|
method: rename_folder
|
|
args: path, name
|
|
kwargs: additional, search_taskid
|
|
"""
|
|
|
|
@api_call()
|
|
def rename_folder(self, path, name, **kwargs):
|
|
param = kwargs
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
for np in path:
|
|
np = '"{n}"'.format(n=np)
|
|
new_path.append(np)
|
|
path = str(new_path)
|
|
param['path'] = path
|
|
|
|
if type(name) is list:
|
|
new_path = []
|
|
for np in path:
|
|
np = '"{n}"'.format(n=np)
|
|
new_path.append(np)
|
|
name = str(new_path)
|
|
param['name'] = name
|
|
|
|
if 'additional' not in param.keys():
|
|
param['additional'] = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(param['additional']) is list:
|
|
param['additional'] = ','.join(param['additional'])
|
|
|
|
return self.api_request('Rename', 'rename', param)
|
|
|
|
"""
|
|
method: start_copy_move
|
|
args: path, dest_folder
|
|
kwargs: overwite,
|
|
remove_src,
|
|
accurate_progress,
|
|
search_taskid
|
|
"""
|
|
|
|
@api_call()
|
|
def _start_copy_move(self, path, dest_folder, **kwargs):
|
|
param = kwargs
|
|
|
|
if type(path) is list:
|
|
path = str(path)
|
|
param['path'] = path
|
|
|
|
if type(dest_folder) is list:
|
|
dest_folder = str(dest_folder)
|
|
param['name'] = dest_folder
|
|
|
|
return self.api_request('CopyMove', 'start', param)
|
|
|
|

    def start_copy_move(self, path, dest_folder, **kwargs):
        response = self._start_copy_move(path, dest_folder, **kwargs)
        if response['success']:
            self._copy_move_taskid = response['data']['taskid']
            self._copy_move_taskid_list.append(self._copy_move_taskid)
        return response
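
    # Usage sketch (illustrative; paths are made up). Copies by default;
    # pass remove_src=True to move instead:
    #
    #   resp = fs.start_copy_move('/home/docs/a.txt', '/home/backup',
    #                             remove_src=False)
    #   status = fs.get_copy_move_status(resp['data']['taskid'])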

    @api_call()
    def get_copy_move_status(self, taskid):
        return self.api_request('CopyMove', 'status', {'taskid': taskid})

    @api_call()
    def _stop_copy_move_task(self, taskid):
        return self.api_request('CopyMove', 'stop', {'taskid': taskid})

    def stop_copy_move_task(self, taskid):
        response = self._stop_copy_move_task(taskid)

        if response['success']:
            self._copy_move_taskid_list.remove(taskid)
            if self._copy_move_taskid == taskid:
                self._copy_move_taskid = self._copy_move_taskid_list[-1]

        return response
"""
|
|
method: start_delete_task
|
|
args: path
|
|
kwargs: accurate_progress,
|
|
recursive,
|
|
search_taskid
|
|
"""
|
|
|
|
@api_call()
|
|
def _start_delete_task(self, path, **kwargs):
|
|
param = kwargs
|
|
|
|
if type(path) is list:
|
|
path = str(path)
|
|
param['path'] = path
|
|
return self.api_request('Delete', 'start', param)
|
|
|
|
def start_delete_task(self, path, **kwargs):
|
|
response = self._start_delete_task(path, **kwargs)
|
|
if response['success']:
|
|
self._delete_taskid = response['data']['taskid']
|
|
self._delete_taskid_list.append(self._delete_taskid)
|
|
return response
|
|
|
|
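
    # Usage sketch (illustrative; the path is made up). Deletion runs
    # asynchronously; poll the task until it finishes:
    #
    #   resp = fs.start_delete_task('/home/docs/old', recursive=True)
    #   status = fs.get_delete_status(resp['data']['taskid'])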

    @api_call()
    def get_delete_status(self, taskid):
        return self.api_request('Delete', 'status', {'taskid': taskid})

    @api_call()
    def _stop_delete_task(self, taskid):
        return self.api_request('Delete', 'stop', {'taskid': taskid})

    def stop_delete_task(self, taskid):
        response = self._stop_delete_task(taskid)
        if response['success']:
            self._delete_taskid_list.remove(taskid)
            if self._delete_taskid == taskid:
                self._delete_taskid = self._delete_taskid_list[-1]
        return response
"""
|
|
method: delete_blocking_function
|
|
args: path
|
|
kwargs: recursive,
|
|
search_taskid
|
|
|
|
delete_blocking_function may appear to hang for a while. This is normal.
|
|
TODO: possibly add async option?
|
|
"""
|
|
|
|
@api_call()
|
|
def delete_blocking_function(self, path, **kwargs):
|
|
param = kwargs
|
|
|
|
if type(path) is list:
|
|
path = str(path)
|
|
param['path'] = path
|
|
|
|
return self.api_request('Delete', 'delete', param)
|
|
|
|
"""
|
|
method: start_extract_task
|
|
args: file_path, dest_folder
|
|
kwargs: overwrite,
|
|
keep_dir,
|
|
create_subfolder,
|
|
codepage,
|
|
password,
|
|
item_id
|
|
"""
|
|
|
|
@api_call()
|
|
def start_extract_task(self, file_path, dest_folder, **kwargs):
|
|
|
|
param = kwargs
|
|
param['file_path'] = file_path
|
|
param['dest_folder_path'] = dest_folder
|
|
|
|
response = self.api_request('Extract', 'start', param)
|
|
|
|
self._extract_taskid = response['data']['taskid']
|
|
self._extract_taskid_list.append(self._extract_taskid)
|
|
|
|
return response
|
|
|
|
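
    # Usage sketch (illustrative; archive path and destination are made up):
    #
    #   resp = fs.start_extract_task('/home/docs/archive.zip',
    #                                '/home/docs/out', create_subfolder=True)
    #   status = fs.get_extract_status(resp['data']['taskid'])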

    @api_call()
    def get_extract_status(self, taskid):
        return self.api_request('Extract', 'status', {'taskid': taskid})

    @api_call()
    def _stop_extract_task(self, taskid):
        return self.api_request('Extract', 'stop', {'taskid': taskid})

    def stop_extract_task(self, taskid):
        response = self._stop_extract_task(taskid)
        if response['success']:
            self._extract_taskid_list.remove(taskid)
            if self._extract_taskid == taskid:
                self._extract_taskid = self._extract_taskid_list[-1]
        return response
"""
|
|
method: get_archive_file_list
|
|
args: file_path
|
|
kwargs: offset,
|
|
limit,
|
|
sort_by,
|
|
sort_direction,
|
|
codepage,
|
|
password,
|
|
item_id
|
|
"""
|
|
|
|
@api_call()
|
|
def get_archive_file_list(self, file_path, **kwargs):
|
|
param = kwargs
|
|
param['file_path'] = file_path
|
|
return self.api_request('Extract', 'list', param)
|
|
|
|

    def start_file_compression(self, path=None, dest_file_path=None, level=None, mode=None,
                               compress_format=None, password=None):
        api_name = 'SYNO.FileStation.Compress'
        info = self.file_station_list[api_name]
        api_path = info['path']
        param = {'version': info['maxVersion'], 'method': 'start'}

        if type(path) is list:
            param['path'] = to_compatible_str(path)
        elif path is not None:
            param['path'] = path
        else:
            return 'Enter a valid path'

        for key, val in locals().items():
            if key not in ['self', 'api_name', 'info', 'api_path', 'param',
                           'path', 'compress_format', 'password']:
                if val is not None:
                    param[str(key)] = val

        if dest_file_path is None:
            return 'Enter a valid dest_file_path'

        if compress_format is not None:
            param['format'] = compress_format

        if password is not None:
            param['_password'] = password

        self._compress_taskid = self.api_request(api_name, api_path, param)['data']['taskid']

        return 'You can now check the status of the request with get_compress_status(), ' \
               'your id is: ' + self._compress_taskid
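
    # Usage sketch (illustrative; paths and format are made up). The started
    # task id is stored on the instance as _compress_taskid:
    #
    #   msg = fs.start_file_compression('/home/docs/reports',
    #                                   '/home/docs/reports.zip',
    #                                   compress_format='zip')
    #   status = fs.get_compress_status(fs._compress_taskid)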

    def get_compress_status(self, taskid=None):
        api_name = 'SYNO.FileStation.Compress'
        info = self.file_station_list[api_name]
        api_path = info['path']
        param = {'version': info['maxVersion'], 'method': 'status'}

        if taskid is None:
            return 'Enter a valid taskid'
        else:
            param['taskid'] = taskid

        return self.api_request(api_name, api_path, param)

    def stop_compress_task(self, taskid=None):
        api_name = 'SYNO.FileStation.Compress'
        info = self.file_station_list[api_name]
        api_path = info['path']
        param = {'version': info['maxVersion'], 'method': 'stop'}

        if taskid is None:
            return 'Enter a valid taskid'
        else:
            param['taskid'] = taskid

        return self.api_request(api_name, api_path, param)

    def get_list_of_all_background_task(self, offset=None, limit=None, sort_by=None,
                                        sort_direction=None, api_filter=None):
        api_name = 'SYNO.FileStation.BackgroundTask'
        info = self.file_station_list[api_name]
        api_path = info['path']
        param = {'version': info['maxVersion'], 'method': 'list'}

        for key, val in locals().items():
            if key not in ['self', 'api_name', 'info', 'api_path', 'param']:
                if val is not None:
                    param[str(key)] = val

        if type(api_filter) is list:
            param['api_filter'] = to_compatible_str(api_filter)

        return self.api_request(api_name, api_path, param)

    def get_file(self, path=None, mode=None):
        api_name = 'SYNO.FileStation.Download'
        info = self.file_station_list[api_name]
        api_path = info['path']

        if path is None:
            return 'Enter a valid path'

        if mode is None:
            return 'Enter a valid mode (open / download)'

        if type(path) is list:
            path = to_compatible_str(path)
            # Giving a list of files / folders to the API returns a .zip file as a response.
            # Change the filename, because right now it is a long string of file paths.
            filepath = 'content.zip'
        else:
            filepath = os.path.basename(path)

        session = requests.session()

        url = ('%s%s' % (self.url, api_path)) + '?api=%s&version=%s&method=download&path=%s&mode=%s&_sid=%s' % (
            api_name, info['maxVersion'], parse.quote_plus(path), mode, self.sid)

        if mode == r'open':
            # Stream the file contents to stdout.
            with session.get(url, stream=True) as r:
                r.raise_for_status()
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        sys.stdout.buffer.write(chunk)

        if mode == r'download':
            # Save the file in the current working directory under its own name.
            with session.get(url, stream=True) as r:
                r.raise_for_status()
                with open(filepath, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)
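
    # Usage sketch (illustrative; the path is made up). mode='download' writes
    # the file to the current working directory, mode='open' streams it to
    # stdout:
    #
    #   fs.get_file('/home/docs/report.pdf', mode='download')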

    # TODO SYNO.FileStation.Thumb to be done