client
上一篇读了 `parser.py`,也就是处理 `$var`、`${func($var1)}` 这类变量、函数语法的文件;这次来看 `client.py`。
看名字就差不多明白了:它是对请求方法的封装。
导包
import json # 内置json库import time # 时间处理import requests # 第三方请求库 requestsimport urllib3 # 内置的 urllib3 requests库基于它实现from loguru import logger # 日志库from requests import Request, Response # 导入 请求类、响应类from requests.exceptions import ( # 一些异常类InvalidSchema,InvalidURL,MissingSchema,RequestException,)from httprunner.models import RequestData, ResponseDatafrom httprunner.models import SessionData, ReqRespDatafrom httprunner.utils import lower_dict_keys, omit_long_data # key转换小写, 长度处理
RequestData & ResponseData
这里把之前的模型类复制过来
# 请求class RequestData(BaseModel):method: MethodEnum = MethodEnum.GETurl: Urlheaders: Headers = {}cookies: Cookies = {}body: Union[Text, bytes, List, Dict, None] = {}# 响应class ResponseData(BaseModel):status_code: intheaders: Dictcookies: Cookiesencoding: Union[Text, None] = Nonecontent_type: Textbody: Union[Text, bytes, List, Dict]
源码附注释
# Silence urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class ApiResponse(Response):
    """Response subclass that can carry a captured request exception.

    ``HttpSession._send_request_safe_mode`` stores the exception that aborted
    the request on the ``error`` attribute; ``raise_for_status`` re-raises it.
    """

    def raise_for_status(self):
        """Raise the stored ``error`` if there is one, else defer to ``Response``."""
        if hasattr(self, "error") and self.error:
            raise self.error
        Response.raise_for_status(self)


def get_req_resp_record(resp_obj: Response) -> ReqRespData:
    """Build a ``ReqRespData`` record from a ``Response`` object.

    Both the request that was sent (read from ``resp_obj.request``) and the
    response itself are captured, and each is written to the debug log.

    :param resp_obj: response whose ``request`` attribute is a prepared request.
    :return: ``ReqRespData`` made of a ``RequestData`` and a ``ResponseData``.
    """

    def log_print(req_or_resp, r_type):
        """Debug-log every field of a request/response model, one per line."""
        parts = [f"\n================== {r_type} details ==================\n"]
        for key, value in req_or_resp.dict().items():
            if isinstance(value, (dict, list)):
                # pretty-print containers so nested data stays readable
                value = json.dumps(value, indent=4, ensure_ascii=False)

            parts.append(f"{key:<8} : {value}\n")

        logger.debug("".join(parts))

    # ---- record actual request info ----
    request_headers = dict(resp_obj.request.headers)
    request_cookies = resp_obj.request._cookies.get_dict()

    request_body = resp_obj.request.body
    if request_body is not None:
        try:
            request_body = json.loads(request_body)
        except (json.JSONDecodeError, UnicodeDecodeError, TypeError):
            # Keep the raw body when it is not JSON:
            #   JSONDecodeError    - str such as a=1&b=2
            #   UnicodeDecodeError - bytes/bytearray, e.g. a protobuf payload
            #   TypeError          - neither str nor bytes, e.g. <MultipartEncoder>
            pass

        request_content_type = lower_dict_keys(request_headers).get("content-type")
        if request_content_type and "multipart/form-data" in request_content_type:
            # file upload: record a placeholder instead of the stream
            request_body = "upload file stream (OMITTED)"

    request_data = RequestData(
        method=resp_obj.request.method,
        url=resp_obj.request.url,
        headers=request_headers,
        cookies=request_cookies,
        body=request_body,
    )

    # log request details in debug mode
    log_print(request_data, "request")

    # ---- record response info ----
    resp_headers = dict(resp_obj.headers)
    lower_resp_headers = lower_dict_keys(resp_headers)
    content_type = lower_resp_headers.get("content-type", "")

    if "image" in content_type:
        # response is image type, record bytes content only
        response_body = resp_obj.content
    else:
        try:
            # try to record json data
            response_body = resp_obj.json()
        except ValueError:
            # not JSON: record the text, shortened by omit_long_data
            response_body = omit_long_data(resp_obj.text)

    response_data = ResponseData(
        status_code=resp_obj.status_code,
        cookies=resp_obj.cookies or {},
        encoding=resp_obj.encoding,
        headers=resp_headers,
        content_type=content_type,
        body=response_body,
    )

    # log response details in debug mode
    log_print(response_data, "response")

    return ReqRespData(request=request_data, response=response_data)


class HttpSession(requests.Session):
    """``requests.Session`` that records details of every request it performs.

    Cookies are held between requests as in a plain ``requests.Session``. In
    addition, each call to :meth:`request` stores addresses, timing statistics
    and the request/response history (redirects included) in ``self.data``.
    """

    def __init__(self):
        super(HttpSession, self).__init__()
        # holds the record of the most recent request; replaced on every call
        self.data = SessionData()

    def update_last_req_resp_record(self, resp_obj):
        """Replace the last request/response record with one built from ``resp_obj``.

        If no record exists yet, the new record is simply appended.
        """
        # Guard the pop: an empty history used to raise IndexError here.
        if self.data.req_resps:
            self.data.req_resps.pop()
        self.data.req_resps.append(get_req_resp_record(resp_obj))

    def request(self, method, url, name=None, **kwargs):
        """Send a request and record its details in ``self.data``.

        :param method: HTTP method for the request.
        :param url: URL for the request.
        :param name: (optional) unused placeholder, kept for compatibility
            with Locust's HttpSession.
        :param params: (optional) dictionary or bytes for the query string.
        :param data: (optional) dictionary or bytes to send in the body.
        :param headers: (optional) dictionary of HTTP headers.
        :param cookies: (optional) dict or CookieJar to send.
        :param files: (optional) dictionary of ``'filename': file-like-objects``
            for multipart encoding upload.
        :param auth: (optional) auth tuple or callable for Basic/Digest/Custom
            HTTP auth.
        :param timeout: (optional) seconds to wait for the server, as a float
            or a (connect timeout, read timeout) tuple. Defaults to 120.
        :param allow_redirects: (optional) bool.
        :param proxies: (optional) dictionary mapping protocol to proxy URL.
        :param stream: always overridden to ``True`` by this method.
        :param verify: (optional) whether to verify the SSL cert; a CA_BUNDLE
            path can also be provided.
        :param cert: (optional) path to an ssl client cert file (.pem), or a
            ``('cert', 'key')`` pair.
        :return: the response; an :class:`ApiResponse` carrying the error when
            the request failed with a ``RequestException``.
        :raises MissingSchema, InvalidSchema, InvalidURL: for malformed URLs.
        """
        self.data = SessionData()

        # timeout default to 120 seconds
        kwargs.setdefault("timeout", 120)

        # set stream to True, in order to get client/server IP/Port
        kwargs["stream"] = True

        start_timestamp = time.time()
        response = self._send_request_safe_mode(method, url, **kwargs)
        response_time_ms = round((time.time() - start_timestamp) * 1000, 2)

        try:
            # [:2] keeps (host, port) only: IPv6 sockets return a 4-tuple
            client_ip, client_port = response.raw.connection.sock.getsockname()[:2]
            self.data.address.client_ip = client_ip
            self.data.address.client_port = client_port
            logger.debug(f"client IP: {client_ip}, Port: {client_port}")
        except AttributeError as ex:
            logger.warning(f"failed to get client address info: {ex}")

        try:
            server_ip, server_port = response.raw.connection.sock.getpeername()[:2]
            self.data.address.server_ip = server_ip
            self.data.address.server_port = server_port
            logger.debug(f"server IP: {server_ip}, Port: {server_port}")
        except AttributeError as ex:
            logger.warning(f"failed to get server address info: {ex}")

        # Query the headers mapping directly: it is case-insensitive, whereas
        # a plain dict copy would miss a "Content-Length" spelled in title case.
        content_size = int(response.headers.get("content-length") or 0)

        # record the consumed time
        self.data.stat.response_time_ms = response_time_ms
        # total_seconds() covers the whole duration; .microseconds is only the
        # sub-second component and is wrong for responses slower than 1 s
        self.data.stat.elapsed_ms = response.elapsed.total_seconds() * 1000.0
        self.data.stat.content_size = content_size

        # record request and response histories, include 30X redirection
        response_list = response.history + [response]
        self.data.req_resps = [
            get_req_resp_record(resp_obj) for resp_obj in response_list
        ]

        try:
            response.raise_for_status()
        except RequestException as ex:
            logger.error(str(ex))
        else:
            logger.info(
                f"status_code: {response.status_code}, "
                f"response_time(ms): {response_time_ms} ms, "
                f"response_length: {content_size} bytes"
            )

        return response

    def _send_request_safe_mode(self, method, url, **kwargs):
        """Send the request, turning request failures into an ``ApiResponse``.

        Malformed-URL errors (``MissingSchema``, ``InvalidSchema``,
        ``InvalidURL``) are re-raised. Any other ``RequestException`` is
        stored on a fresh ``ApiResponse`` which is returned instead.
        """
        try:
            return requests.Session.request(self, method, url, **kwargs)
        except (MissingSchema, InvalidSchema, InvalidURL):
            raise
        except RequestException as ex:
            resp = ApiResponse()
            resp.error = ex
            resp.status_code = 0  # with this status_code, content returns None
            # attach a prepared request so the record can still be built
            resp.request = Request(method, url).prepare()
            return resp