#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Author: Kai Peng
__version__ = "0.0.4"
from requests import Session
from requests import HTTPError
import json
from urllib.parse import urlparse
from alist.public import AlistPublic
from alist.admin import AlistAdmin
from alist import utils
import alist.setting
[文档]class AlistClient(object):
"""
Python wrapper for the Alist API.
"""
def __init__(
self,
base_url,
password = None,
authorization = None,
ssl_verify = True,
cert = None,
):
self.base_url = base_url.rstrip('/')
self.public = AlistPublic(self)
self.admin = None
self.session = Session()
self.password = ""
self.ssl_verify = ssl_verify
self.cert = cert
self.url = urlparse(self.base_url)
self.authorization = None
self.login(password, authorization)
[文档] def login(self, password = None, authorization = None):
"""
使用password或authorization登录。
:param password: 密码
:param authorization: 授权码
:return: 登录成功返回Ture,登录失败触发异常。
"""
if self.is_login():
return True
self.authorization = authorization
self.password = password
if self.password != None and self.authorization == None:
self.authorization = utils.calc_authorization(password)
if self.authorization:
self.admin = AlistAdmin(self)
return self.admin.login()
return False
[文档] def is_login(self):
"""
是否登录
:return: 如果已登录,返回True;否则返回Flase。
"""
return self.authorization != None
[文档] @staticmethod
def decode_response(response):
"""
解析服务器的响应。
:return: 将JSON解析为字典,如果不是JSON则返回原始字符串。
:raises: 如果响应包含HTTP错误则触发requests.HTTPError。
"""
content_type = response.headers.get("content-type", "")
content = response.content.strip()
if response.encoding:
content = content.decode(response.encoding)
if not content:
return content
if content_type.split(";")[0] != "application/json":
return content
try:
content = json.loads(content)
except ValueError:
raise ValueError(f"Invalid json content: {content}")
if content['code'] != 200:
response.status_code = content['code']
raise HTTPError(content['code'], content['message'], response=response)
if content['data'] is None:
return True
else:
return content['data']
[文档] def get_request_dict(self, method, endpoint, **kwargs):
"""
获取requests请求参数。
:param method: 请求方法,GET、POST或DELETE。
:param endpoint: URL端点。
:param kwargs: 其他可选参数。
:return: requests请求参数。
"""
headers = {
"Method": method,
"Path": self.get_api_url(endpoint),
"Authority": self.url.hostname,
"Scheme": self.url.scheme,
"Accept": "application/json, text/plain, */*",
"Origin": self.base_url,
"Authorization": self.authorization,
# "Content-Type": "application/json;charset=UTF-8", # requests 自动填充
}
request_kwargs = kwargs
if "headers" in request_kwargs:
request_kwargs["headers"].update(headers)
else:
request_kwargs["headers"] = headers
request_kwargs['verify'] = self.ssl_verify
request_kwargs['cert'] = self.cert
return request_kwargs
[文档] def get_api_url(self, endpoint):
"""
返回指定端点的api url,不包含主机和端口。
:param endpoint: 服务端点。
:return: api url
"""
return f'/api{endpoint}'
[文档] def get_endpoint_url(self, endpoint):
"""
返回指定端点的完整URL,包含主机和端口。
:param endpoint: 服务端点。
:return: 完整的URL。
"""
return f'{self.base_url}{self.get_api_url(endpoint)}'
[文档] def get(self, endpoint, **kwargs):
"""
发送HTTP GET请求到端点。
:param endpoint: 发送请求的端点。
:return: 服务器返回的数据。
"""
request_kwargs = self.get_request_dict("GET", endpoint, **kwargs)
response = self.session.get(self.get_endpoint_url(endpoint), **request_kwargs)
return self.decode_response(response)
[文档] def post(self, endpoint, **kwargs):
"""
发送HTTP POST请求到端点。
:param endpoint: 发送请求的端点。
:return: 服务器返回的数据。
"""
request_kwargs = self.get_request_dict("POST", endpoint, **kwargs)
response = self.session.post(self.get_endpoint_url(endpoint), **request_kwargs)
# return response
return self.decode_response(response)
[文档] def delete(self, endpoint, **kwargs):
"""
发送HTTP DELETE请求到端点。
:param endpoint: 发送请求的端点。
:return: 服务器返回的数据。
"""
request_kwargs = self.get_request_dict("DELETE", endpoint, **kwargs)
response = self.session.delete(self.get_endpoint_url(endpoint), **request_kwargs)
# return response
return self.decode_response(response)