| 1 |
nino.borges |
958 |
"""
|
| 2 |
|
|
|
| 3 |
|
|
JsonSchemaInspector

Created by:
Emanuel Borges
11.20.2025

A library that I've been refining for inspecting JSON data and, where possible, inferring its schema.
|
| 11 |
|
|
|
| 12 |
|
|
"""
|
| 13 |
|
|
|
| 14 |
|
|
|
| 15 |
|
|
import json
|
| 16 |
|
|
import csv
|
| 17 |
|
|
from dataclasses import dataclass, field
|
| 18 |
|
|
from typing import Any, Dict, Optional, Set
|
| 19 |
|
|
|
| 20 |
|
|
|
| 21 |
|
|
def classify_type(value: Any) -> str:
    """Map a Python value onto a coarse JSON-style type name.

    Returns one of "null", "boolean", "number", "array", "object" or
    "string". Anything that is not None, a bool, an int/float, a
    list/tuple or a dict falls through to "string".
    """
    if value is None:
        return "null"

    # Order matters: bool is a subclass of int, so it has to be matched
    # before the numeric entry or True/False would be labelled "number".
    labelled_types = (
        (bool, "boolean"),
        ((int, float), "number"),
        ((list, tuple), "array"),
        (dict, "object"),
    )
    for candidate, label in labelled_types:
        if isinstance(value, candidate):
            return label

    return "string"
|
| 34 |
|
|
|
| 35 |
|
|
|
| 36 |
|
|
@dataclass
class SchemaNode:
    """Inferred schema information accumulated for one JSON subtree.

    Attributes:
        kinds: Type labels seen at this position, e.g. {"object"},
            {"array"} or {"string", "null"}.
        children: Child nodes keyed by field name (used for objects).
        item: Schema node for array elements, or None if there is none.
        examples: Example primitive values, stored as strings.
        node_count: How many times this node was visited.
        presence_count: For fields, how many times the field key was
            present.
    """

    kinds: Set[str] = field(default_factory=set)
    children: Dict[str, "SchemaNode"] = field(default_factory=dict)
    item: Optional["SchemaNode"] = None
    examples: Set[str] = field(default_factory=set)
    node_count: int = 0
    presence_count: int = 0
|
| 45 |
|
|
|
| 46 |
|
|
|
| 47 |
|
|
class JsonSchemaInspector:
    """Build a loose "schema" from multiple JSON instances.

    - Call add_instance(obj) with parsed JSON objects
      (or add_json_str(s) with raw JSON text).
    - Then call print_schema() to inspect the inferred structure.
    """

    # Limits on the example primitive values remembered per schema node.
    MAX_EXAMPLES = 5
    MAX_EXAMPLE_LENGTH = 80

    def __init__(self) -> None:
        """Start with an empty root node; instances are merged into it."""
        self.root = SchemaNode()

    def add_instance(self, obj: Any) -> None:
        """Add one JSON instance (already parsed) to the inferred schema."""
        # The root is not a field of any object, so it has no presence count.
        self._visit(obj, self.root, is_field_root=False)

    def add_json_str(self, s: str) -> None:
        """Parse a JSON string and add it if valid.

        Empty/whitespace-only input and text that fails to parse are
        skipped silently (best-effort ingestion).
        """
        s = (s or "").strip()
        if not s:
            return
        try:
            obj = json.loads(s)
        except json.JSONDecodeError:
            # You can log or collect invalid strings if you like
            return
        self.add_instance(obj)

    # ------------------------------------------------------------
    # Internal traversal
    # ------------------------------------------------------------

    def _visit(self, value: Any, node: SchemaNode, is_field_root: bool) -> None:
        """Visit a value and merge what is observed into ``node``.

        Args:
            value: The parsed JSON value found at this position.
            node: The schema node accumulating observations for it.
            is_field_root: True when ``node`` corresponds to a field under
                an object, so presence_count is meaningful.
        """
        node.node_count += 1
        if is_field_root:
            node.presence_count += 1

        t = classify_type(value)
        node.kinds.add(t)

        if t == "object":
            # classify_type only reports "object" for dict values.
            for key, subval in value.items():
                child = node.children.get(key)
                if child is None:
                    child = SchemaNode()
                    node.children[key] = child
                # child is a field, so is_field_root=True
                self._visit(subval, child, is_field_root=True)

        elif t == "array":
            # All elements share a single item node, so mixed element
            # types end up merged in that node's kinds.
            if node.item is None:
                node.item = SchemaNode()
            for elem in value:
                # array items are not "fields", so is_field_root=False
                self._visit(elem, node.item, is_field_root=False)

        else:
            # Primitive: collect a few short example values for inspection
            sval = str(value)
            if (
                len(node.examples) < self.MAX_EXAMPLES
                and len(sval) <= self.MAX_EXAMPLE_LENGTH
            ):
                node.examples.add(sval)

    # ------------------------------------------------------------
    # Pretty-print the inferred schema
    # ------------------------------------------------------------

    def print_schema(self) -> None:
        """Pretty-print the schema to stdout."""
        self._print_node(
            self.root,
            indent=0,
            name="(root)",
            parent_count=self.root.node_count,
        )

    def _print_node(
        self,
        node: SchemaNode,
        indent: int,
        name: str,
        parent_count: int,
        *,
        is_field: bool = True,
    ) -> None:
        """Print one schema node and, recursively, everything beneath it.

        Args:
            node: The node to print.
            indent: Indentation width; 0 marks the root node.
            name: Label printed for the node (field key, "(root)" or
                "<item>").
            parent_count: node_count of the parent, used as the
                denominator of the presence percentage.
            is_field: True when ``node`` is a field of an object. Array
                item nodes pass False: their presence_count is never
                incremented, so a presence percentage would always read
                0.0% and is therefore suppressed.
        """
        ind = " " * indent
        type_str = "|".join(sorted(node.kinds)) if node.kinds else "unknown"

        # Line for this node
        if indent == 0:
            # Root node: presence ratio isn't meaningful
            print(f"{ind}{name}: type={type_str} (node_count={node.node_count})")
        else:
            optional_str = ""
            if is_field:
                # NOTE: parent_count counts every visit of the parent, so
                # if the parent was not always an object the percentage is
                # relative to all of its visits, not only the object ones.
                if parent_count > 0:
                    presence_ratio = node.presence_count / parent_count
                else:
                    presence_ratio = 0.0
                if presence_ratio < 1.0:
                    optional_str = f" [present in {presence_ratio*100:.1f}% of parents]"
            print(f"{ind}{name}: type={type_str}{optional_str}")

        # Show examples for primitives
        if node.examples:
            ex = ", ".join(repr(e) for e in sorted(node.examples))
            print(f"{ind} examples: {ex}")

        # Recurse into objects
        if "object" in node.kinds and node.children:
            print(f"{ind} {{")
            for key, child in sorted(node.children.items()):
                # For child fields, parent_count is node.node_count
                self._print_node(
                    child,
                    indent=indent + 2,
                    name=key,
                    parent_count=node.node_count,
                )
            print(f"{ind} }}")

        # Recurse into arrays
        if "array" in node.kinds and node.item is not None:
            print(f"{ind} [items]:")
            # Items are not fields: no presence percentage applies to them.
            self._print_node(
                node.item,
                indent=indent + 2,
                name="<item>",
                parent_count=node.node_count,
                is_field=False,
            )
|