python requests
  0voAMrgZhFuV 2023年11月02日 51 0

参考资料

菜鸟 Python requests 模块

Python Requests库进阶用法——timeouts, retries, hooks

中文文档地址:http://cn.python-requests.org/zh_CN/latest/

英文文档地址:https://2.python-requests.org/en/master/api/

后台接口

package com.laolang.shop.modules.admin.controller;

import cn.hutool.json.JSONUtil;
import com.laolang.shop.common.annotation.AnonymousAccess;
import com.laolang.shop.common.domain.R;
import com.laolang.shop.modules.admin.rsp.SysDictTypeRsp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test endpoints mounted under {@code admin/test}, one per request style:
 * GET without parameters, GET with a query parameter, POST with a request
 * body, and POST with individual request parameters.
 */
@Slf4j
@RequestMapping("admin/test")
@RestController
public class SysTestController {

    /** GET without parameters: returns the fixed sample record. */
    @AnonymousAccess
    @GetMapping("info")
    public R<SysDictTypeRsp> info() {
        return R.ok(sampleDictType());
    }

    /** GET with a {@code name} query parameter: logs it, then returns the fixed sample record. */
    @AnonymousAccess
    @GetMapping("info2")
    public R<SysDictTypeRsp> info2(@RequestParam("name") String name) {
        log.info("name:{}", name);
        return R.ok(sampleDictType());
    }

    /**
     * POST whose payload is bound through {@code @RequestBody}
     * (client sends Content-Type application/json;charset=UTF-8): logs the payload as JSON.
     */
    @AnonymousAccess
    @PostMapping("add")
    public R<Void> add(@RequestBody SysDictTypeRsp req) {
        String reqJson = JSONUtil.toJsonStr(req);
        log.info("req:{}", reqJson);
        return R.ok();
    }

    /**
     * POST whose fields arrive as separate request parameters
     * (client sends Content-Type application/x-www-form-urlencoded): logs the three values.
     */
    @AnonymousAccess
    @PostMapping("add2")
    public R<Void> add2(@RequestParam("id") Long id, @RequestParam("name") String name,
                        @RequestParam("code") String code) {
        log.info("id:{}, name:{}, code:{}", id, name, code);
        return R.ok();
    }

    /** Builds the record returned by both GET endpoints: id 1, name "性别", code "gender". */
    private static SysDictTypeRsp sampleDictType() {
        SysDictTypeRsp dictType = new SysDictTypeRsp();
        dictType.setId(1L);
        dictType.setName("性别");
        dictType.setCode("gender");
        return dictType;
    }
}


Python requests

# -*- coding: utf-8 -*-
import json

import requests
from loguru import logger

# Root URL that every demo function below prepends to its request path
baseUrl = 'http://localhost:8091'


def test_get():
    """Send a GET request without parameters and log the status code and body text."""
    response = requests.get(baseUrl + '/admin/test/info')
    logger.info(response.status_code)
    logger.info(response.text)


def test_get_with_param():
    """Send a GET request with a ``name`` query parameter and log the status code and body text."""
    response = requests.get(
        baseUrl + '/admin/test/info2',
        params={"name": "性别"},
    )
    logger.info(response.status_code)
    logger.info(response.text)


def test_post_request_body():
    """POST a JSON request body and log the status code and body text."""
    payload = {
        "id": 1,
        "name": "性别",
        "code": "gender"
    }
    response = requests.post(
        baseUrl + '/admin/test/add',
        headers={"Content-Type": "application/json;charset=UTF-8"},
        json=payload,
    )
    logger.info(response.status_code)
    logger.info(response.text)


def test_post_request_form():
    """POST form-encoded fields and log the status code and body text."""
    fields = {
        "id": 1,
        "name": "性别",
        "code": "gender"
    }
    response = requests.post(
        baseUrl + '/admin/test/add2',
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=fields,
    )
    logger.info(response.status_code)
    logger.info(response.text)


def test_response():
    """Log the attributes of a response object one by one.

    When the response body is JSON, ``r.json()`` decodes it (here into a
    dict), so individual fields can be read by key.
    """
    url = baseUrl + '/admin/test/info'
    r = requests.get(url)
    logger.info(r.status_code)  # HTTP status code
    logger.info(r.headers)  # response headers
    logger.info(r.content)  # response body
    logger.info(r.text)  # response body as text
    # Decode the JSON body once; every r.json() call parses the body again.
    data = r.json()
    logger.info(data)  # response body as decoded JSON
    logger.info(data['code'])
    logger.info(data['body']['id'])
    logger.info(r.url)  # request URL
    logger.info(r.cookies)  # cookies set by the response
    logger.info(r.encoding)  # response encoding


if __name__ == '__main__':
    # Run every demo, in the order they are defined above
    for demo in (
        test_get,
        test_get_with_param,
        test_post_request_body,
        test_post_request_form,
        test_response,
    ):
        demo()

request 参数

requests.get()、requests.post() 等调用的都是 requests.request() 方法,requests.request() 调用的是 session.request 方法,具体参数如下

参数名

默认值

含义

method

无(必填)

请求方式

url

无(必填)

请求路径

params

None

params 参数

data

None

data 参数

headers

None

请求头

cookies

None

cookie 信息

files

None

文件上传

auth

None

鉴权

timeout

None

超时

allow_redirects

True

重定向

proxies

None

代理

hooks

None

钩子

stream

None

文件下载

verify

None

证书验证

cert

None

客户端证书

json

None

json参数


钩子

# -*- coding: utf-8 -*-

import requests
from loguru import logger

# Base URL that CustomSession.prepare_request prepends to request paths
baseUrl = 'http://localhost:8091'


# Assert that the HTTP status code is not 4xx or 5xx
def assert_http_status_code_hook(response, *args, **kwargs):
    """Response hook: log the hook name, then let ``raise_for_status`` reject error statuses."""
    hook_name = "assert_http_status_code_hook"
    logger.info(hook_name)
    response.raise_for_status()


# Check that the business status code in the JSON body is '200'
def assert_logic_code_hook(response, *args, **kwargs):
    """Response hook: raise ``Exception`` unless the JSON body's ``code`` field equals ``'200'``."""
    logger.info("assert_logic_code_hook")
    business_code = response.json()['code']
    if business_code == '200':
        return
    raise Exception('业务请求失败')


# Subclass requests.Session and override prepare_request so a request can be
# adjusted before it is sent, e.g. to rewrite the URL or to add a token header.
class CustomSession(requests.Session):
    """Session that prefixes relative request paths with the module-level ``baseUrl``."""

    def prepare_request(self, request):
        """Prefix ``baseUrl`` to a relative ``request.url``, then prepare the request as usual.

        :param request: the ``requests.Request`` about to be prepared; its ``url``
            is rewritten in place when it is a relative path.
        :return: whatever ``requests.Session.prepare_request`` returns for the request.
        """
        logger.info("prepare_request")
        # Leave absolute URLs alone: blindly prefixing would glue two URLs
        # together, which also happened when the same Request object was
        # prepared a second time (its url had already been rewritten).
        if not request.url.lower().startswith(('http://', 'https://')):
            request.url = baseUrl + request.url
        return super().prepare_request(request)


# Shared session; both checks run as 'response' hooks, HTTP status first
http = CustomSession()

http.hooks['response'].extend((assert_http_status_code_hook, assert_logic_code_hook))


def test_get():
    """GET the info endpoint through the hooked session and log the status code and decoded JSON."""
    response = http.request(method='GET', url='/admin/test/info')
    logger.info(response.status_code)
    logger.info(response.json())


# Run the demo only when this file is executed as a script
if __name__ == '__main__':
    test_get()

requests 简易封装

代码

# -*- coding: utf-8 -*-
from datetime import datetime

import requests
from loguru import logger

# Login endpoint path; RequestUtil.send skips the Authorization header for it
login_url = '/auth/login'
# Token lifetime in minutes: 29, one minute less than the backend's
expire_time = 29
# Business status code that marks a successful request
logic_success_code = '200'


# Custom session whose main job is to join the base URL and the request path
class CustomSession(requests.Session):
    """Session that prefixes relative request paths with a fixed base URL."""

    def __init__(self, baseurl):
        """Remember ``baseurl`` and initialise the underlying ``requests.Session``.

        :param baseurl: prefix put in front of every relative request path.
        """
        self.baseurl = baseurl
        super().__init__()

    def prepare_request(self, request):
        """Prefix ``self.baseurl`` to a relative ``request.url``, then prepare the request as usual.

        :param request: the ``requests.Request`` about to be prepared; its ``url``
            is rewritten in place when it is a relative path.
        :return: whatever ``requests.Session.prepare_request`` returns for the request.
        """
        # Leave absolute URLs alone: blindly prefixing would glue two URLs
        # together, which also happened when the same Request object was
        # prepared a second time (its url had already been rewritten).
        if not request.url.lower().startswith(('http://', 'https://')):
            request.url = self.baseurl + request.url
        return super().prepare_request(request)


# Log the response body
def print_response_hook(response, *args, **kwargs):
    """Response hook: log the body text of ``response``."""
    body_text = response.text
    logger.info("response:{}".format(body_text))


# Assert that the HTTP status code is not 4xx or 5xx
def assert_http_status_code_hook(response, *args, **kwargs):
    """Response hook: delegate to ``raise_for_status`` so error statuses raise."""
    check_status = response.raise_for_status
    check_status()


# Check that the business status code in the JSON body is the success code
def assert_logic_code_hook(response, *args, **kwargs):
    """Response hook: raise ``Exception`` unless the body's ``code`` equals ``logic_success_code``."""
    business_code = response.json()['code']
    if business_code == logic_success_code:
        return
    raise Exception('业务请求失败')


# Credentials and token state kept for one user
class TokenInfo:
    """Record holding a user's credentials together with the token issued for them.

    The constructor only stores the credentials; ``token`` and ``login_time``
    stay ``None`` until the code performing the login assigns them.
    """
    # Class-level defaults for the four fields
    username = None
    password = None
    token = None
    login_time = None

    def __init__(self, username, password):
        """Store ``username`` and ``password`` on the instance."""
        self.username, self.password = username, password


# Request helper that logs users in and attaches their bearer token
class RequestUtil:
    """HTTP helper built on ``CustomSession`` that manages per-user login tokens.

    ``login`` obtains (or reuses) a token for a user and makes that user the
    current one. ``send`` issues requests with the current user's token in the
    ``Authorization`` header, logging in again first once the token is at
    least ``expire_time`` minutes old.
    """
    # Session instance that sends every request
    http = None
    # Base URL handed to the session
    baseurl = None
    # Name of the user whose token is attached to requests
    curr_username = None
    # Token of the current user
    curr_token = None

    def __init__(self, baseurl):
        """Create the session for ``baseurl`` and register the response hooks.

        :param baseurl: prefix the session puts in front of request paths.
        """
        self.baseurl = baseurl
        # Token cache keyed by username. Created per instance: as a class-level
        # dict it was shared by every RequestUtil, even ones built for
        # different base URLs.
        self.tokens = {}

        # Session with the three response hooks: log the body, check the HTTP
        # status, check the business status code.
        self.http = CustomSession(self.baseurl)
        self.http.hooks['response'].append(print_response_hook)
        self.http.hooks['response'].append(assert_http_status_code_hook)
        self.http.hooks['response'].append(assert_logic_code_hook)

    # Log in
    def login(self, username, password):
        """Make ``username`` the current user, logging in only if no token is cached.

        :param username: account to switch to.
        :param password: password sent to the login endpoint when a login is needed.
        """
        token_info = self.tokens.get(username)
        if token_info is None:
            body = {
                "username": username,
                "password": password
            }
            r = self.send(method='POST', url=login_url, json=body)

            token_info = TokenInfo(username=username, password=password)
            token_info.token = r.json()['body']['token']
            token_info.login_time = datetime.now()
            self.tokens[username] = token_info

        # Switch the current user even on a cache hit; previously a repeated
        # login for a cached user left the other user's token active.
        self.curr_username = username
        self.curr_token = token_info.token

    # Refresh the token
    def refresh_token(self):
        """Log the current user in again when their token is at least ``expire_time`` minutes old."""
        token_info = self.tokens[self.curr_username]
        elapsed_minutes = (datetime.now() - token_info.login_time).total_seconds() // 60
        if elapsed_minutes >= expire_time:
            # Drop the cached entry so login() performs a real login request.
            del self.tokens[self.curr_username]
            self.login(username=token_info.username, password=token_info.password)

    # Send a request
    def send(self, method, url, **kwargs):
        """Send a request through the session, adding the bearer token unless it is the login call.

        :param method: HTTP method name.
        :param url: request path; compared against ``login_url`` to detect the login call.
        :param kwargs: forwarded unchanged to the session's ``request``, except
            that ``headers`` is replaced by a copy carrying ``Authorization``.
        :return: the response returned by the session.
        :raises RuntimeError: if a non-login request is sent before any ``login``.
        """
        # Every request except the login call needs the token header
        if login_url != url:
            if self.curr_username is None:
                raise RuntimeError('login() must be called before sending authenticated requests')
            # Refresh before reading curr_token, otherwise a token that has
            # just expired would still be the one sent.
            self.refresh_token()
            # Work on a copy so the caller's headers dict is not modified.
            headers = dict(kwargs.get('headers') or {})
            headers['Authorization'] = 'Bearer ' + self.curr_token
            kwargs['headers'] = headers
        logger.info("url:{}".format(self.baseurl + url))
        return self.http.request(method=method, url=url, **kwargs)

使用

# -*- coding: utf-8 -*-

from loguru import logger

from commons.request_util import RequestUtil

# Base URL handed to RequestUtil
baseUrl = 'http://localhost:8091'

# Shared request helper used by test_get below
http = RequestUtil(baseUrl)


def test_get():
    """Log in as admin, then laolang, then admin again, fetching the dict info endpoint after each login."""
    url = '/admin/sysdict/info'
    # (log line, username) for each round; the last round switches back to admin
    rounds = (
        ("admin request", 'admin'),
        ("laolang request", 'laolang'),
        ("admin request", 'admin'),
    )
    for banner, username in rounds:
        logger.info(banner)
        http.login(username, '123456')
        r = http.send(method='GET', url=url)
        logger.info(r.json())


# Run the demo only when this file is executed as a script
if __name__ == '__main__':
    test_get()

python requests _自动化测试

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
0voAMrgZhFuV
作者其他文章 更多
最新推荐 更多