On this tutorial, we construct an ultra-advanced agentic AI workflow that behaves like a production-grade analysis and reasoning system slightly than a single immediate name. We ingest actual net sources asynchronously, cut up them into provenance-tracked chunks, and run hybrid retrieval utilizing each TF-IDF (sparse) and OpenAI embeddings (dense), then fuse outcomes for larger recall and stability. We orchestrate a number of brokers, planning, synthesis, and restore, whereas imposing strict guardrails so each main declare is grounded in retrieved proof, and we persist episodic reminiscence. Therefore, the system improves its technique over time. Try the FULL CODES right here.
import os, re, json, time, getpass, asyncio, sqlite3, hashlib
from typing import Listing, Dict, Tuple, Non-compulsory, Any
import numpy as np
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel, Discipline
from sklearn.feature_extraction.textual content import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from openai import AsyncOpenAI
from brokers import Agent, Runner, SQLiteSession
if not os.environ.get(“OPENAI_API_KEY”):
os.environ[“OPENAI_API_KEY”] = getpass.getpass(“Enter your OpenAI API key: “)
if not os.environ.get(“OPENAI_API_KEY”):
elevate RuntimeError(“OPENAI_API_KEY not supplied.”)
print(“✅ OpenAI API key loaded securely.”)
oa = AsyncOpenAI(api_key=os.environ[“OPENAI_API_KEY”])
def sha1(s: str) -> str:
return hashlib.sha1(s.encode(“utf-8″, errors=”ignore”)).hexdigest()
def normalize_url(u: str) -> str:
u = (u or “”).strip()
return u.rstrip(“).,]”‘”)
def clean_html_to_text(html: str) -> str:
soup = BeautifulSoup(html, “lxml”)
for tag in soup([“script”, “style”, “noscript”]):
tag.decompose()
txt = soup.get_text(“n”)
txt = re.sub(r”n{3,}”, “nn”, txt).strip()
txt = re.sub(r”[ t]+”, ” “, txt)
return txt
def chunk_text(textual content: str, chunk_chars: int = 1600, overlap_chars: int = 320) -> Listing[str]:
if not textual content:
return []
textual content = re.sub(r”s+”, ” “, textual content).strip()
n = len(textual content)
step = max(1, chunk_chars – overlap_chars)
chunks = []
i = 0
whereas i < n:
chunks.append(textual content[i:i + chunk_chars])
i += step
return chunks
def canonical_chunk_id(s: str) -> str:
if s is None:
return “”
s = str(s).strip()
s = s.strip(“<>”‘()[]{}”)
s = s.rstrip(“.,;:”)
return s
def inject_exec_summary_citations(exec_summary: str, citations: Listing[str], allowed_chunk_ids: Listing[str]) -> str:
exec_summary = exec_summary or “”
cset = []
for c in citations:
c = canonical_chunk_id(c)
if c and c in allowed_chunk_ids and c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) < 2:
for c in allowed_chunk_ids:
if c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) >= 2:
wanted = [c for c in cset if c not in exec_summary]
if wanted:
exec_summary = exec_summary.strip()
if exec_summary and never exec_summary.endswith(“.”):
exec_summary += “.”
exec_summary += f” (cite: {cset[0]}) (cite: {cset[1]})”
return exec_summary
We arrange the setting, securely load the OpenAI API key, and initialize core utilities that the whole lot else relies on. We outline hashing, URL normalization, HTML cleansing, and chunking so all downstream steps function on clear, constant textual content. We additionally add deterministic helpers to normalize and inject citations, guaranteeing guardrails are at all times glad. Try the FULL CODES right here.
headers = {“Consumer-Agent”: “Mozilla/5.0 (AgenticAI/4.2)”}
urls = [normalize_url(u) for u in urls]
urls = [u for u in urls if u.startswith(“http”)]
urls = record(dict.fromkeys(urls))
out: Dict[str, str] = {}
async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as consumer:
async def _one(url: str):
strive:
r = await consumer.get(url)
r.raise_for_status()
out[url] = clean_html_to_text(r.textual content)[:per_url_char_limit]
besides Exception as e:
out[url] = f”__FETCH_ERROR__ {kind(e).__name__}: {e}”
await asyncio.collect(*[_one(u) for u in urls])
return out
def dedupe_texts(sources: Dict[str, str]) -> Dict[str, str]:
seen = set()
out = {}
for url, txt in sources.objects():
if not isinstance(txt, str) or txt.startswith(“__FETCH_ERROR__”):
proceed
h = sha1(txt[:25000])
if h in seen:
proceed
seen.add(h)
out[url] = txt
return out
class ChunkRecord(BaseModel):
chunk_id: str
url: str
chunk_index: int
textual content: str
class RetrievalHit(BaseModel):
chunk_id: str
url: str
chunk_index: int
score_sparse: float = 0.0
score_dense: float = 0.0
score_fused: float = 0.0
textual content: str
class EvidencePack(BaseModel):
question: str
hits: Listing[RetrievalHit]
We asynchronously fetch a number of net sources in parallel and aggressively deduplicate content material to keep away from redundant proof. We convert uncooked pages into structured textual content and outline the core information fashions that symbolize chunks and retrieval hits. We guarantee each piece of textual content is traceable again to a particular supply and chunk index. Try the FULL CODES right here.
def episode_db_init():
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute(“””
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
query TEXT NOT NULL,
urls_json TEXT NOT NULL,
retrieval_queries_json TEXT NOT NULL,
useful_sources_json TEXT NOT NULL
)
“””)
con.commit()
con.shut()
def episode_store(query: str, urls: Listing[str], retrieval_queries: Listing[str], useful_sources: Listing[str]):
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute(
“INSERT INTO episodes(ts, query, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)”,
(int(time.time()), query, json.dumps(urls), json.dumps(retrieval_queries), json.dumps(useful_sources)),
)
con.commit()
con.shut()
def episode_recall(query: str, top_k: int = 2) -> Listing[Dict[str, Any]]:
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute(“SELECT ts, query, urls_json, retrieval_queries_json, useful_sources_json FROM episodes ORDER BY ts DESC LIMIT 200″)
rows = cur.fetchall()
con.shut()
q_tokens = set(re.findall(r”[A-Za-z]{3,}”, (query or “”).decrease()))
scored = []
for ts, q2, u, rq, us in rows:
t2 = set(re.findall(r”[A-Za-z]{3,}”, (q2 or “”).decrease()))
if not t2:
proceed
rating = len(q_tokens & t2) / max(1, len(q_tokens))
if rating > 0:
scored.append((rating, {
“ts”: ts,
“query”: q2,
“urls”: json.masses(u),
“retrieval_queries”: json.masses(rq),
“useful_sources”: json.masses(us),
}))
scored.kind(key=lambda x: x[0], reverse=True)
return [x[1] for x in scored[:top_k]]
episode_db_init()
We introduce episodic reminiscence backed by SQLite so the system can recall what labored in earlier runs. We retailer questions, retrieval methods, and helpful sources to information future planning. We additionally implement light-weight similarity-based recall to bias the system towards traditionally efficient patterns. Try the FULL CODES right here.
def __init__(self):
self.information: Listing[ChunkRecord] = []
self.tfidf: Non-compulsory[TfidfVectorizer] = None
self.tfidf_mat = None
self.emb_mat: Non-compulsory[np.ndarray] = None
def build_sparse(self):
corpus = [r.text for r in self.records] if self.information else [“”]
self.tfidf = TfidfVectorizer(stop_words=”english”, ngram_range=(1, 2), max_features=80000)
self.tfidf_mat = self.tfidf.fit_transform(corpus)
def search_sparse(self, question: str, ok: int) -> Listing[Tuple[int, float]]:
if not self.information or self.tfidf is None or self.tfidf_mat is None:
return []
qv = self.tfidf.rework([query])
sims = cosine_similarity(qv, self.tfidf_mat).flatten()
prime = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in prime]
def set_dense(self, mat: np.ndarray):
self.emb_mat = mat.astype(np.float32)
def search_dense(self, q_emb: np.ndarray, ok: int) -> Listing[Tuple[int, float]]:
if self.emb_mat is None or not self.information:
return []
M = self.emb_mat
q = q_emb.astype(np.float32).reshape(1, -1)
M_norm = M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-9)
q_norm = q / (np.linalg.norm(q) + 1e-9)
sims = (M_norm @ q_norm.T).flatten()
prime = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in prime]
def rrf_fuse(rankings: Listing[List[int]], ok: int = 60) -> Dict[int, float]:
scores: Dict[int, float] = {}
for r in rankings:
for pos, idx in enumerate(r, begin=1):
scores[idx] = scores.get(idx, 0.0) + 1.0 / (ok + pos)
return scores
HYBRID = HybridIndex()
ALLOWED_URLS: Listing[str] = []
EMBED_MODEL = “text-embedding-3-small”
async def embed_batch(texts: Listing[str]) -> np.ndarray:
resp = await oa.embeddings.create(mannequin=EMBED_MODEL, enter=texts, encoding_format=”float”)
vecs = [np.array(item.embedding, dtype=np.float32) for item in resp.data]
return np.vstack(vecs) if vecs else np.zeros((0, 0), dtype=np.float32)
async def embed_texts(texts: Listing[str], batch_size: int = 96, max_concurrency: int = 3) -> np.ndarray:
sem = asyncio.Semaphore(max_concurrency)
mats: Listing[Tuple[int, np.ndarray]] = []
async def _one(begin: int, batch: Listing[str]):
async with sem:
m = await embed_batch(batch)
mats.append((begin, m))
duties = []
for begin in vary(0, len(texts), batch_size):
batch = [t[:7000] for t in texts[start:start + batch_size]]
duties.append(_one(begin, batch))
await asyncio.collect(*duties)
mats.kind(key=lambda x: x[0])
emb = np.vstack([m for _, m in mats]) if mats else np.zeros((len(texts), 0), dtype=np.float32)
if emb.form[0] != len(texts):
elevate RuntimeError(f”Embedding rows mismatch: bought {emb.form[0]} anticipated {len(texts)}”)
return emb
async def embed_query(question: str) -> np.ndarray:
m = await embed_batch([query[:7000]])
return m[0] if m.form[0] else np.zeros((0,), dtype=np.float32)
async def build_index(urls: Listing[str], max_chunks_per_url: int = 60):
world ALLOWED_URLS
fetched = await fetch_many(urls)
fetched = dedupe_texts(fetched)
information: Listing[ChunkRecord] = []
allowed: Listing[str] = []
for url, txt in fetched.objects():
if not isinstance(txt, str) or txt.startswith(“__FETCH_ERROR__”):
proceed
allowed.append(url)
chunks = chunk_text(txt)[:max_chunks_per_url]
for i, ch in enumerate(chunks):
cid = f”{sha1(url)}:{i}”
information.append(ChunkRecord(chunk_id=cid, url=url, chunk_index=i, textual content=ch))
if not information:
err_view = {normalize_url(u): fetched.get(normalize_url(u), “”) for u in urls}
elevate RuntimeError(“No sources fetched efficiently.n” + json.dumps(err_view, indent=2)[:4000])
ALLOWED_URLS = allowed
HYBRID.information = information
HYBRID.build_sparse()
texts = [r.text for r in HYBRID.records]
emb = await embed_texts(texts, batch_size=96, max_concurrency=3)
HYBRID.set_dense(emb)
We construct a hybrid retrieval index that mixes sparse TF-IDF search with dense OpenAI embeddings. We allow reciprocal rank fusion, in order that sparse and dense indicators complement one another slightly than compete. We assemble the index as soon as per run and reuse it throughout all retrieval queries for effectivity. Try the FULL CODES right here.
sparse_rank = [i for i,_ in sparse]
dense_rank = [i for i,_ in dense]
sparse_scores = {i:s for i,s in sparse}
dense_scores = {i:s for i,s in dense}
fused = rrf_fuse([sparse_rank, dense_rank], ok=60) if dense_rank else rrf_fuse([sparse_rank], ok=60)
prime = sorted(fused.keys(), key=lambda i: fused[i], reverse=True)[:k]
hits: Listing[RetrievalHit] = []
for idx in prime:
r = HYBRID.information[idx]
hits.append(RetrievalHit(
chunk_id=r.chunk_id, url=r.url, chunk_index=r.chunk_index,
score_sparse=float(sparse_scores.get(idx, 0.0)),
score_dense=float(dense_scores.get(idx, 0.0)),
score_fused=float(fused.get(idx, 0.0)),
textual content=r.textual content
))
return EvidencePack(question=question, hits=hits)
async def gather_evidence(queries: Listing[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60):
proof: Listing[EvidencePack] = []
useful_sources_count: Dict[str, int] = {}
all_chunk_ids: Listing[str] = []
for q in queries:
sparse = HYBRID.search_sparse(q, ok=sparse_k)
q_emb = await embed_query(q)
dense = HYBRID.search_dense(q_emb, ok=dense_k)
pack = build_evidence_pack(q, sparse, dense, ok=per_query_k)
proof.append(pack)
for h in pack.hits[:6]:
useful_sources_count[h.url] = useful_sources_count.get(h.url, 0) + 1
for h in pack.hits:
all_chunk_ids.append(h.chunk_id)
useful_sources = sorted(useful_sources_count.keys(), key=lambda u: useful_sources_count[u], reverse=True)
all_chunk_ids = sorted(record(dict.fromkeys(all_chunk_ids)))
return proof, useful_sources[:8], all_chunk_ids
class Plan(BaseModel):
goal: str
subtasks: Listing[str]
retrieval_queries: Listing[str]
acceptance_checks: Listing[str]
class UltraAnswer(BaseModel):
title: str
executive_summary: str
structure: Listing[str]
retrieval_strategy: Listing[str]
agent_graph: Listing[str]
implementation_notes: Listing[str]
risks_and_limits: Listing[str]
citations: Listing[str]
sources: Listing[str]
def normalize_answer(ans: UltraAnswer, allowed_chunk_ids: Listing[str]) -> UltraAnswer:
information = ans.model_dump()
information[“citations”] = [canonical_chunk_id(x) for x in (data.get(“citations”) or [])]
information[“citations”] = [x for x in data[“citations”] if x in allowed_chunk_ids]
information[“executive_summary”] = inject_exec_summary_citations(information.get(“executive_summary”,””), information[“citations”], allowed_chunk_ids)
return UltraAnswer(**information)
def validate_ultra(ans: UltraAnswer, allowed_chunk_ids: Listing[str]) -> None:
extras = [u for u in ans.sources if u not in ALLOWED_URLS]
if extras:
elevate ValueError(f”Non-allowed sources in output: {extras}”)
cset = set(ans.citations or [])
lacking = [cid for cid in cset if cid not in set(allowed_chunk_ids)]
if lacking:
elevate ValueError(f”Citations reference unknown chunk_ids (not retrieved): {lacking}”)
if len(cset) < 6:
elevate ValueError(“Want not less than 6 distinct chunk_id citations in extremely mode.”)
es_text = ans.executive_summary or “”
es_count = sum(1 for cid in cset if cid in es_text)
if es_count < 2:
elevate ValueError(“Govt abstract should embrace not less than 2 chunk_id citations verbatim.”)
PLANNER = Agent(
title=”Planner”,
mannequin=”gpt-4o-mini”,
directions=(
“Return a technical Plan schema.n”
“Make 10-16 retrieval_queries.n”
“Acceptance should embrace: not less than 6 citations and exec_summary incorporates not less than 2 citations verbatim.”
),
output_type=Plan,
)
SYNTHESIZER = Agent(
title=”Synthesizer”,
mannequin=”gpt-4o-mini”,
directions=(
“Return UltraAnswer schema.n”
“Onerous constraints:n”
“- executive_summary MUST embrace not less than TWO citations verbatim as: (cite: ).n”
“- citations have to be chosen ONLY from ALLOWED_CHUNK_IDS record.n”
“- citations record should embrace not less than 6 distinctive chunk_ids.n”
“- sources have to be subset of allowed URLs.n”
),
output_type=UltraAnswer,
)
FIXER = Agent(
title=”Fixer”,
mannequin=”gpt-4o-mini”,
directions=(
“Restore to fulfill guardrails.n”
“Guarantee executive_summary contains not less than TWO citations verbatim.n”
“Select citations ONLY from ALLOWED_CHUNK_IDS record.n”
“Return UltraAnswer schema.”
),
output_type=UltraAnswer,
)
session = SQLiteSession(“ultra_agentic_user”, “ultra_agentic_session.db”)
We collect proof by working a number of focused queries, fusing sparse and dense outcomes, and assembling proof packs with scores and provenance. We outline strict schemas for plans and last solutions, then normalize and validate citations in opposition to retrieved chunk IDs. We implement arduous guardrails so each reply stays grounded and auditable. Try the FULL CODES right here.
await build_index(urls)
recall_hint = json.dumps(episode_recall(query, top_k=2), indent=2)[:2000]
plan_res = await Runner.run(
PLANNER,
f”Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nnRecall:n{recall_hint}n”,
session=session
)
plan: Plan = plan_res.final_output
queries = (plan.retrieval_queries or [])[:16]
evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)
evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]
allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)
draft_res = await Runner.run(
SYNTHESIZER,
f”Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn”
f”ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn”
f”Proof packs:n{evidence_json}nn”
“Return UltraAnswer.”,
session=session
)
draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)
last_err = None
for i in vary(max_repairs + 1):
strive:
validate_ultra(draft, allowed_chunk_ids)
episode_store(query, ALLOWED_URLS, plan.retrieval_queries, useful_sources)
return draft
besides Exception as e:
last_err = str(e)
if i >= max_repairs:
draft = normalize_answer(draft, allowed_chunk_ids)
validate_ultra(draft, allowed_chunk_ids)
return draft
fixer_res = await Runner.run(
FIXER,
f”Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn”
f”ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn”
f”Guardrail error:n{last_err}nn”
f”Draft:n{json.dumps(draft.model_dump(), indent=2)[:12000]}nn”
f”Proof packs:n{evidence_json}nn”
“Return corrected UltraAnswer that passes guardrails.”,
session=session
)
draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)
elevate RuntimeError(f”Surprising failure: {last_err}”)
query = (
“Design a production-lean however superior agentic AI workflow in Python with hybrid retrieval, ”
“provenance-first citations, critique-and-repair loops, and episodic reminiscence. ”
“Clarify why every layer issues, failure modes, and analysis.”
)
urls = [
“https://openai.github.io/openai-agents-python/”,
“https://openai.github.io/openai-agents-python/agents/”,
“https://openai.github.io/openai-agents-python/running_agents/”,
“https://github.com/openai/openai-agents-python”,
]
ans = await run_ultra_agentic(query, urls, max_repairs=2)
print(“nTITLE:n”, ans.title)
print(“nEXECUTIVE SUMMARY:n”, ans.executive_summary)
print(“nARCHITECTURE:”)
for x in ans.structure:
print(“-“, x)
print(“nRETRIEVAL STRATEGY:”)
for x in ans.retrieval_strategy:
print(“-“, x)
print(“nAGENT GRAPH:”)
for x in ans.agent_graph:
print(“-“, x)
print(“nIMPLEMENTATION NOTES:”)
for x in ans.implementation_notes:
print(“-“, x)
print(“nRISKS & LIMITS:”)
for x in ans.risks_and_limits:
print(“-“, x)
print(“nCITATIONS (chunk_ids):”)
for c in ans.citations:
print(“-“, c)
print(“nSOURCES:”)
for s in ans.sources:
print(“-“, s)
We orchestrate the total agentic loop by chaining planning, synthesis, validation, and restore in an async-safe pipeline. We routinely retry and repair outputs till they move all constraints with out human intervention. We end by working a full instance and printing a completely grounded, production-ready agentic response.
In conclusion, we developed a complete agentic pipeline strong to widespread failure modes: unstable embedding shapes, quotation drift, and lacking grounding in govt summaries. We validated outputs in opposition to allowlisted sources, retrieved chunk IDs, routinely normalized citations, and injected deterministic citations when wanted to ensure compliance with out sacrificing correctness. By combining hybrid retrieval, critique-and-repair loops, and episodic reminiscence, we created a reusable basis we are able to prolong with stronger evaluations (claim-to-evidence protection scoring, adversarial red-teaming, and regression assessments) to constantly harden the system because it scales to new domains and bigger corpora.
Try the FULL CODES right here. Additionally, be at liberty to comply with us on Twitter and don’t overlook to hitch our 100k+ ML SubReddit and Subscribe to our Publication. Wait! are you on telegram? now you possibly can be a part of us on telegram as nicely.


