Skip to content

Commit ac7fbae

Browse files
author
Swaroop Manchala
committed
feat: Added NL parser with typo tolerance, confidence scoring, clarifications, and tests
1 parent 79bbfbd commit ac7fbae

File tree

2 files changed

+259
-0
lines changed

2 files changed

+259
-0
lines changed

nl_parser.py

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
import difflib
2+
import re
3+
from difflib import SequenceMatcher
4+
from typing import Dict, Any, List, Tuple
5+
6+
# Vocabulary for typo correction
7+
VOCAB = {
    # Python tooling
    "python",
    "pip",
    "venv",
    "virtualenv",
    "conda",
    "anaconda",
    # Containers and orchestration
    "docker",
    "kubernetes",
    "k8s",
    "kubectl",
    # Web servers
    "nginx",
    "apache",
    "httpd",
    "web",
    "server",
    # Frameworks and ML libraries
    "flask",
    "django",
    "tensorflow",
    "pytorch",
    "torch",
    # Action words and generic terms
    "install",
    "setup",
    "development",
    "env",
    "environment",
}
14+
15+
# Canonical examples for lightweight semantic matching
16+
# Maps each intent name to canonical phrasings. Insertion order matters:
# when two examples score equally, the one listed first wins.
INTENT_EXAMPLES = dict(
    install_ml=[
        "install something for machine learning",
        "install pytorch",
        "install tensorflow",
        "i want to run pytorch",
    ],
    install_web_server=[
        "i need a web server",
        "install nginx",
        "install apache",
        "set up a web server",
    ],
    setup_python_env=[
        "set up python development environment",
        "install python 3.10",
        "create python venv",
        "setup dev env",
    ],
    install_docker=[
        "install docker",
        "add docker",
        "deploy containers - docker",
    ],
    install_docker_k8s=[
        "install docker and kubernetes",
        "docker and k8s",
        "k8s and docker on my mac",
    ],
)
46+
47+
48+
def normalize(text: str) -> str:
    """Lower-case *text* and reduce it to single-space-separated ASCII words.

    Every character other than ``a-z``, ``0-9``, ``.`` and whitespace is
    replaced by a space. A dot is kept only when it sits between two digits,
    so version numbers such as ``3.10`` stay intact while sentence
    punctuation (``"install docker."``) no longer sticks to the neighbouring
    word, where it would later be mistaken for a typo.

    Args:
        text: Raw user request.

    Returns:
        The cleaned string with runs of whitespace collapsed and the ends
        stripped.
    """
    lowered = text.lower()
    # Hyphens and all other punctuation (except dots) become separators.
    cleaned = re.sub(r"[^a-z0-9.\s]", " ", lowered)
    # Drop any dot that is not a decimal point inside a number.
    cleaned = re.sub(r"(?<!\d)\.|\.(?!\d)", " ", cleaned)
    return re.sub(r"\s+", " ", cleaned).strip()
54+
55+
56+
def tokenize(text: str) -> List[str]:
    """Split *text* on runs of whitespace and return the resulting words."""
    words = text.split()
    return words
58+
59+
60+
def spell_correct_token(
    token: str, *, min_length: int = 4, cutoff: float = 0.75
) -> Tuple[str, bool]:
    """Return ``(corrected_token, was_corrected)`` for a single token.

    A token already in ``VOCAB`` is returned unchanged. Otherwise the closest
    vocabulary entry (by ``difflib`` similarity) replaces it, provided the
    similarity reaches *cutoff*.

    Args:
        token: A single normalized word.
        min_length: Tokens shorter than this are never rewritten.
        cutoff: Minimum similarity ratio (0..1) required for a correction.

    Returns:
        The token to use and a flag telling whether it was changed.
    """
    if token in VOCAB:
        return token, False

    # Very short tokens are mostly ordinary words ("set", "we") that happen
    # to be prefixes of vocabulary entries ("setup", "web") and reach the
    # default cutoff; rewriting them corrupts the text and adds a bogus
    # correction penalty.
    if len(token) < min_length:
        return token, False

    matches = difflib.get_close_matches(token, VOCAB, n=1, cutoff=cutoff)
    if matches:
        return matches[0], True
    return token, False
68+
69+
70+
def apply_spell_correction(tokens: List[str]) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Spell-correct every token.

    Returns:
        A pair ``(new_tokens, corrections)`` where ``new_tokens`` has the same
        length as *tokens* and ``corrections`` lists ``(original, corrected)``
        for each token that was changed, in input order.
    """
    outcomes = [spell_correct_token(tok) for tok in tokens]
    new_tokens = [fixed for fixed, _ in outcomes]
    corrections = [
        (original, fixed)
        for original, (fixed, changed) in zip(tokens, outcomes)
        if changed
    ]
    return new_tokens, corrections
79+
80+
81+
def fuzzy_phrase_score(a: str, b: str) -> float:
    """Return the ``SequenceMatcher`` similarity ratio of *a* and *b* (0.0 to 1.0)."""
    matcher = SequenceMatcher(None, a, b)
    return matcher.ratio()
83+
84+
85+
def semantic_intent_score(text: str) -> Tuple[str, float]:
    """Find the intent whose example phrase is most similar to *text*.

    Returns:
        ``(intent, score)`` for the best-scoring example; the earliest example
        wins ties. ``("unknown", 0.0)`` is returned when no example scores
        above zero.
    """
    scored = (
        (fuzzy_phrase_score(text, example), intent)
        for intent, examples in INTENT_EXAMPLES.items()
        for example in examples
    )
    # max() keeps the first maximal element, i.e. the earliest example on ties.
    top_score, top_intent = max(
        scored, key=lambda pair: pair[0], default=(0.0, "unknown")
    )
    if top_score > 0.0:
        return top_intent, top_score
    return "unknown", 0.0
98+
99+
100+
def rule_intent(text: str) -> Tuple[str, float]:
    """Detect the intent of *text* with simple keyword rules.

    Rules are checked in priority order: Docker (optionally with
    Kubernetes), Kubernetes alone, web servers, Python environments, then ML
    libraries. Keywords are matched as substrings, except the abbreviation
    ``ml``, which must be a whole word so that "html", "yaml" or "xml" do not
    count as machine learning.

    Args:
        text: Normalized (lower-case, space-separated) request text.

    Returns:
        ``(intent, confidence)``; ``("unknown", 0.0)`` when no rule matches.
    """
    mentions_k8s = any(key in text for key in ("kubernetes", "k8s", "kubectl"))

    if "docker" in text:
        if mentions_k8s:
            return "install_docker_k8s", 0.95
        return "install_docker", 0.9

    if mentions_k8s:
        return "install_docker_k8s", 0.9

    if any(key in text for key in ("nginx", "apache", "httpd", "web server")):
        return "install_web_server", 0.9

    if any(key in text for key in ("python", "venv", "conda", "anaconda")):
        return "setup_python_env", 0.9

    ml_keywords = ("tensorflow", "pytorch", "torch", "machine learning")
    # "ml" is too short for substring matching; require a standalone word.
    if any(key in text for key in ml_keywords) or "ml" in text.split():
        return "install_ml", 0.9

    return "unknown", 0.0
122+
123+
124+
# Captures the version following "python", e.g. "3", "3.10" or "3.10.4"
# (major with optional minor and patch components).
VERSION_RE = re.compile(r"python\s*([0-9]+(?:\.[0-9]+){0,2})")
# Whole-word operating system / distribution names.
PLATFORM_RE = re.compile(r"\b(mac|macos|windows|linux|ubuntu|debian)\b")
# Whole-word package names; includes httpd, anaconda and torch so that every
# package keyword recognised by the rule-based intent detection is also
# reported as a slot.
PACKAGE_RE = re.compile(
    r"\b(nginx|apache|httpd|docker|kubernetes|k8s|kubectl|python|pip|venv"
    r"|conda|anaconda|tensorflow|pytorch|torch)\b"
)
127+
128+
129+
def extract_slots(text: str) -> Dict[str, Any]:
    """Pull structured details out of normalized request text.

    Returns:
        A dict that may contain ``"python_version"`` and ``"platform"`` (first
        match of the respective pattern) and ``"packages"`` (all matched
        package names, de-duplicated in order of first appearance). Keys with
        no match are omitted.
    """
    slots: Dict[str, Any] = {}

    single_value_patterns = (
        ("python_version", VERSION_RE),
        ("platform", PLATFORM_RE),
    )
    for key, pattern in single_value_patterns:
        match = pattern.search(text)
        if match is not None:
            slots[key] = match.group(1)

    found = PACKAGE_RE.findall(text)
    if found:
        unique_packages: List[str] = []
        for name in found:
            if name not in unique_packages:
                unique_packages.append(name)
        slots["packages"] = unique_packages

    return slots
145+
146+
147+
def aggregate_confidence(c_rule, c_sem, num_corrections, c_classifier=0.0):
    """Blend the individual confidence signals into one score in [0, 1].

    The rule and semantic scores weigh 0.4 each and the classifier score 0.2.
    The weighted sum is scaled down by 10% per spelling correction (never
    below zero), clamped to [0, 1] and rounded to two decimals.
    """
    correction_penalty = 1 - (num_corrections * 0.1)
    if correction_penalty < 0.0:
        correction_penalty = 0.0

    weighted = 0.4 * c_rule + 0.4 * c_sem + 0.2 * c_classifier
    score = weighted * correction_penalty

    if score > 1.0:
        score = 1.0
    elif score < 0.0:
        score = 0.0
    return round(score, 2)
158+
159+
160+
def decide_clarifications(intent, confidence):
    """Return follow-up options to offer the user, or an empty list.

    An unknown intent or a confidence below 0.6 yields the general menu of
    supported tasks. A Python-environment intent with confidence below 0.75
    yields Python-specific choices. Otherwise nothing needs clarifying.
    """
    confident_enough = intent != "unknown" and not confidence < 0.6

    if confident_enough:
        if intent == "setup_python_env" and confidence < 0.75:
            return ["Use venv", "Use conda", "Install a specific Python version"]
        return []

    return [
        "Install Docker and Kubernetes",
        "Set up Python development environment",
        "Install a web server (nginx/apache)",
        "Install ML libraries (tensorflow/pytorch)",
    ]
171+
172+
173+
def parse_request(text: str) -> Dict[str, Any]:
    """Parse a free-form request into an intent, slots and a confidence.

    The text is normalized, tokenized and spell-corrected; the corrected text
    is then scored by both the keyword rules and the example-similarity
    matcher. The rule intent takes precedence; the semantic intent is used
    only when no rule fires and its score exceeds 0.6.

    Returns:
        A dict with the keys ``intent``, ``confidence``, ``explanation``,
        ``slots``, ``corrections`` and ``clarifications``.
    """
    raw_tokens = tokenize(normalize(text))
    fixed_tokens, corrections = apply_spell_correction(raw_tokens)
    corrected_text = " ".join(fixed_tokens)

    rule_int, c_rule = rule_intent(corrected_text)
    sem_int, c_sem = semantic_intent_score(corrected_text)

    rule_fired = rule_int != "unknown"
    if rule_fired:
        chosen_intent = rule_int
    elif c_sem > 0.6:
        chosen_intent = sem_int
    else:
        chosen_intent = "unknown"

    # The agreement bonus applies only when both detectors name the same intent.
    c_classifier = 0.95 if rule_fired and rule_int == sem_int else 0.0

    confidence = aggregate_confidence(
        c_rule, c_sem, len(corrections), c_classifier
    )

    notes = []
    if corrections:
        fixes = ", ".join(f"{a}->{b}" for a, b in corrections)
        notes.append("corrected: " + fixes)
    notes.append(f"rule_intent={rule_int} ({c_rule:.2f})")
    notes.append(f"semantic_match={sem_int} ({c_sem:.2f})")

    return {
        "intent": chosen_intent,
        "confidence": confidence,
        "explanation": "; ".join(notes),
        "slots": extract_slots(corrected_text),
        "corrections": corrections,
        "clarifications": decide_clarifications(chosen_intent, confidence),
    }
221+
222+

tests/test_nl_parser.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import pytest
2+
from nl_parser import parse_request
3+
4+
@pytest.mark.parametrize(
    "text,expected",
    [
        ("install something for machine learning", "install_ml"),
        ("I need a web server", "install_web_server"),
        ("set up python development environment", "setup_python_env"),
        ("install docker and kubernets", "install_docker_k8s"),
        ("Can you provision a python env with pip, venv and flake8?", "setup_python_env"),
        ("need nginx or apache for a website", "install_web_server"),
        ("deploy containers - docker", "install_docker"),
        ("k8s and docker on my mac", "install_docker_k8s"),
        ("i want to run pytorch", "install_ml"),
        ("setup dev env", "ambiguous"),
        ("add docker", "install_docker"),
        ("pls install pyhton 3.10", "setup_python_env"),
    ],
)
def test_intent(text, expected):
    """Each request maps to its expected intent, or asks for clarification."""
    parsed = parse_request(text)
    detected = parsed["intent"]
    score = parsed["confidence"]

    # "ambiguous" cases only need to produce clarification options.
    if expected == "ambiguous":
        assert parsed["clarifications"], f"Expected clarifications for: {text}"
        return

    assert detected == expected
    assert score >= 0.5
28+
29+
def test_corrections():
    """A misspelled "kubernets" is still understood and reported as corrected."""
    parsed = parse_request("install docker and kubernets")
    assert parsed["intent"] == "install_docker_k8s"

    misspelled = [before for before, _after in parsed["corrections"]]
    assert "kubernets" in misspelled
33+
34+
def test_slot_extraction():
    """Python version and platform are extracted into slots."""
    slots = parse_request("pls install python 3.10 on mac")["slots"]
    assert slots.get("python_version") == "3.10"
    assert slots.get("platform") in ("mac", "macos")

0 commit comments

Comments
 (0)