Skip to content

Commit c71d4b7

Browse files
authored
Merge pull request #5 from koxudaxi/refactor_parse_path_method
refactor parse paths method
2 parents 48263c1 + c083c1e commit c71d4b7

File tree

1 file changed

+71
-95
lines changed

1 file changed

+71
-95
lines changed

fastapi_code_generator/parser.py

Lines changed: 71 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
json_schema_data_formats,
1919
)
2020
from datamodel_code_generator.types import DataType
21-
from pydantic import BaseModel, validator
22-
from pydantic.fields import ModelField
21+
from pydantic import BaseModel, root_validator
2322

2423
MODEL_PATH = ".models"
2524

@@ -45,6 +44,14 @@ class Request(BaseModel):
4544

4645

4746
class UsefulStr(str):
47+
@classmethod
48+
def __get_validators__(cls) -> Any:
49+
yield cls.validate
50+
51+
@classmethod
52+
def validate(cls, v: Any) -> Any:
53+
return cls(v)
54+
4855
@property
4956
def snakecase(self) -> str:
5057
return stringcase.snakecase(self)
@@ -61,36 +68,29 @@ def camelcase(self) -> str:
6168
class Argument(BaseModel):
6269
name: UsefulStr
6370

64-
@validator('name')
65-
def validate_name(cls, value: Any) -> Any:
66-
if type(value) == str:
67-
return UsefulStr(value)
68-
return value
69-
7071
# def __str__(self) -> UsefulStr:
7172
# return self.name
7273

7374

7475
class Operation(CachedPropertyModel):
75-
type: Optional[UsefulStr]
76-
path: Optional[UsefulStr]
76+
type: UsefulStr
77+
path: UsefulStr
7778
operationId: Optional[UsefulStr]
78-
root_path: Optional[UsefulStr]
79-
parameters: Optional[Any]
79+
parameters: List[Dict[str, Any]] = []
8080
responses: Dict[UsefulStr, Any] = {}
8181
requestBody: Dict[str, Any] = {}
8282
imports: List[Import] = []
8383

84+
@cached_property
85+
def root_path(self) -> UsefulStr:
86+
return UsefulStr(self.path.split("/")[1:])
87+
8488
@cached_property
8589
def snake_case_path(self) -> str:
86-
return re.sub( # type: ignore
90+
return re.sub(
8791
r"{([^\}]+)}", lambda m: stringcase.snakecase(m.group()), self.path
8892
)
8993

90-
def set_path(self, path: Path) -> None:
91-
self.path = path.path
92-
self.root_path = UsefulStr(path.root_path)
93-
9494
@cached_property
9595
def request(self) -> Optional[str]:
9696
models: List[str] = []
@@ -152,7 +152,7 @@ def function_name(self) -> str:
152152
if self.operationId:
153153
name: str = self.operationId
154154
else:
155-
name = f"{self.type}{self.path.replace('/', '_')}" # type: ignore
155+
name = f"{self.type}{self.path.replace('/', '_')}"
156156
return stringcase.snakecase(name)
157157

158158
@property
@@ -288,7 +288,7 @@ def response(self) -> str:
288288

289289

290290
class Operations(BaseModel):
291-
parameters: Optional[Any] = None
291+
parameters: List[Dict[str, Any]] = []
292292
get: Optional[Operation] = None
293293
put: Optional[Operation] = None
294294
post: Optional[Operation] = None
@@ -297,39 +297,54 @@ class Operations(BaseModel):
297297
head: Optional[Operation] = None
298298
options: Optional[Operation] = None
299299
trace: Optional[Operation] = None
300+
path: UsefulStr
301+
302+
@root_validator(pre=True)
303+
def inject_path_and_type_to_operation(cls, values: Dict[str, Any]) -> Any:
304+
path: Any = values.get('path')
305+
return dict(
306+
**{
307+
o: dict(**v, path=path, type=o)
308+
for o in OPERATION_NAMES
309+
if (v := values.get(o))
310+
},
311+
path=path,
312+
)
300313

301-
@validator(*OPERATION_NAMES)
302-
def validate_operations(cls, value: Any, field: ModelField) -> Any:
303-
if isinstance(value, Operation):
304-
value.type = UsefulStr(field.name)
305-
return value
314+
@root_validator
315+
def inject_parameters_to_operation(cls, values: Dict[str, Any]) -> Any:
316+
if parameters := values.get('parameters'):
317+
for operation_name in OPERATION_NAMES:
318+
if operation := values.get(operation_name):
319+
operation.parameters.extend(parameters)
320+
return values
306321

307322

308-
class Path(BaseModel):
309-
path: Optional[UsefulStr]
323+
class Path(CachedPropertyModel):
324+
path: UsefulStr
310325
operations: Optional[Operations] = None
311-
children: List[Path] = []
312-
parent: Optional[Path] = None
313326

314-
@property
315-
def exists_operations(self) -> List[Operation]:
316-
return [
317-
operation
318-
for operation_name in OPERATION_NAMES
319-
if (operation := getattr(self.operations, operation_name))
320-
]
321-
322-
@property
323-
def root_path(self) -> str:
324-
paths = self.path.split("/") # type: ignore
325-
if len(paths) > 1:
326-
return paths[1]
327-
else:
328-
return ""
327+
@root_validator(pre=True)
328+
def validate_root(cls, values: Dict[str, Any]) -> Any:
329+
if path := values.get('path'):
330+
if isinstance(path, str):
331+
if operations := values.get('operations'):
332+
if isinstance(operations, dict):
333+
return {
334+
'path': path,
335+
'operations': dict(**operations, path=path),
336+
}
337+
return values
329338

330-
def init(self) -> None:
331-
if self.parent:
332-
self.parent.children.append(self)
339+
@cached_property
340+
def exists_operations(self) -> List[Operation]:
341+
if self.operations:
342+
return [
343+
operation
344+
for operation_name in OPERATION_NAMES
345+
if (operation := getattr(self.operations, operation_name))
346+
]
347+
return []
333348

334349

335350
Path.update_forward_refs()
@@ -363,52 +378,13 @@ def parse(self) -> ParsedObject:
363378
openapi = load_json_or_yaml(self.input_text)
364379
return self.parse_paths(openapi["paths"])
365380

366-
def parse_paths(self, path_tree: Dict[str, Any]) -> ParsedObject:
367-
paths: List[Path] = []
368-
for path_name, operations in path_tree.items():
369-
tree: List[str] = []
370-
last: Optional[Path] = None
371-
372-
for key in path_name.split("/"):
373-
parent: Optional[Path] = None
374-
parents = [p for p in paths if p.path == "/".join(tree)]
375-
if parents:
376-
parent = parents[0]
377-
378-
tree.append(key)
379-
380-
me = [p for p in paths if p.path == "/".join(tree)]
381-
382-
if me:
383-
continue
384-
385-
last = Path(path=UsefulStr("/".join(tree)), parent=parent)
386-
387-
paths.append(last)
388-
389-
if last:
390-
last.operations = Operations.parse_obj(operations)
391-
392-
for path in paths:
393-
path.init()
394-
395-
parsed_operations: List[Operation] = []
396-
for path in paths:
397-
for child in path.children:
398-
parsed_operations.extend(self.parse_operation(child))
399-
return ParsedObject(parsed_operations)
400-
401-
@classmethod
402-
def parse_operation(cls, path: Path) -> List[Operation]:
403-
operations: List[Operation] = []
404-
if path.operations:
405-
for operation in path.exists_operations:
406-
operation.set_path(path)
407-
if path.operations.parameters:
408-
if operation.parameters:
409-
operation.parameters.extend(path.operations.parameters)
410-
else:
411-
operation.parameters = path.operations.parameters
412-
413-
operations.append(operation)
414-
return operations
381+
def parse_paths(self, paths: Dict[str, Any]) -> ParsedObject:
382+
return ParsedObject(
383+
[
384+
operation
385+
for path_name, operations in paths.items()
386+
for operation in Path(
387+
path=UsefulStr(path_name), operations=operations
388+
).exists_operations
389+
]
390+
)

0 commit comments

Comments
 (0)