| 1 |
"""
|
| 2 |
|
| 3 |
JsonSchemaInspector
|
| 4 |
|
| 5 |
Created by:
|
| 6 |
Emanuel Borges
|
| 7 |
|
| 8 |
11.20.2025
|
| 9 |
|
| 10 |
A library that I've been refining for inspecting JSON data and potentially pulling out the Schema.
|
| 11 |
|
| 12 |
"""
|
| 13 |
|
| 14 |
|
| 15 |
import json
|
| 16 |
import csv
|
| 17 |
from dataclasses import dataclass, field
|
| 18 |
from typing import Any, Dict, Optional, Set
|
| 19 |
|
| 20 |
|
| 21 |
def classify_type(value: Any) -> str:
    """Map a Python value to a coarse JSON-style type label.

    Returns one of "null", "boolean", "number", "array" or "object"; any
    value that matches none of those checks is labelled "string".
    """
    if value is None:
        return "null"

    # Order matters: bool is a subclass of int, so it has to be tested
    # before the numeric types or True/False would be labelled "number".
    labelled_types = (
        (bool, "boolean"),
        ((int, float), "number"),
        ((list, tuple), "array"),
        (dict, "object"),
    )
    for py_types, label in labelled_types:
        if isinstance(value, py_types):
            return label

    # Fallback for everything else (str, bytes, custom objects, ...).
    return "string"
|
| 34 |
|
| 35 |
|
| 36 |
@dataclass
class SchemaNode:
    """Inferred schema node for one JSON subtree.

    Attributes:
        kinds: Type labels seen at this node, e.g. {"object"}, {"array"},
            {"string", "null"}.
        children: Child schema nodes keyed by field name (for object fields).
        item: Schema node for array elements; None by default.
        examples: Example primitive values, stored as strings.
        node_count: How many times this node was visited.
        presence_count: For fields, how many times this field key was present.
    """

    kinds: Set[str] = field(default_factory=set)
    children: Dict[str, "SchemaNode"] = field(default_factory=dict)
    item: Optional["SchemaNode"] = None
    examples: Set[str] = field(default_factory=set)
    node_count: int = 0
    presence_count: int = 0
|
| 45 |
|
| 46 |
|
| 47 |
class JsonSchemaInspector:
    """
    Build a loose "schema" from multiple JSON instances.

    - Call add_instance(obj) with parsed JSON objects.
    - Then call print_schema() to inspect the inferred structure.
    """

    # Limits on the example primitive values kept per schema node.
    MAX_EXAMPLES = 5
    MAX_EXAMPLE_LEN = 80

    def __init__(self) -> None:
        self.root = SchemaNode()

    def add_instance(self, obj: Any) -> None:
        """Add one JSON instance (already parsed)."""
        self._visit(obj, self.root, is_field_root=False)

    def add_json_str(self, s: str) -> None:
        """Parse a JSON string and add it if valid.

        Empty, whitespace-only and unparseable strings are skipped silently.
        """
        s = (s or "").strip()
        if not s:
            return
        try:
            obj = json.loads(s)
        except json.JSONDecodeError:
            # Best effort: invalid input is ignored rather than raised.
            # You can log or collect invalid strings here if you like.
            return
        self.add_instance(obj)

    # ------------------------------------------------------------
    # Internal traversal
    # ------------------------------------------------------------

    def _visit(self, value: Any, node: "SchemaNode", is_field_root: bool) -> None:
        """
        Visit a value and update the schema node.

        is_field_root is True when node corresponds to a field under an object,
        so presence_count is meaningful.
        """
        node.node_count += 1
        if is_field_root:
            node.presence_count += 1

        t = classify_type(value)
        node.kinds.add(t)

        if t == "object":
            assert isinstance(value, dict)
            for key, subval in value.items():
                child = node.children.get(key)
                if child is None:
                    child = SchemaNode()
                    node.children[key] = child
                # child is a field, so is_field_root=True
                self._visit(subval, child, is_field_root=True)

        elif t == "array":
            assert isinstance(value, (list, tuple))
            if node.item is None:
                node.item = SchemaNode()
            for elem in value:
                # array items are not "fields", so is_field_root=False
                self._visit(elem, node.item, is_field_root=False)

        else:
            # Primitive: collect a few example values for inspection
            sval = str(value)
            if (
                len(node.examples) < self.MAX_EXAMPLES
                and len(sval) <= self.MAX_EXAMPLE_LEN
            ):
                node.examples.add(sval)

    # ------------------------------------------------------------
    # Pretty-print the inferred schema
    # ------------------------------------------------------------

    def print_schema(self) -> None:
        """
        Pretty-print the schema to stdout.
        """
        self._print_node(
            self.root,
            indent=0,
            name="(root)",
            parent_count=self.root.node_count,
        )

    def _print_node(
        self,
        node: "SchemaNode",
        indent: int,
        name: str,
        parent_count: int,
        *,
        is_field: bool = True,
    ) -> None:
        """
        Print one schema node and, recursively, its children and array items.

        node: the schema node to print.
        indent: indentation width; 0 marks the root node.
        name: label printed for this node.
        parent_count: node_count of the parent, used as the denominator of
            the presence ratio. It counts every visit to the parent.
        is_field: False for array item nodes. Their presence_count is never
            incremented during traversal, so no presence annotation is
            printed for them (it would always read 0.0%).
        """
        ind = " " * indent
        type_str = "|".join(sorted(node.kinds)) if node.kinds else "unknown"

        # Line for this node
        if indent == 0:
            # Root node: presence ratio isn't meaningful
            print(f"{ind}{name}: type={type_str} (node_count={node.node_count})")
        else:
            optional_str = ""
            if is_field:
                # For fields, presence_count vs parent_count
                if parent_count > 0:
                    presence_ratio = node.presence_count / parent_count
                else:
                    presence_ratio = 0.0
                if presence_ratio < 1.0:
                    optional_str = f" [present in {presence_ratio*100:.1f}% of parents]"
            print(f"{ind}{name}: type={type_str}{optional_str}")

        # Show examples for primitives
        if node.examples:
            ex = ", ".join(repr(e) for e in sorted(node.examples))
            print(f"{ind} examples: {ex}")

        # Recurse into objects
        if "object" in node.kinds and node.children:
            print(f"{ind} {{")
            for key in sorted(node.children):
                # For child fields, parent_count is node.node_count
                self._print_node(
                    node.children[key],
                    indent=indent + 2,
                    name=key,
                    parent_count=node.node_count,
                )
            print(f"{ind} }}")

        # Recurse into arrays
        if "array" in node.kinds and node.item is not None:
            print(f"{ind} [items]:")
            # Items are not fields, so the presence annotation is skipped.
            self._print_node(
                node.item,
                indent=indent + 2,
                name="<item>",
                parent_count=node.node_count,
                is_field=False,
            )
|