#! coding:utf-8
# compatible for win32 / python 2 & 3
from __future__ import division, print_function
import argparse
import hashlib
import importlib
import json
import os
import pickle
import re
import shlex
import signal
import sys
import time
import timeit
from fractions import Fraction
from functools import wraps
from logging import getLogger
from threading import Lock, Thread
from .configs import Config
from .exceptions import ImportErrorModule
from .logs import print_info
from .main import run_after_async, threads, tPool
from .versions import PY2, PY3
logger = getLogger("torequests")
if PY2:
import repr as reprlib
from Queue import Empty, PriorityQueue
from urllib import quote, quote_plus, unquote_plus
from urlparse import (
parse_qs,
parse_qsl,
urlparse,
unquote,
urljoin,
urlsplit,
urlunparse,
)
from cgi import escape
import HTMLParser
unescape = HTMLParser.HTMLParser().unescape
def retry(tries=1, exceptions=(Exception,), catch_exception=False):
    """Build a decorator that calls a function again when it raises.

    :param tries: number of attempts made before giving up.
    :param exceptions: exception class (or tuple of classes) that triggers
        another attempt; anything else propagates immediately.
    :param catch_exception: if True, the last caught exception is returned
        instead of being raised once all attempts failed.
    """

    def wrapper_sync(function):

        @wraps(function)
        def retry_sync(*args, **kwargs):
            for _attempt in range(tries):
                try:
                    outcome = function(*args, **kwargs)
                except exceptions as caught:
                    # keep a reference: `caught` is unbound after the except
                    # clause on python3.
                    last_error = caught
                else:
                    return outcome
            if not catch_exception:
                raise last_error
            return last_error

        return retry_sync

    return wrapper_sync
elif PY3:
import reprlib
from urllib.parse import (
parse_qs,
parse_qsl,
urlparse,
quote,
quote_plus,
unquote,
unquote_plus,
urljoin,
urlsplit,
urlunparse,
)
from html import escape, unescape
from queue import Empty, PriorityQueue
from ._py3_patch import retry
unicode = str
else:
logger.warning('Unhandled python version.')
# Public API of this module: the space-separated names below are split into
# the list that ``from <module> import *`` exports.
__all__ = "parse_qs parse_qsl urlparse quote quote_plus unquote unquote_plus urljoin urlsplit urlunparse escape unescape simple_cmd print_mem curlparse Null null itertools_chain slice_into_pieces slice_by_size ttime ptime split_seconds timeago timepass md5 Counts unique unparse_qs unparse_qsl Regex kill_after UA try_import ensure_request Timer ClipboardWatcher Saver guess_interval split_n find_one register_re_findone Cooldown curlrequests sort_url_query retry get_readable_size".split(
" ")
# Unique sentinel meaning "argument not supplied"; used as a default where
# None is itself a meaningful value (see ``rounded`` in get_readable_size).
NotSet = object()
def simple_cmd():
    """
    ``Deprecated``: Not better than ``fire`` -> pip install fire

    Parse ``sys.argv``, look up a callable by name in this module's globals
    and call it with the given positional / keyword arguments (all strings).

    ``-k`` takes ``key=value`` (or ``key:value``) pairs separated by ``,`` or
    ``;``. Only the first ``=`` / ``:`` of a pair splits it, so values such as
    ``url=http://a.com`` stay intact; empty pairs (e.g. a trailing comma) are
    ignored.
    """
    parser = argparse.ArgumentParser(
        prog="Simple command-line function toolkit.",
        description="""Input function name and args and kwargs.
python xxx.py main -a 1 2 3 -k a=1,b=2,c=3""",
    )
    parser.add_argument("-f", "--func_name", default="main")
    parser.add_argument("-a", "--args", dest="args", nargs="*")
    parser.add_argument("-k", "--kwargs", dest="kwargs")
    parser.add_argument(
        "-i",
        "-s",
        "--info",
        "--show",
        "--status",
        dest="show",
        action="store_true",
        help="show the args, kwargs and function's source code.",
    )
    params = parser.parse_args()
    func_name = params.func_name
    func = globals().get(func_name)
    if not callable(func):
        logger.warning("invalid func_name: %s" % func_name)
        return
    args = params.args or []
    kwargs = {}
    if params.kwargs:
        for pair in re.split("[,;]+", params.kwargs):
            if not pair:
                # leading / trailing separators produce empty pieces
                continue
            # split on the first separator only, the value may contain more
            key, value = re.split("[:=]", pair, maxsplit=1)
            kwargs[key] = value
    if params.show:
        from inspect import getsource

        logger.info("args: %s; kwargs: %s" % (args, kwargs))
        logger.info(getsource(func))
        return
    func(*args, **kwargs)
def get_readable_size(input_num,
                      unit=None,
                      rounded=NotSet,
                      format="%s %s",
                      units=None,
                      carry=1024):
    """Show the num readable with unit.

    :param input_num: raw number
    :type input_num: float, int
    :param unit: target unit, defaults to None for auto set.
    :type unit: str, optional
    :param rounded: defaults to NotSet return raw float without round.
    :type rounded: None or int, optional
    :param format: output string format, defaults to "%s %s"
    :type format: str, optional
    :param units: unit list, defaults to None for computer storage unit
    :type units: list, optional
    :param carry: carry a number as in adding, defaults to 1024
    :type carry: int, optional
    :return: string for input_num with unit.
    :rtype: str

    In auto mode the largest unit whose scaled magnitude is still >= 1 is
    chosen; the magnitude (``abs``) is compared so negative numbers are
    scaled the same way as positive ones.

        >>> get_readable_size(1024)
        '1.0 KB'
        >>> get_readable_size(1536, unit='B')
        '1536.0 B'
        >>> get_readable_size(-2048)
        '-2.0 KB'
    """
    units = units or ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB', 'BB']
    if unit in units:
        # explicit target unit
        result_size = input_num / (carry**units.index(unit))
    else:
        # auto mode: walk up the unit list while the value stays >= 1
        result_size, unit = input_num, units[0]
        for exponent, candidate in enumerate(units):
            scaled = input_num / (carry**exponent)
            if abs(scaled) < 1:
                break
            result_size, unit = scaled, candidate
    if rounded is not NotSet:
        if rounded is None and PY2:
            # PY2 rounded should not be None
            result_size = int(result_size)
        else:
            result_size = round(result_size, rounded)
    return format % (result_size, unit)
def print_mem(unit=None, callback=print_info, rounded=2):
    """Report the virtual memory size of the current process (needs psutil).

    :param unit: B, KB, MB, GB; None lets get_readable_size pick the unit.
    :param callback: called with the readable string.
    :param rounded: forwarded to get_readable_size.
    :return: the readable string, or None when psutil is not installed.
    """
    try:
        import psutil

        process = psutil.Process(os.getpid())
        vms_bytes = float(process.memory_info().vms)
        readable = get_readable_size(vms_bytes, unit=unit, rounded=rounded)
        callback(readable)
    except ImportError:
        print("pip install psutil first.")
    else:
        return readable
class _Curl:
    """Holder of the argparse parser for the curl options curlparse reads.

    **Use curlparse function directly.**
    """

    parser = argparse.ArgumentParser()
    # (flags, add_argument options), registered in this exact order: the two
    # positionals first (the literal `curl` word, then the url).
    for _flags, _options in (
        (("curl",), {}),
        (("url",), {}),
        (("-X", "--method"), {"default": "get"}),
        (("-A", "--user-agent"), {}),
        (("-u", "--user"), {}),  # <user[:password]>
        (("-x", "--proxy"), {}),  # proxy.com:port
        (("-d", "--data"), {}),
        (("-F", "--form"), {}),
        (("--data-binary",), {}),
        (("--connect-timeout",), {"type": float}),
        (("-H", "--header"), {"action": "append", "default": []}),  # key: value
        (("--compressed",), {"action": "store_true"}),
    ):
        parser.add_argument(*_flags, **_options)
    # loop variables would otherwise remain as class attributes
    del _flags, _options
def curlparse(string, encoding="utf-8"):
    """Translate curl-string into dict of request. Do not support file upload which contains @file_path.

    :param string: standard curl-string, like `r'''curl ...'''`; a string
        starting with `http` is treated as a bare url.
    :param encoding: encoding for post-data encoding.
    :return: dict with `url` and `method`, plus `headers` / `auth` / `data` /
        `timeout` when the matching curl options are present.
    :raises AssertionError: if the string contains a newline.
    :raises ValueError: if the string can not be tokenized by shlex.

    A request body (`-d` / `--data-binary` / `-F`) turns the default `get`
    into `post`; a verb chosen with `-X` (PUT, PATCH, DELETE...) is kept.

    Basic Usage::

      >>> from torequests.utils import curlparse
      >>> curl_string = '''curl 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Pragma: no-cache' -H 'DNT: 1' --compressed'''
      >>> request_args = curlparse(curl_string)
      >>> request_args
      {'url': 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0', 'headers': {'Pragma': 'no-cache', 'Dnt': '1'}, 'method': 'get'}
      >>> import requests
      >>> requests.request(**request_args)
      <Response [200]>
    """
    assert "\n" not in string, 'curl-string should not contain \\n, try r"...".'
    if string.startswith("http"):
        return {"url": string, "method": "get"}
    try:
        lex_list = shlex.split(string.strip())
    except ValueError as e:
        if str(e) == 'No closing quotation' and string.count("'") % 2 != 0:
            print_info(
                "If `data` has single-quote ('), the `data` should be quote by double-quote, and add the `backslash`(\\) before original \"."
            )
        raise
    # options the parser does not know are ignored
    args, _unknown = _Curl.parser.parse_known_args(lex_list)
    requests_args = {"url": args.url}
    headers = {}
    for header in args.header:
        key, value = header.split(":", 1)
        headers[key.title()] = value.strip()
    if args.user_agent:
        headers["User-Agent"] = args.user_agent
    if headers:
        requests_args["headers"] = headers
    if args.user:
        # <user[:password]>, a missing password becomes ""
        requests_args["auth"] = tuple((args.user.split(":", 1) + [""])[:2])
    # NOTE: -x/--proxy is parsed but not translated.
    method = args.method
    data = args.data or args.data_binary or args.form
    if data:
        if data.startswith("$"):
            # shlex leaves the `$` of a bash $'...' quoted body in place
            data = data[1:]
        if method.lower() == "get":
            # a body implies POST, unless -X selected another verb
            method = "post"
        if PY2:
            # TODO not fix the UnicodeEncodeError, so use `replace`, damn python2.x.
            data = data.replace(r'\r', '\r').replace(r'\n', '\n')
        else:
            # resolve backslash escapes, then encode for the wire
            data = data.encode(
                'latin-1',
                'backslashreplace').decode('unicode-escape').encode(encoding)
        requests_args["data"] = data
    requests_args["method"] = method.lower()
    if args.connect_timeout:
        requests_args["timeout"] = args.connect_timeout
    return requests_args
class Null(object):
    """Do-nothing object: calling it, reading an attribute or indexing it
    gives back the same instance; it is falsy and renders as ""."""

    def __init__(self, *args, **kwargs):
        pass

    def __call__(self, *args, **kwargs):
        return self

    def __getattr__(self, mname):
        return self

    def __setattr__(self, name, value):
        # assignments are swallowed, nothing is stored
        return self

    def __getitem__(self, key):
        return self

    def __delattr__(self, name):
        return self

    def __repr__(self):
        return ""

    __str__ = __repr__

    def __bool__(self):
        return False

    # python2 spelling of __bool__
    __nonzero__ = __bool__


# shared module-level instance
null = Null()
def slice_into_pieces(seq, n):
    """Slice a sequence into pieces, return a generator of the pieces.

    The chunk size is len(seq) / n rounded up, and the chunking itself is
    delegated to slice_by_size.
    """
    whole, leftover = divmod(len(seq), n)
    # round up whenever the length is not an exact multiple of n
    chunk_size = whole + 1 if leftover else whole
    for piece in slice_by_size(seq, chunk_size):
        yield piece
def slice_by_size(seq, size):
    """Slice an iterable into chunks, return a generator of tuples.

    Every chunk holds `size` items except possibly the last one, which holds
    the remainder. Elements are never compared or filtered, so falsy values,
    `null` objects and items with unusual `__eq__` are passed through
    untouched.

    :param seq: any iterable.
    :param size: chunk length; a size <= 0 yields nothing.

        >>> list(slice_by_size(range(7), 3))
        [(0, 1, 2), (3, 4, 5), (6,)]
    """
    from itertools import islice

    if size <= 0:
        return
    iterator = iter(seq)
    while True:
        chunk = tuple(islice(iterator, size))
        if not chunk:
            # source exhausted
            return
        yield chunk
def ttime(timestamp=None, tzone=None, fail="", fmt="%Y-%m-%d %H:%M:%S"):
    """Translate timestamp into human-readable: %Y-%m-%d %H:%M:%S.

    :param timestamp: the timestamp float, or `time.time()` by default;
        13-digit millisecond timestamps are converted to seconds.
    :param tzone: time compensation in hours, Config.TIMEZONE by default.
    :param fail: returned when the timestamp can not be converted or
        formatted (including values `float()` rejects).
    :param fmt: %Y-%m-%d %H:%M:%S, %z not work.
    :rtype: str

        >>> ttime()
        2018-03-15 01:24:35
        >>> ttime(1486572818.421858323)
        2017-02-09 00:53:38
    """
    tzone = Config.TIMEZONE if tzone is None else tzone
    fix_tz = tzone * 3600
    try:
        if timestamp is None:
            timestamp = time.time()
        else:
            timestamp = float(timestamp)
            if 1e12 <= timestamp < 1e13:
                # Compatible timestamp with 13-digit milliseconds
                timestamp = timestamp / 1000
        # shift by the offset, then format as if it were UTC
        return time.strftime(fmt, time.gmtime(timestamp + fix_tz))
    except Exception:
        return fail
def ptime(timestr=None, tzone=None, fail=0, fmt="%Y-%m-%d %H:%M:%S"):
    """Translate %Y-%m-%d %H:%M:%S into timestamp.

    The parsed wall-clock time is read as UTC with `calendar.timegm` and
    shifted by `tzone` hours, so the result does not depend on the machine's
    local timezone or DST state.

    :param timestr: string like 2018-03-15 01:27:56 (or an object whose
        str() looks like that); the current time is used if not set.
    :param tzone: time compensation in hours, Config.TIMEZONE by default.
    :param fail: returned when the string can not be parsed with `fmt`.
    :param fmt: %Y-%m-%d %H:%M:%S, %z not work.
    :rtype: int

        >>> ptime('2018-03-15 01:27:56')
        1521048476
    """
    from calendar import timegm

    tzone = Config.TIMEZONE if tzone is None else tzone
    if not timestr:
        return int(time.time())
    try:
        #: str(timestr) for datetime.datetime object
        parsed = time.strptime(str(timestr), fmt)
        return int(timegm(parsed) - tzone * 3600)
    except Exception:
        return fail
def split_seconds(seconds):
    """Split seconds into [day, hour, minute, second, ms]

    The value is converted to milliseconds and peeled apart with divmod,
    smallest unit first (1000 ms, 60 s, 60 min, 24 h).

        >>> split_seconds(6666666)
        [77, 3, 51, 6, 0]
    """
    whole_seconds, ms = divmod(seconds * 1000, 1000)
    whole_minutes, second = divmod(whole_seconds, 60)
    whole_hours, minute = divmod(whole_minutes, 60)
    whole_days, hour = divmod(whole_hours, 24)
    # days are the last quotient; a final divmod by 1 keeps only its floor
    day = divmod(whole_days, 1)[0]
    return [day, hour, minute, second, ms]
def timeago(seconds=0, accuracy=4, format=0, lang="en", short_name=False):
    """Translate seconds into human-readable.

    :param seconds: seconds (float/int); the sign is rendered as a leading "-".
    :param accuracy: 4 by default; for format 0 the ms part is appended only
        if accuracy > 4, for format 1 it caps the number of fields shown.
    :param format: index of [led, literal, dict].
    :param lang: en or cn.
    :param short_name: use the abbreviated unit names.
    :raises ValueError: if `lang` is neither "en" nor "cn".

        >>> timeago(93245732.0032424, 5)
        '1079 days, 05:35:32,003'
        >>> timeago(93245732.0032424, 4, 1)
        '1079 days 5 hours 35 minutes 32 seconds'
        >>> timeago(-389, 4, 1)
        '-6 minutes 29 seconds'
    """
    assert format in [0, 1,
                      2], ValueError("format arg should be one of 0, 1, 2")
    negative = "-" if seconds < 0 else ""
    is_en = lang == "en"
    seconds = abs(seconds)
    if is_en:
        if short_name:
            units = ("day", "hr", "min", "sec", "ms")
        else:
            units = ("day", "hour", "minute", "second", "ms")
    elif lang == "cn":
        if short_name:
            units = (u"日", u"时", u"分", u"秒", u"毫秒")
        else:
            units = (u"天", u"小时", u"分钟", u"秒", u"毫秒")
    else:
        raise ValueError("lang arg should be one of 'en', 'cn'")
    times = split_seconds(seconds)
    if format == 2:
        return dict(zip(units, times))
    day, hour, minute, second, ms = times
    if format == 0:
        # led style: "<d> days, HH:MM:SS[,mmm]"
        day_str = (
            "%d %s%s, " % (day, units[0], "s" if day > 1 and is_en else "")
            if day else "")
        mid_str = ":".join("%02d" % i for i in (hour, minute, second))
        if accuracy > 4:
            mid_str += ",%03d" % ms
        return negative + day_str + mid_str
    elif format == 1:
        if not seconds:
            return negative + "0 %s" % units[-1]
        # keep the span from the first to the last non-zero field
        nonzero = [index for index, item in enumerate(times) if item != 0]
        head_index, tail_index = nonzero[0], nonzero[-1] + 1
        fields = [
            "%d %s%s" % (num, unit,
                         "s" if is_en and num > 1 and unit != "ms" else "")
            for num, unit in zip(times, units)
        ][head_index:tail_index][:accuracy]
        return negative + " ".join(fields)


# alias name
timepass = timeago
def md5(string, n=32, encoding="utf-8", skip_encode=False):
    """str(obj) -> md5_string

    :param string: object to hash; converted to text and encoded unless
        skip_encode is set, in which case it is hashed as given.
    :param n: 32 for the full hex digest, another number for the centered
        part of that length, or a (start, stop) pair used as a slice.
        Any other type gives None.
    :param encoding: encoding used when the input is encoded.
    :param skip_encode: pass `string` to hashlib untouched.

        >>> from torequests.utils import md5
        >>> md5(1, 10)
        '923820dcc5'
        >>> md5('test')
        '098f6bcd4621d373cade4e832627b4f6'
    """
    if skip_encode:
        payload = string
    else:
        payload = unicode(string).encode(encoding)
    if n == 32:
        return hashlib.md5(payload).hexdigest()
    if isinstance(n, (int, float)):
        # trim the same amount from both ends of the 32 chars
        full = hashlib.md5(payload).hexdigest()
        return full[(32 - n) // 2:(n - 32) // 2]
    if isinstance(n, (tuple, list)):
        full = hashlib.md5(payload).hexdigest()
        return full[n[0]:n[1]]
    return None
class Counts(object):
    """Small stepping counter; reading `.x` moves it forward one step.

        >>> from torequests.utils import Counts
        >>> cc = Counts()
        >>> cc.x
        1
        >>> cc.x
        2
        >>> cc.now
        2
        >>> cc.current
        2
        >>> cc.sub()
        1
    """

    __slots__ = ("start", "step", "current", "total")

    def __init__(self, start=0, step=1):
        self.start = self.current = start
        self.step = step
        self.total = -1

    def clear(self):
        """Reset the current value to the start value."""
        self.current = self.start

    def add(self, num=None):
        """Increase by `num` (or by `step` when num is falsy); return the new value."""
        self.current += num or self.step
        return self.current

    def sub(self, num=None):
        """Decrease by `num` (or by `step` when num is falsy); return the new value."""
        self.current -= num or self.step
        return self.current

    @property
    def x(self):
        """Step forward and return the new value."""
        return self.add()

    @property
    def s(self):
        """Step backward and return the new value."""
        return self.sub()

    @property
    def c(self):
        """Same as `x`."""
        return self.x

    @property
    def now(self):
        """Current value, without stepping."""
        return self.current
def unique(seq, key=None, return_as=None):
    """Unique the seq and keep the order.

    Instead of the slow way:
        `lambda seq: (x for index, x in enumerate(seq) if seq.index(x)==index)`

    :param seq: raw sequence.
    :param key: optional function computing the value duplicates are
        detected by; it is called exactly once per element.
    :param return_as: generator for default, or list / set / str...
        (`str` joins the str() of the items).

        >>> from torequests.utils import unique
        >>> a = [1,2,3,4,2,3,4]
        >>> unique(a)
        <generator object unique.<locals>._first_occurrences at 0x05720EA0>
        >>> unique(a, return_as=str)
        '1234'
        >>> unique(a, return_as=list)
        [1, 2, 3, 4]
    """
    seen = set()
    # resolve the iterator right away so a non-iterable fails at call time
    source = iter(seq)

    def _first_occurrences():
        for item in source:
            marker = key(item) if key else item
            if marker not in seen:
                seen.add(marker)
                yield item

    # python2 not support yield from
    generator = _first_occurrences()
    if not return_as:
        return generator
    if return_as == str:
        return "".join(map(str, generator))
    return return_as(generator)
def unparse_qs(qs, sort=False, reverse=False):
    """Reverse conversion for parse_qs: {name: [values]} -> query string.

    :param sort: order the names before encoding.
    :param reverse: descending order (only with sort).
    """
    pairs = qs.items()
    if sort:
        pairs = sorted(pairs, key=lambda pair: pair[0], reverse=reverse)
    encoded = []
    for name, values in pairs:
        prefix = quote(name) + "="
        encoded.extend(prefix + quote(value) for value in values)
    return "&".join(encoded)
def unparse_qsl(qsl, sort=False, reverse=False):
    """Reverse conversion for parse_qsl: [(name, value), ...] -> query string.

    :param sort: order the pairs by name before encoding.
    :param reverse: descending order (only with sort).
    """
    pairs = qsl
    if sort:
        pairs = sorted(pairs, key=lambda pair: pair[0], reverse=reverse)
    return "&".join(
        [quote(name) + "=" + quote(value) for name, value in pairs])
class Regex(object):
    """Register some objects(like functions) to the regular expression.

        >>> from torequests.utils import Regex, re
        >>> reg = Regex()
        >>> @reg.register_function('http.*cctv.*')
        ... def mock():
        ...     pass
        ...
        >>> reg.register('http.*HELLOWORLD', 'helloworld', instances='http://helloworld', flags=re.I)
        >>> reg.register('http.*HELLOWORLD2', 'helloworld2', flags=re.I)
        >>> reg.find('http://cctv.com')
        [<function mock at 0x031FC5D0>]
        >>> reg.match('http://helloworld')
        ['helloworld']
        >>> reg.match('non-http://helloworld')
        []
        >>> reg.search('non-http://helloworld')
        ['helloworld']
        >>> len(reg.search('non-http://helloworld2'))
        2
    """

    def __init__(self, ensure_mapping=False):
        """
        :param ensure_mapping: ensure mapping one to one, if False,
                will return all(more than 1) mapped object list."""
        # list of (compiled pattern, registered object, instances)
        self.container = []
        self.ensure_mapping = ensure_mapping

    def register(self, patterns, obj=None, instances=None, **reg_kwargs):
        """Register one object which can be matched/searched by regex.

        :param patterns: a list/tuple/set of regex-pattern, or one pattern.
        :param obj: return it while search/match success; must be truthy.
        :param instances: sample string(s) checked against the patterns
            right after each pattern is registered.
        :param reg_kwargs: kwargs for re.compile.
        """
        assert obj, "bool(obj) should be True."
        if not isinstance(patterns, (list, tuple, set)):
            patterns = [patterns]
        instances = instances or []
        if not isinstance(instances, (list, tuple, set)):
            instances = [instances]
        for pattern in patterns:
            compiled = re.compile(pattern, **reg_kwargs)
            self.container.append((compiled, obj, instances))
            if self.ensure_mapping:
                # check all instances to avoid one-to-many instances.
                self._check_instances()
                continue
            # no need to check all instances.
            for instance in instances:
                assert (self.search(instance) == [obj] or
                        self.match(instance) == [obj]), (
                            "instance %s should fit at least one pattern %s" %
                            (instance, pattern))

    def register_function(self, patterns, instances=None, **reg_kwargs):
        """Decorator for register."""

        def wrapper(function):
            self.register(patterns, function, instances=instances, **reg_kwargs)
            return function

        return wrapper

    def find(self, string, default=None):
        """Return match or search result.

        :rtype: list"""
        return self.match(string) or self.search(string) or default

    def _collect(self, finder_name, string, default):
        """Objects whose pattern hits `string` through the compiled
        pattern's `finder_name` method ("search" or "match")."""
        fallback = default if default else []
        hits = [
            obj for compiled, obj, _instances in self.container
            if getattr(compiled, finder_name)(string)
        ]
        if self.ensure_mapping:
            assert len(hits) < 2, "%s matches more than one pattern: %s" % (
                string,
                hits,
            )
        return hits if hits else fallback

    def search(self, string, default=None):
        """Use re.search to find the result

        :rtype: list"""
        return self._collect("search", string, default)

    def match(self, string, default=None):
        """Use re.match to find the result

        :rtype: list"""
        return self._collect("match", string, default)

    def _check_instances(self):
        """Assert every registered instance still hits some pattern."""
        for compiled, _obj, instances in self.container:
            for instance in instances:
                assert self.search(instance) or self.match(
                    instance), "instance %s not fit pattern %s" % (
                        instance, compiled.pattern)

    def show_all(self, as_string=True):
        """Describe every registration as `pattern => instances => type value`.

        python2 will not show flags.

        :param as_string: join the lines with newlines, else return the list.
        """
        lines = []
        for compiled, obj, instances in self.container:
            # python3: drop the leading "re.compile" of the repr
            pattern = str(compiled)[10:] if PY3 else compiled.pattern
            if callable(obj):
                described = '%s "%s"' % (obj.__name__, (obj.__doc__ or ""))
            else:
                described = str(obj)
            described = "%s %s" % (type(obj), described)
            lines.append(" => ".join(
                (pattern, ",".join(instances or []), described)))
        return "\n".join(lines) if as_string else lines
def kill_after(seconds, timeout=2):
    """Schedule the current process to be killed after `seconds`.

    SIGTERM is scheduled first; signal 9 follows `timeout` seconds later.
    Both are scheduled through run_after_async.
    """
    own_pid = os.getpid()
    send_signal = os.kill
    run_after_async(seconds, send_signal, own_pid, signal.SIGTERM)
    run_after_async(seconds + timeout, send_signal, own_pid, 9)
class UA:
    """Some common User-Agents for crawler.

    Android, iPhone, iPad, Firefox, Chrome, IE6, IE9,
    WECHAT_ANDROID, WECHAT_IOS"""

    __slots__ = ()
    Android = "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Mobile Safari/537.36"
    iPhone = "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) CriOS/56.0.2924.75 Mobile/14E5239e Safari/602.1"
    iPad = "Mozilla/5.0 (iPad; CPU OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"
    Firefox = (
        "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0"
    )
    Chrome = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"
    IE6 = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)"
    # the comment group was previously left unterminated ("...Trident/5.0;")
    IE9 = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
    # stray " > " quote markers removed from between the product tokens
    WECHAT_ANDROID = "Mozilla/5.0 (Linux; Android 5.0; SM-N9100 Build/LRX21V) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/37.0.0.0 Mobile Safari/537.36 MicroMessenger/6.0.2.56_r958800.520 NetType/WIFI"
    WECHAT_IOS = "Mozilla/5.0 (iPhone; CPU iPhone OS 5_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Mobile/9B176 MicroMessenger/4.3.2"
def try_import(module_name, names=None, default=ImportErrorModule, warn=True):
    """Try import module_name, except ImportError and return default,
    sometimes to be used for catch ImportError and lazy-import.

    :param module_name: dotted module path given to importlib.import_module.
    :param names: attribute name, or tuple/set/list of names, to fetch from
        the module; falsy returns the module itself.
    :param default: stand-in used when the module (or one of the names) is
        missing; by default an ImportErrorModule built from the missing path.
    :param warn: True logs a warning on ImportError, a callable is invoked
        as warn(module_name, names, default), falsy stays silent.
    :return: the module, a single attribute, or a list of attributes (one
        per requested name).
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        if warn:
            if warn is True:
                logger.warning(
                    "Module `%s` not found. Install it to remove this warning" %
                    module_name)
            else:
                warn(module_name, names, default)
        module = (ImportErrorModule(module_name)
                  if default is ImportErrorModule else default)
    if not names:
        return module
    if not isinstance(names, (tuple, set, list)):
        names = [names]
    result = []
    for name in names:
        if hasattr(module, name):
            # getattr mirrors the hasattr probe above, including attributes
            # served by a __getattr__ hook
            result.append(getattr(module, name))
        elif default is ImportErrorModule:
            result.append(ImportErrorModule("%s.%s" % (module_name, name)))
        else:
            result.append(default)
    return result[0] if len(result) == 1 else result
def ensure_request(request):
    """Used for requests.request / Requests.request with **ensure_request(request)**

    :param request: dict or curl-string or url
    :type request: [dict]
    :return: dict of request; a dict argument is updated in place and
        returned, with `method` defaulted to "get" and lower-cased.
    :rtype: [dict]
    :raises ValueError: if request is not a dict / str, or is a string
        that starts with neither `http` nor `curl `.

    Basic Usage::

      >>> from torequests.utils import ensure_request
      >>> ensure_request('''curl http://test.com''')
      {'url': 'http://test.com', 'method': 'get'}
      >>> ensure_request('http://test.com')
      {'method': 'get', 'url': 'http://test.com'}
      >>> ensure_request({'method': 'get', 'url': 'http://test.com'})
      {'method': 'get', 'url': 'http://test.com'}
      >>> ensure_request({'url': 'http://test.com'})
      {'url': 'http://test.com', 'method': 'get'}
    """
    if isinstance(request, dict):
        result = request
    elif isinstance(request, (unicode, str)):
        request = request.strip()
        if request.startswith("http"):
            result = {"method": "get", "url": request}
        elif request.startswith("curl "):
            result = curlparse(request)
        else:
            raise ValueError(
                "request string should start with `http` or `curl `.")
    else:
        raise ValueError("request should be dict or str.")
    result["method"] = result.setdefault("method", "get").lower()
    return result
class Timer(object):
    """
    Usage:
        init Timer anywhere:
            such as head of function, or head of module, then it will show log after del it by gc.

    :param name: be used in log or None (then it is built from the caller's
        function name and local variables).
    :param log_func: some function to show process, called as log_func(timer).
    :param default_timer: use `timeit.default_timer` by default.
    :param rounding: None, or seconds will be round(xxx, rounding)
    :param readable: None, or use `timepass`: readable(cost_seconds) -> 00:00:01,234
    :param log_after_del: log once when the Timer is garbage collected,
        unless `.x` was read before.
    :param stack_level: how many frames above `__init__` the frame used for
        the automatic name is.

    Basic Usage::

        from torequests.utils import Timer
        import time

        Timer()
        @Timer.watch()
        def test(a=1):
            Timer()
            time.sleep(1)

            def test_inner():
                t = Timer('test_non_del')
                time.sleep(1)
                t.x

            test_inner()

        test(3)
        time.sleep(1)
        # [2018-03-10 02:16:48]: Timer [00:00:01]: test_non_del, start at 2018-03-10 02:16:47.
        # [2018-03-10 02:16:48]: Timer [00:00:02]: test(a=3), start at 2018-03-10 02:16:46.
        # [2018-03-10 02:16:48]: Timer [00:00:02]: test(3), start at 2018-03-10 02:16:46.
        # [2018-03-10 02:16:49]: Timer [00:00:03]: <module>: __main__ (temp_code.py), start at 2018-03-10 02:16:46.
    """

    def __init__(
            self,
            name=None,
            log_func=None,
            default_timer=None,
            rounding=None,
            readable=None,
            log_after_del=True,
            stack_level=1,
    ):
        readable = readable or timepass
        # stay silent in __del__ if construction fails part-way
        self._log_after_del = False
        self.start_at = time.time()
        uid = md5("%s%s" % (self.start_at, id(self)))
        if not name:
            # The frame lookup stays inline: stack_level is counted from this
            # __init__ frame, a helper call would shift the depth by one.
            caller = sys._getframe(stack_level)
            f_name = caller.f_code.co_name
            f_local = caller.f_locals
            if f_name == "<module>":
                # __file__ is missing in some namespaces (e.g. interactive)
                f_vars = ": %s (%s)" % (
                    f_local.get("__name__"),
                    os.path.split(f_local.get("__file__") or "")[-1],
                )
            else:
                f_vars = ("(%s)" % ", ".join([
                    "%s=%s" % (i, repr(f_local[i]))
                    for i in sorted(f_local.keys())
                ]) if f_local else "()")
            # identity test: `in` would run == against every caller local
            if not any(value is self for value in f_local.values()):
                # add self to name space for __del__ way.
                f_local.update(**{uid: self})
            name = "%s%s" % (f_name, f_vars)
        self.name = name
        self.log_func = log_func
        self.timer = default_timer or timeit.default_timer
        self.rounding = rounding
        self.readable = readable
        self.start_timer = self.timer()
        self._log_after_del = log_after_del

    @property
    def string(self):
        """Only return the expect_string quietly."""
        return self.tick()

    @property
    def x(self):
        """Call self.log_func(self) and return expect_string."""
        # an explicit read replaces the log normally emitted by __del__
        self._log_after_del = False
        passed_string = self.string
        if self.log_func:
            self.log_func(self)
        else:
            print_info(
                "Timer [%(passed)s]: %(name)s, start at %(start)s." % (dict(
                    name=self.name,
                    start=ttime(self.start_at),
                    passed=passed_string)))
        return passed_string

    @property
    def passed(self):
        """Return the cost_seconds after starting up."""
        return self.timer() - self.start_timer

    def tick(self):
        """Return the time cost as expected: rounded to `rounding` digits
        if set, then passed through `readable` if set."""
        string = self.passed
        if self.rounding:
            string = round(string, self.rounding)
        if self.readable:
            string = self.readable(string)
        return string

    @staticmethod
    def watch(*timer_args, **timer_kwargs):
        """Decorator for Timer: time every call of the wrapped function,
        named as `function(args, kwargs)`."""

        def wrapper(function):

            @wraps(function)
            def inner(*args, **kwargs):
                args1 = ", ".join(map(repr, args)) if args else ""
                kwargs1 = ", ".join([
                    "%s=%s" % (i, repr(kwargs[i]))
                    for i in sorted(kwargs.keys())
                ])
                arg = ", ".join(filter(None, [args1, kwargs1]))
                name = "%s(%s)" % (function.__name__, arg)
                # held in a local so it lives exactly as long as this call
                _ = Timer(name=name, *timer_args, **timer_kwargs)
                result = function(*args, **kwargs)
                return result

            return inner

        return wrapper

    def __del__(self):
        if self._log_after_del:
            # not be called by self.x yet.
            self.x

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.x
def ensure_dict_key_title(dict_obj):
    """Return a copy of the dict with every key converted by key.title().

    Always be used to headers. If any key is not a text string the
    original dict is returned unchanged.

        >>> from torequests.utils import ensure_dict_key_title
        >>> ensure_dict_key_title({'hello-world':1, 'HELLOWORLD':2})
        {'Hello-World': 1, 'Helloworld': 2}
    """
    for name in dict_obj.keys():
        if not isinstance(name, unicode):
            return dict_obj
    return dict((name.title(), value) for name, value in dict_obj.items())
class ClipboardWatcher(object):
    """Watch clipboard with `pyperclip`, run callback while changed.

    :param interval: seconds slept between two clipboard reads.
    :param callback: called with the new content; default_callback if unset.
    """

    def __init__(self, interval=0.2, callback=None):
        self.pyperclip = try_import("pyperclip")
        self.interval = interval
        self.callback = callback or self.default_callback
        # last content seen, the baseline changes are detected against
        self.temp = self.current

    def read(self):
        """Return the current clipboard content."""
        return self.pyperclip.paste()

    def write(self, text):
        """Rewrite the current clipboard content."""
        return self.pyperclip.copy(text)

    @property
    def current(self):
        """Return the current clipboard content."""
        return self.read()

    def default_callback(self, text):
        """Turn \\r\\n into \\n, append a newline, print the text and return it."""
        normalized = "%s\n" % text.replace("\r\n", "\n")
        flush_print(normalized, sep="", end="")
        return normalized

    def watch(self, limit=None, timeout=None):
        """Block method to watch the clipboard changing.

        :param limit: stop after this many changes (None: no limit).
        :param timeout: stop after this many seconds (falsy: no timeout).
        """
        started = time.time()
        changes = 0
        while True:
            if timeout and not (time.time() - started < timeout):
                break
            latest = self.read()
            if latest != self.temp:
                changes += 1
                self.callback(latest)
                if changes == limit:
                    break
            self.temp = latest
            time.sleep(self.interval)

    @property
    def x(self):
        """Return self.watch()"""
        return self.watch()

    @threads(1)
    def watch_async(self, limit=None, timeout=None):
        """Non-block method to watch the clipboard changing."""
        return self.watch(limit=limit, timeout=timeout)
class Saver(object):
    """
    Simple object persistent toolkit with pickle/json,
    if only you don't care the performance and security.
    **Do not set the key startswith "_"**

    One instance is shared per path (``Saver(p) is Saver(p)``) and every
    change is written to the file immediately.

    :param path: if not set, will be ~/_saver.json (json mode) or
        ~/_saver.pkl (pickle mode). print(self._path) to show it.
    :param save_mode: pickle / json.
    :param auto_backup: also copy the file to ``<path>.bk`` after each save.
    :param saver_args: extra kwargs for json.dump / pickle.dump.
        Set pickle's protocol < 3 for compatibility between python2/3,
        but use -1 for performance and some other optimizations.

        >>> ss = Saver()
        >>> ss._path
        '/home/work/_saver.json'
        >>> ss.a = 1
        >>> ss['b'] = 2
        >>> ss._update({'c': 3, 'd': 4})
        >>> str(ss)
        "{'a': 1, 'b': 2, 'c': 3, 'd': 4}"
        >>> del ss.b
        >>> str(ss)
        "{'a': 1, 'c': 3, 'd': 4}"
        >>> ss
        Saver(path="/home/work/_saver.json"){'a': 1, 'c': 3, 'd': 4}
    """

    # path -> shared instance
    _instances = {}
    # path -> lock guarding reads/writes of that file
    _locks = {}
    # attribute names stored on the object itself instead of in the cache
    _protected_keys = {
        "_auto_backup",
        "_lock",
        "_path",
        "_saver_args",
        "_save_mode",
        "_cache",
        "__getitem__",
        "_keys",
        "_values",
        "__getattr__",
        "__len__",
        "_popitem",
        "_shutdown",
        "__setitem__",
        "__delitem__",
        "_save_obj",
        "_get",
        "__dict__",
        "_clear",
        "_locks",
        "__weakref__",
        "_items",
        "__module__",
        "_pop",
        "__contains__",
        "_load",
        "_save",
        "_update",
        "_set",
        "_protected_keys",
        "_instances",
        "_get_home_path",
        "_save_back_up",
    }
    _protected_keys = _protected_keys | set(object.__dict__.keys())

    def __new__(cls,
                path=None,
                save_mode="json",
                auto_backup=False,
                **saver_args):
        # BORG
        path = path or cls._get_home_path(save_mode=save_mode)
        instance = cls._instances.get(path)
        if instance is None:
            # allocate only when no instance exists for this path yet
            instance = cls._instances.setdefault(
                path, super(Saver, cls).__new__(cls))
        return instance

    def __init__(self,
                 path=None,
                 save_mode="json",
                 auto_backup=False,
                 **saver_args):
        super(Saver, self).__init__()
        self._auto_backup = auto_backup
        self._path = path or self._get_home_path(save_mode=save_mode)
        # keyed by the resolved path, the same key _instances uses
        self._lock = self.__class__._locks.setdefault(self._path, Lock())
        self._saver_args = saver_args
        self._save_mode = save_mode
        self._cache = self._load()

    @classmethod
    def _get_home_path(cls, save_mode=None):
        """Default file path in the home dir, extension by save mode."""
        ext = {"json": "json", "pickle": "pkl"}.get(save_mode, "db")
        return os.path.join(os.path.expanduser("~"), "_saver.%s" % ext)

    def _save_back_up(self):
        """Copy the data file to `<path>.bk`."""
        with open(self._path, "rb") as f_raw:
            with open(self._path + ".bk", "wb") as f_bk:
                f_bk.write(f_raw.read())

    def _save_obj(self, obj):
        """Serialize `obj` into the data file; return `obj`."""
        mode = "wb" if self._save_mode == "pickle" else "w"
        with self._lock:
            with open(self._path, mode) as f:
                if self._save_mode == "json":
                    json.dump(obj, f, **self._saver_args)
                if self._save_mode == "pickle":
                    pickle.dump(obj, f, **self._saver_args)
            # the file is closed (flushed) before it is copied
            if self._auto_backup:
                self._save_back_up()
            return obj

    def _load(self):
        """Read the cache from the data file, creating it if missing/empty."""
        if not (os.path.isfile(self._path) and os.path.getsize(self._path)):
            cache = {}
            self._save_obj(cache)
            return cache
        mode = "rb" if self._save_mode == "pickle" else "r"
        with self._lock:
            with open(self._path, mode) as f:
                if self._save_mode == "json":
                    return json.load(f)
                if self._save_mode == "pickle":
                    # SECURITY: unpickling runs code embedded in the file,
                    # only point a pickle-mode Saver at files you trust.
                    return pickle.load(f)

    def _save(self):
        return self._save_obj(self._cache)

    def _set(self, key, value):
        """Store one value and persist; in json mode a value that json
        can not dump is stored as str(value)."""
        if self._save_mode == "json":
            try:
                json.dumps(value)
            except TypeError:
                logger.warning(
                    "Saver._set(%s, %s) failed: bad type, using str(value) instead."
                    % (key, value))
                value = str(value)
        self._cache[key] = value
        self._save()

    def _get(self, key, default=None):
        return self._cache.get(key, default)

    def __setattr__(self, key, value):
        if key in self._protected_keys:
            object.__setattr__(self, key, value)
        else:
            self._set(key, value)

    def __getattr__(self, key):
        # only reached when normal attribute lookup failed
        if key in self._protected_keys:
            return object.__getattribute__(self, key)
        return self._get(key)

    def __contains__(self, key):
        return key in self._cache

    def __delattr__(self, key):
        self._cache.pop(key, None)
        self._save()

    def __dir__(self):
        return dir(object)

    def __len__(self):
        return len(self._cache)

    def _clear(self):
        self._cache = {}
        self._save()

    def _shutdown(self):
        """Delete the data file (and the backup when auto_backup is on)."""
        if self._auto_backup:
            os.remove(self._path + ".bk")
        return os.remove(self._path)

    def _keys(self):
        return self._cache.keys()

    def _items(self):
        return self._cache.items()

    def _values(self):
        return self._cache.values()

    def _pop(self, key, default=None):
        result = self._cache.pop(key, default)
        self._save()
        return result

    def _popitem(self):
        result = self._cache.popitem()
        self._save()
        return result

    def _update(self, *args, **kwargs):
        self._cache.update(*args, **kwargs)
        self._save()

    def __getitem__(self, key):
        if key in self._cache:
            return self._get(key)
        raise KeyError(key)

    def __setitem__(self, key, value):
        self._set(key, value)

    def __delitem__(self, key):
        self._cache.pop(key, None)
        self._save()

    def __str__(self):
        return str(self._cache)

    def __repr__(self):
        return 'Saver(path="%s")%s' % (self._path, reprlib.repr(self._cache))
def guess_interval(nums, accuracy=0):
    """Guess the typical gap between the numbers of a sequence.

    The values are truncated to ``int`` and sorted, the differences between
    neighbours are computed, differences smaller than ``accuracy`` are
    discarded, and the middle element (upper median) of the remaining sorted
    differences is returned.

    :param nums: iterable of numbers.
    :param accuracy: minimum difference that is taken into account.
    :return: ``0`` for empty input or when no difference reaches
        ``accuracy``; the single value itself for one-element input;
        otherwise the median difference.

    Basic Usage::

        from torequests.utils import guess_interval

        seq = [2, 10, 12, 19, 19, 29, 30, 32, 38, 40, 41, 54, 62, 69, 75, 79, 82, 88, 97, 99]
        print(guess_interval(seq, 5))
        # diffs >= 5: [8, 7, 10, 6, 13, 8, 7, 6, 6, 9]
        # median: 8
    """
    if not nums:
        return 0
    ordered = sorted(int(num) for num in nums)
    if len(ordered) == 1:
        return ordered[0]
    gaps = sorted(
        upper - lower
        for lower, upper in zip(ordered, ordered[1:])
        if upper - lower >= accuracy)
    if not gaps:
        # Every gap was below ``accuracy`` (or the iterable turned out to be
        # empty): there is nothing to take a median of.
        return 0
    return gaps[len(gaps) // 2]
def _re_split_mixin(string, sep, reg=False):
    """Split ``string`` on ``sep``: as a regex when ``reg`` is truthy, else literally."""
    return re.split(sep, string) if reg else string.split(sep)
def split_n(string, seps, reg=False):
    r"""Split a string into an n-dimensional nested list.

    ``string`` is split on the first separator, each piece on the second,
    and so on, giving one nesting level per separator.

    :param string: text to split.
    :param seps: sequence of separators, outermost first; an empty sequence
        returns ``string`` unchanged.
    :param reg: when truthy, every separator is treated as a regular
        expression, otherwise as a literal string.

    Basic Usage::

        from torequests.utils import split_n

        print(split_n('a b\nc d', ('\n', ' ')))
        # [['a', 'b'], ['c', 'd']]
        print(split_n('a1b,c22d', [',', '\d+'], reg=1))
        # [['a', 'b'], ['c', 'd']]
    """
    if not len(seps):
        return string
    current_sep, deeper_seps = seps[0], seps[1:]
    pieces = re.split(current_sep, string) if reg else string.split(current_sep)
    # Pass ``reg`` down so regex mode applies at every level, not only the
    # first separator.
    return [split_n(piece, deeper_seps, reg=reg) for piece in pieces]
def bg(func):
    """Decorator that runs ``func`` in a daemon thread on every call.

    Calling the decorated function starts a new ``Thread`` with
    ``daemon=True`` (so it does not block interpreter exit) and returns
    that thread object instead of the function's result.

    Basic Usage::

        from torequests.utils import bg, print_info
        import time

        @bg
        def test(n):
            time.sleep(n)
            print_info(n, 'done')

        test(1)
        print_info('not be blocked')
        time.sleep(2)
        # not be blocked
        # 1 done
    """

    @wraps(func)
    def launcher(*args, **kwargs):
        worker = Thread(target=func, args=args, kwargs=kwargs)
        worker.daemon = True
        worker.start()
        return worker

    return launcher
def countdown(
        seconds=None,
        block=True,
        interval=1,
        daemon=True,
        tick_callback=None,
        finish_callback=None,
):
    """Count down in a worker thread, reporting each tick through a callback.

    :param seconds: number of seconds (int or digit string); any other
        string is passed to ``ptime`` and the distance from now to the
        resulting timestamp is used.
    :param block: when true, wait for the countdown thread to finish.
    :param interval: seconds to sleep between ticks, also the tick step.
    :param daemon: value assigned to the worker thread's ``daemon`` flag.
    :param tick_callback: called as ``tick_callback(remaining, seconds,
        interval)`` before each sleep; by default prints the remaining
        count followed by a space.
    :param finish_callback: called as ``finish_callback(seconds,
        start_time)`` after the last tick when callable; by default prints
        a newline.

    Basic Usage::

        from torequests.utils import countdown

        countdown(3)
        # 3 2 1
        countdown('2018-06-13 00:13:29')
        # 10 9 8 7 6 5 4 3 2 1
    """

    def _print_tick(s, seconds, *args):
        flush_print(s, sep="", end=" ")

    def _print_newline(seconds, start_time):
        flush_print()

    def _worker(total, step):
        for remaining in range(total, 0, -step):
            tick_callback(remaining, total, step)
            time.sleep(step)
        if callable(finish_callback):
            finish_callback(total, start_time)

    start_time = time.time()
    if not tick_callback:
        tick_callback = _print_tick
    if finish_callback is None:
        finish_callback = _print_newline
    if unicode(seconds).isdigit():
        seconds = int(seconds)
    elif isinstance(seconds, (unicode, str)):
        # Non-numeric text: presumably a date-time string that ``ptime``
        # turns into a timestamp -- count down until that moment.
        seconds = int(ptime(seconds) - time.time())
    runner = Thread(target=_worker, args=(seconds, interval))
    runner.daemon = daemon
    runner.start()
    if block:
        runner.join()
def flush_print(*args, **kwargs):
    """Write the arguments to stdout like ``print``, flushing afterwards.

    Works on Python 2 as well; the ``file`` argument is not supported.

    :param sep: string placed between arguments, a space by default.
    :param end: string appended after the last argument, '\\n' by default.
    :param flush: flush stdout after writing, true by default.

    Basic Usage::

        import time
        from torequests.utils import flush_print

        flush_print("=" * 10)
        for _ in range(10):
            time.sleep(0.2)
            flush_print("=", sep="", end="")
    """
    # Keyword-only parameters after *args are a SyntaxError on Python 2,
    # so the options are pulled out of **kwargs by hand.
    sep = kwargs.pop("sep", " ")
    end = kwargs.pop("end", "\n")
    flush = kwargs.pop("flush", 1)
    text = sep.join(unicode(arg) for arg in args)
    sys.stdout.write("%s%s" % (text, end))
    if flush:
        sys.stdout.flush()
class ProgressBar(object):
    """Simple progress bar.

    On creation (with a non-zero ``size``) a full-length line of ``sig`` is
    printed as a ruler; the bar then grows by one ``sig`` each time another
    ``size / length`` share of the work is reported.

    :param size: total counts of calling ProgressBar.x.
    :param length: length of print log.
    :param sig: string of each printing log.

    Basic Usage::

        pb = ProgressBar(50, 10)
        for _ in range(50):
            time.sleep(0.1)
            pb.x
        print("current completion rate:", pb.completion_rate)
        # ==========
        # ==========
        # current completion rate: 1.0
    """

    def __init__(self, size, length=100, sig="="):
        self.size = size or 0
        self.length = length
        self.sig = sig
        self.current = 0
        self.last_print = 0
        self.printed = 0
        if size:
            # Fraction keeps the chunk exact, so rounding drift cannot
            # accumulate over many small steps.
            self.chunk = Fraction(self.size, self.length)
            flush_print(self.sig * self.length)
        else:
            self.chunk = 1

    def add(self, step):
        """Advance by ``step`` (expected >= 0) and print any newly earned marks.

        :return: the total number of marks printed so far.
        """
        self.current += step
        count = int((self.current - self.last_print) / self.chunk)
        if count < 1:
            return self.printed
        for _ in range(count):
            self.printed += 1
            flush_print(self.sig, end="")
        # Advance only by the whole chunks actually printed, so the
        # remainder carries over to the next call.
        self.last_print = count * self.chunk + self.last_print
        if self.current == self.size:
            flush_print()
        return self.printed

    @property
    def x(self):
        """Advance by one step; shorthand for ``add(1)``."""
        return self.add(1)

    @property
    def completion_rate(self):
        """Fraction of ``size`` completed so far; ``0.0`` when size is zero."""
        if not self.size:
            # __init__ accepts size=0/None, so avoid dividing by zero here.
            return 0.0
        return self.current / self.size
class RegMatch(object):
    """JS-like wrapper around the result of ``re.search``.

    Indexing with an int returns that group; when there was no match or the
    group number does not exist, ``''`` is returned instead of raising.
    """

    def __init__(self, item):
        # ``item`` is what ``re.search`` returned: a match object or None.
        self.item = item

    def __getattr__(self, key, default=null):
        """Delegate unknown attributes to the wrapped match, else ``default``."""
        wrapped = self.item
        return getattr(wrapped, key, default)

    def __getitem__(self, index):
        """Return group ``index``, or ``''`` if there is no match / no such group.

        :raises IndexError: when there is a match and ``index`` is not an int.
        """
        match = self.item
        if match is None:
            return ""
        if not isinstance(index, int):
            raise IndexError
        try:
            return match.group(index)
        except IndexError:
            return ""

    @classmethod
    def find_one(cls, pattern, string, flags=0):
        """Search ``string`` for ``pattern`` and wrap the result in a RegMatch.

        Basic Usage::

            >>> from torequests.utils import find_one
            >>> string = "abcd"
            >>> find_one("a.*", string)[0]
            'abcd'
            >>> find_one("a.*", string)[1]
            ''
            >>> find_one("a(.)", string)[0]
            'ab'
            >>> find_one("a(.)", string)[1]
            'b'
            >>> find_one("a(.)", string)[2] or "default"
            'default'
            >>> import re
            >>> item = find_one("a(B)(C)", string, flags=re.I | re.S)
            >>> item[0]
            'abc'
            >>> item[1]
            'b'
            >>> item[2]
            'c'
            >>> item[3]
            ''
            >>> register_re_findone()
            >>> re.findone('a(b)', 'abcd')[1] or 'default'
            'b'
        """
        return cls(re.search(pattern, string, flags=flags))
# Module-level alias, so callers can write find_one(pattern, string) directly.
find_one = RegMatch.find_one
def register_re_findone():
    """Attach ``find_one`` to the ``re`` module as ``re.findone``."""
    setattr(re, "findone", find_one)
class TimeItem(object):
    """Queue entry used for Cooldown: a payload plus its last-use timestamp.

    All ordering and equality comparisons look only at ``use_at``; the hash
    is taken from ``data``.
    """

    __slots__ = ('data', 'use_at')

    def __init__(self, data, use_at):
        self.data, self.use_at = data, use_at

    def __hash__(self):
        return hash(self.data)

    # Rich comparisons: each one compares the ``use_at`` timestamps only.

    def __lt__(self, other):
        return self.use_at < other.use_at

    def __le__(self, other):
        return self.use_at <= other.use_at

    def __gt__(self, other):
        return self.use_at > other.use_at

    def __ge__(self, other):
        return self.use_at >= other.use_at

    def __eq__(self, other):
        return self.use_at == other.use_at

    def __ne__(self, other):
        return self.use_at != other.use_at
class Cooldown(object):
    """Thread-safe Cooldown toolkit.

    Items live in a priority queue ordered by the time they were last
    handed out; ``get`` returns the least recently used item once at least
    ``interval`` seconds have passed since its last use.

    :param init_items: iterables to add into the default queue at first.
    :param interval: each item will cooldown `interval` seconds before return.
    :param born_at_now: if be set True, the item.use_at will be set time.time()
        instead of 0 when adding to queue at the first time.

    >>> from torequests.logs import print_info
    >>> cd = Cooldown(range(1, 3), interval=2)
    >>> cd.add_items([3, 4])
    >>> cd.add_item(5)
    >>> for _ in range(7):
    ...     print_info(cd.get(1, 'timeout'))
    [2019-01-17 01:50:59] pyld.py(152): 1
    [2019-01-17 01:50:59] pyld.py(152): 3
    [2019-01-17 01:50:59] pyld.py(152): 5
    [2019-01-17 01:50:59] pyld.py(152): 2
    [2019-01-17 01:50:59] pyld.py(152): 4
    [2019-01-17 01:51:00] pyld.py(152): timeout
    [2019-01-17 01:51:01] pyld.py(152): 1
    >>> cd.size
    5
    """

    def __init__(self, init_items=None, interval=0, born_at_now=False):
        self.interval = interval
        self.queue = PriorityQueue()
        # New items start with use_at=0 (immediately available) unless
        # born_at_now asks for "just used now".
        self.use_at_function = self.get_now_timestamp if born_at_now else lambda: 0
        self.add_items(init_items or [])

    @property
    def size(self):
        """Number of items currently queued."""
        return self.queue.qsize()

    @property
    def all_items(self):
        """Payloads of all queued items, in the queue's internal order."""
        return [item.data for item in self.queue.queue]

    def get_now_timestamp(self):
        """Return the current time as a Unix timestamp."""
        return time.time()

    def add_item(self, item):
        """Queue ``item``, wrapping it in a TimeItem unless it already is one."""
        if not isinstance(item, TimeItem):
            item = TimeItem(item, self.use_at_function())
        self.queue.put(item)

    def add_items(self, items):
        """Queue every element of ``items`` via ``add_item``."""
        for item in items:
            self.add_item(item)

    def _retain(self, keep):
        """Keep only the queued entries for which ``keep(entry)`` is true.

        Runs under the queue's mutex and re-heapifies the surviving entries,
        because filtering a heap list does not preserve the heap ordering
        the priority queue relies on. Returns the new queue size.
        """
        import heapq
        with self.queue.mutex:
            survivors = [entry for entry in self.queue.queue if keep(entry)]
            heapq.heapify(survivors)
            self.queue.queue = survivors
            return len(survivors)

    def remove_item(self, item):
        """Remove every queued entry whose payload equals ``item``.

        :return: the queue size after removal.
        """
        return self._retain(lambda entry: entry.data != item)

    def remove_items(self, items):
        """Remove every queued entry whose payload is in ``items``.

        :return: the queue size after removal.
        """
        return self._retain(lambda entry: entry.data not in items)

    def get(self, timeout=None, default=None):
        """Return the next cooled-down item, waiting for it if necessary.

        :param timeout: maximum seconds to wait in total; ``None`` waits
            without limit.
        :param default: value returned when no item became available in time.
        """
        deadline = None if timeout is None else time.time() + timeout
        while True:
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - time.time()
                if remaining <= 0:
                    return default
            try:
                # remaining=None blocks until an item is queued; an infinite
                # float timeout would overflow inside Queue.get instead.
                item = self.queue.get(timeout=remaining)
            except Empty:
                return default
            cooling = self.interval - (time.time() - item.use_at)
            if cooling > 0:
                # The least recently used item is still cooling down: put it
                # back and sleep until it is ready or the deadline passes.
                self.queue.put(item)
                time.sleep(cooling if remaining is None else min(cooling, remaining))
                continue
            item.use_at = self.get_now_timestamp()
            self.queue.put(item)
            return item.data
def curlrequests(curl_string, **kwargs):
    """Use tPool to request for curl string.

    The curl command is parsed with ``curlparse`` and the resulting
    arguments are merged over ``kwargs`` before calling ``req.request``.
    If kwargs contains ``req`` (any object with a ``request`` method, like
    ``req=requests``), it is used instead of a new ``tPool``.

    :param curl_string: standard curl string.
    :type curl_string: str
    :param kwargs: valid kwargs for tPool.
    :type kwargs: dict

    Basic Usage::

        from torequests.utils import curlrequests

        r = curlrequests('''curl 'http://p.3.cn/' -H 'Connection: keep-alive' -H 'DNT: 1' --compressed''', retry=1)
        print(r.text)
    """
    req = kwargs.pop('req', None)
    if req is None:
        # Build the pool only when the caller did not supply a requester;
        # as a pop() default it would be constructed on every call.
        req = tPool()
    kwargs.update(curlparse(curl_string))
    return req.request(**kwargs)
def sort_url_query(url, reverse=False, _replace_kwargs=None):
    """Return ``url`` with its query arguments sorted.

    :param url: the URL to normalize.
    :param reverse: passed to ``unparse_qsl`` as its ``reverse`` option.
    :param _replace_kwargs: optional dict of parsed-URL fields (such as
        scheme / netloc...) to overwrite before sorting.

    http://www.google.com?b=2&z=26&a=1 => http://www.google.com?a=1&b=2&z=26
    """
    parts = urlparse(url)
    if _replace_kwargs:
        parts = parts._replace(**_replace_kwargs)
    query_pairs = parse_qsl(parts.query)
    ordered_query = unparse_qsl(query_pairs, sort=True, reverse=reverse)
    return urlunparse(parts._replace(query=ordered_query))