9 changes: 9 additions & 0 deletions pyproject.toml
@@ -84,6 +84,12 @@ oracle = [
"sqlglot>=27.20.0",
]

nosql = [
"pandas>=2.0.0",
"pyarrow>=14.0.0",
"pymongo>=4.0.0",
]

streamlit = [
"streamlit==1.50.0",
"pyngrok==7.4.0",
@@ -99,6 +105,7 @@ streamlit = [
"Bug Tracker" = "https://github.com/Intugle/data-tools/issues"

[project.scripts]
intugle = "intugle.cli:main"
intugle-mcp = "intugle.mcp.server:main"
intugle-streamlit = "intugle.cli:run_streamlit_app"

@@ -118,6 +125,8 @@ dev = [
"ipykernel>=6.30.1",
"langfuse==2.60.4",
"mssql-python>=0.13.1",
"pandas>=2.0.0",
"pyarrow>=14.0.0",
"pysonar>=1.2.0.2419",
"pyspark>=3.5.0,<4.0.0",
"pytest>=8.4.1",
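A quick way to confirm the new optional extra resolved correctly is to check that its dependencies are importable. A minimal sketch, assuming the package is installed with the `intugle[nosql]` extra named in this diff:

```python
# Sketch: verify the nosql extra's dependencies after
# running: pip install "intugle[nosql]"
import importlib.util

for pkg in ("pandas", "pyarrow", "pymongo"):
    status = "ok" if importlib.util.find_spec(pkg) else "missing"
    print(f"{pkg}: {status}")
```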
13 changes: 13 additions & 0 deletions src/intugle/__init__.py
@@ -1,3 +1,16 @@
from intugle.analysis.models import DataSet as DataSet
from intugle.data_product import DataProduct as DataProduct
from intugle.semantic_model import SemanticModel as SemanticModel

__all__ = ["DataSet", "DataProduct", "SemanticModel"]

# Expose NoSQL components if dependencies are available
try:
from intugle.nosql.api import NoSQLToRelationalParser # noqa: F401
from intugle.nosql.source import MongoSource # noqa: F401
from intugle.nosql.writer import ParquetTarget # noqa: F401

__all__.extend(["NoSQLToRelationalParser", "MongoSource", "ParquetTarget"])
except ImportError:
# Dependencies (pandas/pyarrow/pymongo) might not be installed
pass
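Downstream code can lean on this conditional export instead of wrapping the import in its own try/except. A minimal sketch of that pattern (the exit message is illustrative):

```python
# Sketch: consuming the optional exports added above. If the nosql extra
# is not installed, the names are simply absent from intugle.__all__.
import intugle

if "NoSQLToRelationalParser" in intugle.__all__:
    parser_cls = intugle.NoSQLToRelationalParser
else:
    raise SystemExit("NoSQL support missing; install with: pip install 'intugle[nosql]'")
```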
104 changes: 100 additions & 4 deletions src/intugle/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import argparse
import importlib.util
import logging
import os
import subprocess
import sys

# Setup basic logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def run_streamlit_app():
@@ -28,9 +37,9 @@ def run_streamlit_app():
return

# Get the absolute path to the main.py of the Streamlit app
app_dir = os.path.join(os.path.dirname(__file__), 'streamlit_app')
app_path = os.path.join(app_dir, 'main.py')
app_dir = os.path.join(os.path.dirname(__file__), "streamlit_app")
app_path = os.path.join(app_dir, "main.py")

# Ensure the app_path exists
if not os.path.exists(app_path):
print(f"Error: Streamlit app not found at {app_path}")
@@ -41,5 +50,92 @@ def run_streamlit_app():
subprocess.run(["streamlit", "run", app_path], cwd=app_dir)


def run_nosql_to_relational(args):
"""Execute the NoSQL to Relational conversion command."""
# Import here to avoid issues when nosql dependencies aren't installed
try:
from intugle.nosql.api import NoSQLToRelationalParser
from intugle.nosql.source import MongoSource
from intugle.nosql.writer import ParquetTarget
except ImportError as e:
logger.error(
"NoSQL dependencies not installed. Install with: pip install 'intugle[nosql]'"
)
logger.error(f"Missing: {e}")
sys.exit(1)

try:
logger.info(f"Connecting to MongoDB: {args.db}.{args.collection}")
source = MongoSource(
uri=args.uri,
database=args.db,
collection=args.collection,
sample_size=args.sample,
)

logger.info("Initializing parser...")
# Initialize orchestrator
orchestrator = NoSQLToRelationalParser(source)

# Run parsing
logger.info("Starting extraction and parsing...")
orchestrator.run()

# Write output
logger.info(f"Writing results to: {args.output}")
target = ParquetTarget(args.output)
orchestrator.write(target)

logger.info("✅ Job completed successfully!")

except Exception as e:
logger.error(f"Job failed: {str(e)}")
sys.exit(1)


def main():
"""Main entry point for the intugle CLI."""
parser = argparse.ArgumentParser(
description="Intugle - GenAI-powered semantic layer toolkit"
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")

# Define the 'streamlit' command
subparsers.add_parser("streamlit", help="Launch the Streamlit web application")

# Define the 'nosql-to-relational' command
nosql_parser = subparsers.add_parser(
"nosql-to-relational", help="Convert NoSQL collection to Parquet"
)

# Source Arguments
nosql_parser.add_argument("--uri", required=True, help="MongoDB connection URI")
nosql_parser.add_argument("--db", required=True, help="Database name")
nosql_parser.add_argument("--collection", required=True, help="Collection name")

# Output Arguments
nosql_parser.add_argument(
"--output", required=True, help="Output directory for Parquet files"
)

# Optional Arguments
nosql_parser.add_argument(
"--sample",
type=int,
default=0,
help="Number of documents to sample (0 = all)",
)

args = parser.parse_args()

if args.command == "streamlit":
run_streamlit_app()
elif args.command == "nosql-to-relational":
run_nosql_to_relational(args)
else:
parser.print_help()


if __name__ == "__main__":
run_streamlit_app()
main()
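For reference, a rough programmatic equivalent of the new `nosql-to-relational` subcommand, wiring together the same classes the CLI uses; the URI, database, collection, and output path below are placeholders:

```python
# Sketch mirroring:
#   intugle nosql-to-relational --uri mongodb://localhost:27017 \
#       --db shop --collection orders --output ./out --sample 500
from intugle.nosql.api import NoSQLToRelationalParser
from intugle.nosql.source import MongoSource
from intugle.nosql.writer import ParquetTarget

source = MongoSource(
    uri="mongodb://localhost:27017",  # placeholder connection URI
    database="shop",                  # placeholder database name
    collection="orders",              # placeholder collection name
    sample_size=500,                  # 0 = read all documents (CLI default)
)
orchestrator = NoSQLToRelationalParser(source)
orchestrator.run()                    # fetch and parse into relational tables
orchestrator.write(ParquetTarget("./out"))
```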

Empty file added src/intugle/nosql/__init__.py
64 changes: 64 additions & 0 deletions src/intugle/nosql/api.py
@@ -0,0 +1,64 @@
from typing import Any, Dict, Optional

from intugle.nosql.inference import infer_schema
from intugle.nosql.parser import NoSQLParser
from intugle.nosql.source import NoSQLSource


class NoSQLToRelationalParser:
"""
High-level orchestrator for converting NoSQL data to relational formats.
"""

def __init__(self, source: NoSQLSource, config: Optional[Dict[str, Any]] = None):
"""
Args:
source: A configured NoSQLSource (e.g., MongoSource)
config: Configuration dict for table renaming, PK overrides, etc.
"""
self.source = source
self.config = config or {}
self._parsed_tables: Optional[Dict[str, Any]] = None

def infer_model(self) -> Dict[str, Any]:
"""
Peeks at the source data to infer the schema and relationships.

Returns:
Dict containing the inferred schema structure.
"""
# Fetch a sample from the source (iterator)
# We assume the source handles sampling limits internally if configured
sample_data = list(self.source.get_data())

# Use our existing inference logic
schema = infer_schema(sample_data)
return schema

def run(self) -> None:
"""
Executes the full parsing pipeline: fetch -> parse.
Stores results in memory (self._parsed_tables).
"""
# Note: For V1, we load source data into memory.
# Future optimization: Implement chunked streaming here.
all_data = list(self.source.get_data())

# Initialize the core parser with our config
# (Assuming NoSQLParser accepts config in __init__ from Phase 5)
parser = NoSQLParser(config=self.config)

self._parsed_tables = parser.parse(all_data)

def write(self, target: Any) -> None:
"""
Writes the parsed tables to the specified target.

Args:
target: An instance of a Target (e.g., ParquetTarget)
"""
if self._parsed_tables is None:
# Auto-run if not already run
self.run()

target.write(self._parsed_tables)
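Because `infer_model()` only samples the source, it works as a cheap dry run before committing to `run()` and `write()`. A minimal sketch, assuming a `MongoSource` configured with placeholder connection details as in the CLI example above:

```python
# Sketch: inspect the inferred schema before a full parse/write.
from intugle.nosql.api import NoSQLToRelationalParser
from intugle.nosql.source import MongoSource

source = MongoSource(
    uri="mongodb://localhost:27017",  # placeholder URI
    database="shop",
    collection="orders",
    sample_size=100,
)
orchestrator = NoSQLToRelationalParser(source)
for field, dtype in orchestrator.infer_model().items():
    print(f"{field}: {dtype}")
```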
75 changes: 75 additions & 0 deletions src/intugle/nosql/inference.py
@@ -0,0 +1,75 @@
from typing import Any, Dict, List, Set


def infer_schema(data: List[Dict[str, Any]], sample_size: int = 100) -> Dict[str, str]:
"""
Infers a flat schema for the NoSQL documents by sampling up to sample_size
records and resolving type conflicts conservatively (e.g., int vs. str -> string).
Primary-key detection is handled separately by identify_primary_key().
"""
if not data:
return {}

schema: Dict[str, Set[str]] = {}

# Analyze sample data
for row in data[:sample_size]:
for key, value in row.items():
if key not in schema:
schema[key] = set()

if value is None:
continue

# Determine type
if isinstance(value, bool):
type_name = "bool"
elif isinstance(value, int):
type_name = "int"
elif isinstance(value, float):
type_name = "float"
elif isinstance(value, str):
type_name = "string"
elif isinstance(value, dict):
type_name = "object"
elif isinstance(value, list):
type_name = "list"
else:
type_name = "unknown"

schema[key].add(type_name)

final_schema: Dict[str, str] = {}

# Resolve conflicts
for key, types in schema.items():
if not types:
final_schema[key] = "string" # Default for all-null
elif len(types) == 1:
final_schema[key] = list(types)[0]
else:
# Conflict resolution
if "string" in types:
final_schema[key] = "string"
elif "float" in types and "int" in types:
final_schema[key] = "float"
elif "list" in types:
# A list mixed with scalar types is ambiguous; fall back to a
# string representation for now.
final_schema[key] = "string"
else:
final_schema[key] = "string"

return final_schema


def identify_primary_key(schema: Dict[str, str]) -> str | None:
"""
Identifies a potential primary key from the schema keys.
"""
candidates = ["_id", "id", "uuid", "guid"]
for candidate in candidates:
if candidate in schema:
return candidate
return None
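A small, self-contained illustration of how the two helpers behave on mixed-type documents (the sample documents are made up):

```python
# Sketch: type-conflict resolution and primary-key detection in isolation.
from intugle.nosql.inference import identify_primary_key, infer_schema

docs = [
    {"_id": "a1", "qty": 2, "price": 9.99, "tags": ["new"]},
    {"_id": "a2", "qty": "3", "price": 5, "tags": None},
]

schema = infer_schema(docs)
print(schema)
# qty mixes int and str -> "string"; price mixes float and int -> "float";
# tags is only ever a list (None values are skipped) -> "list"
print(identify_primary_key(schema))  # -> "_id"
```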