# -*- coding: utf-8 -*-
import re
import requests
from time import sleep
from fake_useragent import UserAgent
from .cache import cache
from .singleton import Singleton
@cache()
def get_user_agent():
""" Return user-agent
Returns
-------
str
user-agent
"""
ua = UserAgent(os=['windows', 'macos', 'linux'], platforms='pc') # Exclude mobile devices and tablets
agent = ua.random # Random user-agent
return str(agent)
def query_to_regex(query):
""" query to regular expression
Parameters
----------
query: str or list of str
query
Returns
-------
Pattern object
regular expression
"""
if isinstance(query, str):
regex = re.compile(query, re.IGNORECASE)
elif isinstance(query, list):
pattern = '(' + '|'.join(query) + ')'
regex = re.compile(pattern, re.IGNORECASE)
else:
raise TypeError('Invalid query type')
return regex
[docs]
class Request(object, metaclass=Singleton):
"""HTTP 요청을 보내는 클래스
HTTP 요청을 위해 사용되는 클래스입니다.
User-Agent 및 Cookies 관련 정보를 저장하고 있습니다.
Attributes
---------
s: Session
Requests Session
delay: float
Delay for repeat delay, Default: 1s
"""
def __init__(self):
self.s = requests.Session()
self.update_user_agent()
# 분당 1000회 이상 자체적으로 24시간 IP차단
# IP 차단 방지 위해 delay 0.1s -> 0.2s
self.delay = 0.2
[docs]
def update_user_agent(self, force: bool = False):
""" Update User-Agent
Parameters
----------
force: bool
Force update
"""
if force:
ua = UserAgent(os=['windows', 'macos', 'linux'], platforms='pc')
agent = ua.random # Random user-agent
user_agent = str(agent)
else:
user_agent = get_user_agent()
self.s.headers.update({'user-agent': user_agent})
[docs]
def set_proxies(self, proxies: dict = None):
""" Set proxies
Parameters
----------
proxies: dict
proxies
"""
if proxies is not None:
import copy
self.s.proxies = copy.deepcopy(proxies)
[docs]
def set_delay(self, second: float = None):
""" Set delay
Parameters
----------
second: float
delay for repeat
"""
self.delay = second
[docs]
def request(self,
url: str,
method: str = 'GET',
payload: dict = None,
referer: str = None,
stream: bool = False,
timeout: int = 120):
""" send http requests
Parameters
----------
url: str
URL
method: str, optional
GET, OPTIONS, POST, PUT, PATCH or DELETE
payload: dict, optional
Request parameters
referer: str, optional
Temporary referer
stream: bool, optional
Stream optional, default False
timeout: int, optional
default 120s
Returns
-------
requests.Response
Response
"""
headers = self.s.headers
if referer is not None:
headers['referer'] = referer
# Session-level state such as cookies will not get applied to your request.
# To get a PreparedRequest with that state applied,
# replace the call to Request.prepare() with a call to Session.prepare_request()
req = requests.Request(method, url=url, params=payload, headers=headers)
prepped = self.s.prepare_request(req)
resp = self.s.send(prepped, stream=stream, timeout=timeout)
if self.delay is not None:
sleep(self.delay)
return resp
[docs]
def get(self, url: str,
payload: dict = None,
referer: str = None,
stream: bool = False,
timeout: int = 120):
""" Request get method
Parameters
----------
url: str
URL
payload: dict, optional
Request parameters
referer: str, optional
Temporary referer
stream: bool, optional
Stream optional, default False
timeout: int, optional
default 120s
Returns
-------
requests.Response
Response
"""
return self.request(url=url, method='GET', payload=payload, referer=referer, stream=stream, timeout=timeout)
[docs]
def post(self, url: str,
payload: dict = None,
referer: str = None,
stream: bool = False,
timeout: int = 120):
""" Request post method
Parameters
----------
url: str
URL
payload: dict, optional
Request parameters
referer: str, optional
Temporary referer
stream: bool, optional
Stream optional, default False
timeout: int, optional
default 120s
Returns
-------
requests.Response
Response
"""
return self.request(url=url, method='POST', payload=payload, referer=referer, stream=stream, timeout=timeout)
[docs]
def download(self,
url: str,
path: str,
filename: str = None,
method: str = 'GET',
payload: dict = None,
referer: str = None,
timeout: int = 120) -> dict:
""" Download File
Parameters
----------
url: str
Request URL
path: str
Download Path
filename: str
filename for saving
method: str, optional
Request Method
payload: dict, optional
Request parameters
referer: str, optional
Temporary referer
timeout: int, optional
default 120s
Returns
-------
dict
filename, path, full_path
"""
from .spinner import Spinner
from .file import create_folder
from urllib.parse import unquote
import os
# Create Folder
create_folder(path)
r = self.request(url=url, method=method, payload=payload, referer=referer, stream=True, timeout=timeout)
# Check validity
headers = r.headers.get('Content-Disposition')
if headers is None or not re.search('attachment', headers):
raise FileNotFoundError('target does not exist')
# total_size = int(r.headers.get('content-length', 0))
block_size = 8192
# Extract filename
extracted_filename = unquote(re.findall(r'filename="?([^"]*)"?', headers)[0])
if filename is None:
filename = extracted_filename
else:
filename = filename.format(extracted_filename)
spinner = Spinner('Downloading ' + filename)
spinner.start()
file_path = os.path.join(path, filename)
with open(file_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=block_size):
if chunk is not None:
f.write(chunk)
r.close()
spinner.stop()
return {'filename': filename, 'path': path, 'full_path': file_path}
# Request object
request = Request()