当前位置: 代码迷 >> 综合 >> 使用django rest framwork 写接口
  详细解决方案

使用django rest framwork 写接口

热度:85   发布时间:2023-12-02 06:40:57.0

本文使用django rest framwork 来写接口,包括了以下10种类型:

1.单增 2.群增 3.单查 4.群查 5.单整体改(个人觉得有了单局部改,单整体改的意义不是很大) 6.单局部改  7.群整体改(同理)

8.群局部改 9.单删 10.群删

1.models层

from django.db import models
class BaseModel(models.Model): #基类is_delete = models.BooleanField(default=False)create_time = models.DateTimeField(auto_now_add=True)class Meta:abstract = True
class Testcase(BaseModel):name = models.CharField(max_length=120)pre_step = models.CharField(max_length=200)step = models.CharField(max_length=300)expect = models.CharField(max_length=200)actual = models.CharField(max_length=200)class Meta:db_table = 'testcase'verbose_name = '测试用例'verbose_name_plural = verbose_namedef __str__(self):return self.name

 

2.serializers层

from rest_framework.serializers import ModelSerializer
from rest_framework import serializers
from rest_framework import exceptions
from . import models
class MyListSerializer(serializers.ListSerializer):  # 群改需要重新update方法def update(self, instance, validated_data):for index, obj in enumerate(instance):self.child.update(obj, validated_data[index])return instanceclass TestcaseSerializer(ModelSerializer):create_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S", required=False, read_only=True)class Meta:model = models.Testcasefields = '__all__'list_serializer_class = MyListSerializer  # 为了群改def validate_name(self, value):if models.Testcase.objects.filter(name=value, is_delete=False):raise exceptions.ValidationError('测试用例已存在')return value

3.views层 

from rest_framework.viewsets import GenericViewSet
from rest_framework import mixins
from . import models
from . import serializers
from utils.myresponse import myResponse
class TestcaseView(mixins.ListModelMixin, mixins.RetrieveModelMixin, mixins.CreateModelMixin, mixins.UpdateModelMixin,GenericViewSet):queryset = models.Testcase.objects.filter(is_delete=False)serializer_class = serializers.TestcaseSerializerdef my_get_list(self, request, *args, **kwargs):  # 群查response = self.list(request, *args, **kwargs)return myResponse(data=response.data, count=len(response.data))def my_get_obj(self, request, *args, **kwargs):  # 单查response = self.retrieve(request, *args, **kwargs)return myResponse(data=response.data)def my_create(self, request, *args, **kwargs):  # 单增和群增res_data = request.dataif isinstance(res_data, dict):many = Falseelif isinstance(res_data, list):many = Trueelse:return myResponse(msg='传参格式错误啦')seria = self.get_serializer(data=res_data, many=many)seria.is_valid(raise_exception=True)seria.save()return myResponse(data=seria.data)def my_update(self, request, *args, **kwargs):  # 单整体改res = self.update(request, *args, **kwargs)return myResponse(data=res.data)def my_patch(self, request, *args, **kwargs):  # 单局部改res = self.partial_update(request, *args, **kwargs)return myResponse(data=res.data)def my_updates(self, request, *args, **kwargs):  # 群整体改req_data = request.dataif not isinstance(req_data, list):return myResponse(msg='格式错误,传参格式应该要是一个列表')objs = []request_data = []for rd in req_data:if 'pk' not in rd:return myResponse('数据没有包含pk字段')pk = rd.pop('pk', None)obj = models.Testcase.objects.filter(pk=pk, is_delete=False).first()if not obj:return myResponse('要更新的数据不存在,pk=%s' % pk)objs.append(obj)request_data.append(rd)# seria = self.get_serializer(instance=objs, data=request_data, many=True, partial=False)seria = serializers.TestcaseSerializer(instance=objs, data=request_data, partial=False, many=True)seria.is_valid(raise_exception=True)seria.save()return myResponse(seria.data)def my_patchs(self, request, *args, **kwargs):  # 群局部改req_data = request.dataif not isinstance(req_data, list):return myResponse(msg='格式错误,传参格式应该要是一个列表')objs = []request_data = []for rd in req_data:if 'pk' not in rd:return myResponse('数据没有包含pk字段')pk = rd.pop('pk', None)obj = models.Testcase.objects.filter(pk=pk, is_delete=False).first()if not obj:return myResponse('要更新的数据不存在,pk=%s' % pk)objs.append(obj)request_data.append(rd)seria = self.get_serializer(instance=objs, data=request_data, partial=True, many=True)seria.is_valid(raise_exception=True)seria.save()return myResponse(data=seria.data)def my_delete(self, request, *args, **kwargs):  # 单删pk = kwargs.get('pk')obj = models.Testcase.objects.filter(pk=pk, is_delete=False).first()if not obj:return myResponse(msg='数据不存在')obj.is_delete = Trueobj.save()return myResponse(msg='删除成功')def my_deletes(self, request, *args, **kwargs):  # 群删if not isinstance(request.data, dict):return myResponse(msg='数据格式错误,期望是字典')pks = request.data.get('pks')if not pks:return myResponse(msg='传参有问题,没有传pks')for pk in pks:obj = models.Testcase.objects.filter(pk=pk, is_delete=False).first()if not obj:return myResponse(msg='数据不存在,pk=%s' % pk)for pk in pks:obj = models.Testcase.objects.filter(pk=pk, is_delete=False).first()obj.is_delete = Trueobj.save()return myResponse(msg='删除成功')

4.urls层:

from django.contrib import admin
from django.urls import path,include
from django.views.static import serve
from django.conf import settingsurlpatterns = [path('admin/', admin.site.urls),path('api01/', include('api01.urls'))
]

5.api01 urls层

from django.urls import path
from . import viewsurlpatterns = [path('testcase/', views.TestcaseView.as_view({'get': 'my_get_list', 'post': 'my_create', 'put': 'my_updates', 'patch': 'my_patchs','delete': 'my_deletes'})),path('testcase/<pk>/', views.TestcaseView.as_view({'get': 'my_get_obj', 'put': 'my_update', 'patch': 'my_patch', 'delete': 'my_delete'}))
]

 6.utils 中自定义的myResponse

from rest_framework.response import Responseclass myResponse(Response):def __init__(self, data=None, status=0, http_status=None, msg='执行成功', headers=None, exception=False, *args,**kwargs):data1 = {'status': status,'msg': msg,'data': data}data1.update(kwargs)super().__init__(data=data1, status=http_status, headers=headers, exception=exception)
  相关解决方案