Skip to content
On this page

过滤器使用方法

过滤器编写

python
from common.filters import Filter
from typing import Optional
from pydantic import Field
from tortoise.queryset import QuerySet


class OrganizationFilter(Filter):
    is_org: Optional[bool] = Field(None, description="是否组织")

    def is_org_filter(self, value, queryset: QuerySet) -> QuerySet:
        if value is not None:
            return queryset.filter(node_type=0 if value else 1)
        return queryset

    class Meta:
        ordering = ["id", "node_type"]
        search_fields = ["name__icontains"]
        filter_fields = ["leader"]

在视图方法中使用

python
async def get_list(
    page_params: PaginationParams = Depends(PaginationParams),
    # 定义参数
    filters: filter_params = Depends(OrganizationFilter),
    _: User = Depends(get_current_user),
) -> page_res_model:
    queryset = self.model.filter(is_active=True)
    # 使用过滤器过滤数据
    queryset = filters.filter(queryset)
    return await list_pagination(page_params, queryset, res_model)

在模型中使用(影响自动生成curd中的列表接口)

python

class Organization(BaseModel):
    sort = fields.IntField(description="排序", default=0)
    name = fields.CharField(max_length=256, description="组织名称")
    code = fields.CharField(max_length=256, description="组织编号", unique=True)
    node_type = fields.IntField(description="组织架构类型 (单位或部门)0 单位 1 部门", default=0)
    leader = fields.CharField(max_length=256, description="负责人", null=True)

    class Meta:
        ordering = ["sort"]
        tags = ["系统管理", "组织"]
        describe = "组织"
        auto_curd = True
        filters = OrganizationFilter

模型数据校验

对已有数据进行重复校验示例

python
class MapOrganizationCamera(BaseModel):
    """
    场景摄像头绑定(网点)
    """
    map_org = fields.ForeignKeyField("models.MapOrganization", related_name="mapcamera_map", description="场景管理")
    code = fields.CharField(max_length=64, description="摄像头编号")

    async def save(
            self,
            using_db: Optional[BaseDBAsyncClient] = None,
            update_fields: Optional[Iterable[str]] = None,
            force_create: bool = False,
            force_update: bool = False,
    ) -> None:
        if await MapOrganizationCamera.filter(code=self.code, is_active=True).exclude(id=self.id).exists():
            raise HTTPException(detail=f"摄像头编号: {self.code} 已存在")
        await super().save(using_db, update_fields, force_create, force_update)

    class PydanticMeta:
        backward_relations = False
        exclude_raw_fields = False
        exclude = ("map_org",)

    class Meta:
        tags = ["场景"]
        describe = "场景摄像头"
        auto_curd = True
        generate_target = ["list", "create", "update", "delete", "retrieve"]

WebSocket使用说明

前端连接说明

  1. 前端连接WebSocket服务器地址:ws://服务器IP地址:端口号/ws/?room_id=room_id
  2. websocket连接成功后发送当前用户Token作为校验信息,格式为:{"token": "Token" },Token校验完成后保持websocket连接,校验失败或逾期(暂定2秒)未发送Token则关闭websocket连接。
  3. 具体消息格式请依据具体场景定义,消息格式为json字符串。

后端使用说明

WebSocket后端采用订阅机制处理websocket消息,订阅后端会收到所有房间内的消息。

无鉴权示例代码:

python
from apps.websocket.handler import Handler
from apps.websocket.subscriber import SubscribeMap, Subscriber
from project.server import app_router


# 定义WebSocket处理类
class TestWsHandler(Handler):
    # 定义房间ID
    _room_id = "test"
    # 定义是否需要Token验证
    _needs_authentication = False

    # ws_id参数可不设置
    async def call(self, ws_id: str, *args, **kwargs):
        # 处理websocket消息
        print("TestWsHandler called")
        print(args, kwargs)


# 订阅WebSocket处理类
ws_handler = SubscribeMap().subscribe(Subscriber(TestWsHandler))


@app_router.get("/ws/test/", summary="Test WebSocket")
async def api() -> str:
    # 发送消息到指定房间
    await ws_handler.send_message({"message": "Hello, World!"})
    return "OK"

鉴权示例代码:

python
from fastapi import Depends

from apps.websocket.handler import Handler
from apps.websocket.subscriber import SubscribeMap, Subscriber
from project.server import app_router
from common.user_valid import get_current_user, User


# 定义WebSocket处理类
class TestAuthWsHandler(Handler):
    # 定义房间ID
    _room_id = "test_auth"

    # ws_id,user参数可不设置
    async def call(self, ws_id: str, user: User, *args, **kwargs):
        # 处理websocket消息
        print("TestWsHandler called")
        print(args, kwargs)


# 订阅WebSocket处理类
auth_ws_handler = SubscribeMap().subscribe(Subscriber(TestAuthWsHandler))


@app_router.get("/ws/test_auth/", summary="Test Auth WebSocket")
async def api(_user: User = Depends(get_current_user)) -> str:
    # 发送消息到指定房间
    await auth_ws_handler.send_message({"message": "Hello, World!"})
    return "OK"

智加文档规范