PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation
We build a Colab-ready PyGraphistry workflow for interactive graph analytics on enterprise access data. We generate a synthetic dataset of users, devices, IPs, services, roles, and geos, then convert it into nodes and edges. We enrich the graph with risk scores, centrality metrics, community detection, Isolation Forest anomaly scores, and UMAP layout embeddings. We then bind the graph in PyGraphistry and produce local PyVis visualizations for full, ego, and high-risk views.
In this tutorial, we build an advanced, Colab-ready workflow around PyGraphistry for interactive graph analytics and visualization. We start by creating a realistic enterprise-style access dataset, transforming it into nodes and edges, and enriching the graph with risk scores, anomaly indicators, centrality metrics, community detection, and layout embeddings. We then use PyGraphistry to bind graph structure, visual encodings, labels, tooltips, and filtered subgraphs, and to generate local interactive visualizations when Graphistry credentials are not configured. Through this implementation, we see how graph intelligence helps us investigate suspicious users, risky devices, IP relationships, sensitive services, and high-risk behavioral patterns in a practical security analytics setting.
Star us on GitHub for future code notebooks and implementations.
Installing PyGraphistry and Dependencies
Copy CodeCopiedUse a different Browser
# --- Standard-library imports -------------------------------------------------
import os
import sys
import subprocess
import warnings
import textwrap
import json
import math
import random

warnings.filterwarnings("ignore")


def pip_install(packages):
    """Install or upgrade ``packages`` quietly with the running interpreter's pip.

    Raises ``subprocess.CalledProcessError`` when pip exits non-zero
    (``check=True``).
    """
    command = [sys.executable, "-m", "pip", "install", "-q", "-U"]
    command.extend(packages)
    subprocess.run(command, check=True)


# Third-party packages are installed before they are imported below.
pip_install([
    "graphistry[networkx,umap-learn]",
    "pandas",
    "numpy",
    "networkx",
    "scikit-learn",
    "pyvis",
    "matplotlib",
    "pyarrow",
])

# --- Third-party imports (available after the install step) -------------------
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import graphistry
from pathlib import Path
from IPython.display import display, HTML, IFrame
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA
from pyvis.network import Network

# --- Output location and reproducibility --------------------------------------
OUT_DIR = Path("/content/pygraphistry_advanced_tutorial")
OUT_DIR.mkdir(parents=True, exist_ok=True)

SEED = 42
rng = np.random.default_rng(SEED)
random.seed(SEED)

print("=" * 100)
print("PyGraphistry Advanced Colab Tutorial")
print("=" * 100)
print("This tutorial builds an enterprise-style access graph, computes graph analytics,")
print("creates suspicious subgraphs, exports graph artifacts, and optionally uploads")
print("interactive visualizations to Graphistry Hub if credentials are available.")
print("=" * 100)


def colab_secret(name, default=""):
    """Look up the setting ``name`` and return it, or ``default`` if it is empty.

    The environment variable ``name`` is read first. A non-empty value from
    ``google.colab.userdata.get(name)`` takes precedence over it. Any failure
    while importing or querying ``userdata`` is ignored, so the environment
    value (or ``default``) is used instead.
    """
    resolved = os.environ.get(name, default)
    try:
        from google.colab import userdata

        # Keep the environment value when the Colab secret is missing or empty.
        resolved = userdata.get(name) or resolved
    except Exception:
        # Best-effort lookup: fall through with whatever was resolved so far.
        pass
    return resolved or default


# --- Graphistry connection settings -------------------------------------------
GRAPHISTRY_SERVER = colab_secret("GRAPHISTRY_SERVER", "hub.graphistry.com")
GRAPHISTRY_PROTOCOL = colab_secret("GRAPHISTRY_PROTOCOL", "https")
GRAPHISTRY_USERNAME = colab_secret("GRAPHISTRY_USERNAME", "")
GRAPHISTRY_PASSWORD = colab_secret("GRAPHISTRY_PASSWORD", "")
GRAPHISTRY_PERSONAL_KEY_ID = colab_secret("GRAPHISTRY_PERSONAL_KEY_ID", "")
GRAPHISTRY_PERSONAL_KEY_SECRET = colab_secret("GRAPHISTRY_PERSONAL_KEY_SECRET", "")

# REGISTERED becomes True only after a credentialed register() call returns.
REGISTERED = False
try:
    # Arguments shared by every register() call below.
    _endpoint = {
        "api": 3,
        "protocol": GRAPHISTRY_PROTOCOL,
        "server": GRAPHISTRY_SERVER,
    }
    if GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET:
        # Personal key credentials are tried before username/password.
        graphistry.register(
            **_endpoint,
            personal_key_id=GRAPHISTRY_PERSONAL_KEY_ID,
            personal_key_secret=GRAPHISTRY_PERSONAL_KEY_SECRET,
        )
        REGISTERED = True
        print("Graphistry registered with personal key credentials.")
    elif GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD:
        graphistry.register(
            **_endpoint,
            username=GRAPHISTRY_USERNAME,
            password=GRAPHISTRY_PASSWORD,
        )
        REGISTERED = True
        print("Graphistry registered with username/password credentials.")
    else:
        # No credentials: register the endpoint only and leave REGISTERED False.
        graphistry.register(**_endpoint)
        print("No Graphistry credentials found. Local analytics will run; Graphistry .plot() uploads will be skipped.")
        print("To enable live Graphistry plots, add Colab secrets:")
        print("GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET")
        print("or GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD")
except Exception as exc:
    # A failed registration is reported and does not stop the script.
    REGISTERED = False
    print("Graphistry registration was not completed:", repr(exc))
    print("Continuing with local analytics and local HTML visualization.")


def nid(kind, value):
    """Return the node identifier ``"<kind>:<value>"``."""
    return "{}:{}".format(kind, value)
We set up the complete Colab environment by installing PyGraphistry and all supporting libraries for graph analytics, visualization, and machine learning. We configure the output directory, random seed, and Graphistry credentials so the notebook works both locally and with Graphistry Hub. We also define a reusable helper for node naming to keep every entity type clearly separated in the graph.
Generating Enterprise Access Dataset
Copy CodeCopiedUse a different Browser
# Synthetic access dataset and graph tables.
# Visible steps: define entity counts and name pools (users, devices, IPs, services,
# roles, geos); draw privileged/compromised users and risky devices/IPs with rng.choice;
# assign a role per user; compute a clipped per-event risk_score from weighted indicator
# flags plus Gaussian noise; format a per-edge summary string; stringify
# first_seen/last_seen; build nodes_df from the union of edge endpoints (entity_type and
# label come from splitting the id on the first ":"); and left-merge per-node touch
# statistics aggregated over both the src and dst side of raw_edges_df, filling gaps with 0.
# NOTE(review): this listing appears to have lost text during extraction -- the code after
# `elif rng.random()` jumps straight to `21)`, and the code after
# `is_success = bool(rng.random()` jumps to `{r['relation']}`. The definitions of
# raw_edges_df, edges_df and the event loop are therefore not visible here, and the
# snippet is not runnable as shown. Restore the missing spans from the original notebook.
n_users = 55 n_devices = 42 n_ips = 36 n_services = 15 n_roles = 7 n_geos = 10 n_events = 2200 users = [f"user_{i:03d}" for i in range(n_users)] devices = [f"device_{i:03d}" for i in range(n_devices)] ips = [f"10.{i // 255}.{i % 255}.{rng.integers(1, 255)}" for i in range(1, n_ips + 1)] services = [ "salesforce", "snowflake", "github", "jira", "slack", "vpn", "okta", "aws_console", "gcp_console", "databricks", "hris", "email", "crm", "vault", "payments_api" ] roles = ["employee", "analyst", "engineer", "manager", "admin", "contractor", "service_account"] geos = ["IN", "US", "GB", "DE", "SG", "AE", "BR", "NL", "AU", "JP"] privileged_users = set(rng.choice(users, size=7, replace=False)) compromised_users = set(rng.choice(list(set(users) - privileged_users), size=4, replace=False)) risky_devices = set(rng.choice(devices, size=5, replace=False)) risky_ips = set(rng.choice(ips, size=5, replace=False)) sensitive_services = {"aws_console", "gcp_console", "vault", "payments_api", "snowflake"} user_role = {} for u in users: if u in privileged_users: user_role[u] = rng.choice(["admin", "manager", "engineer"], p=[0.55, 0.2, 0.25]) elif rng.random() 21) service_sensitivity = 1.0 if service in sensitive_services else 0.25 privileged = int(role in ["admin", "manager", "service_account"]) compromised = int(user in compromised_users) risky_infra = int(device in risky_devices or ip in risky_ips) risk_score = ( 0.08 + 0.22 * compromised + 0.18 * risky_infra + 0.17 * impossible_travel + 0.13 * off_hours + 0.15 * service_sensitivity + 0.07 * privileged + rng.normal(0, 0.06) ) risk_score = float(np.clip(risk_score, 0.0, 1.0)) success_probability = 0.96 - 0.45 * risk_score is_success = bool(rng.random() {r['relation']} " f"{r['src']} → {r['dst']} " f"events: {int(r['event_count'])} " f"max risk: {r['max_risk']:.3f} " f"avg risk: {r['avg_risk']:.3f} " f"failures: {int(r['failed_count'])} " f"off-hours: {int(r['off_hours_count'])} " f"impossible-travel: {int(r['impossible_travel_count'])} 
" f"amount sum: {r['amount_sum']:.2f}" ), axis=1 ) edges_df["first_seen"] = edges_df["first_seen"].astype(str) edges_df["last_seen"] = edges_df["last_seen"].astype(str) all_node_ids = sorted(set(edges_df["src"]).union(set(edges_df["dst"]))) nodes_df = pd.DataFrame({"id": all_node_ids}) nodes_df["entity_type"] = nodes_df["id"].str.split(":", n=1).str[0] nodes_df["label"] = nodes_df["id"].str.split(":", n=1).str[1] touch_src = raw_edges_df[["src", "event_id", "risk_score", "amount", "is_success", "off_hours", "impossible_travel"]].rename(columns={"src": "id"}) touch_dst = raw_edges_df[["dst", "event_id", "risk_score", "amount", "is_success", "off_hours", "impossible_travel"]].rename(columns={"dst": "id"}) touches = pd.concat([touch_src, touch_dst], ignore_index=True) node_stats = ( touches .groupby("id", as_index=False) .agg( touched_events=("event_id", "nunique"), max_risk=("risk_score", "max"), avg_risk=("risk_score", "mean"), failed_touches=("is_success", lambda s: int((~s).sum())), off_hours_touches=("off_hours", "sum"), impossible_travel_touches=("impossible_travel", "sum"), amount_touched=("amount", "sum") ) ) nodes_df = nodes_df.merge(node_stats, on="id", how="left").fillna({ "touched_events": 0, "max_risk": 0.0, "avg_risk": 0.0, "failed_touches": 0, "off_hours_touches": 0, "impossible_travel_touches": 0, "amount_touched": 0.0 })
We transform raw event records into graph relationships by creating edges between users, devices, IPs, services, roles, and geographies. We aggregate repeated interactions into weighted edges with risk scores, counts, failures, timestamps, and activity summaries. We also create the node table and compute basic node-level statistics from all entity interactions.
Computing Graph Analytics Features
Copy CodeCopiedUse a different Browser
G = nx.DiGraph() for row in nodes_df.itertuples(index=False): G.add_node(row.id, entity_type=row.entity_type, label=row.label) for row in edges_df.itertuples(index=False): G.add_edge( row.src, row.dst, relation=row.relation, event_count=float(row.event_count), max_risk=float(row.max_risk), avg_risk=float(row.avg_risk), failed_count=float(row.failed_count), amount_sum=float(row.amount_sum) ) degree_w = dict(G.degree(weight="event_count")) in_degree_w = dict(G.in_degree(weight="event_count")) out_degree_w = dict(G.out_degree(weight="event_count")) try: pagerank = nx.pagerank(G, weight="event_count", max_iter=250) except Exception: pagerank = {n: 0.0 for n in G.nodes()} try: betweenness = nx.betweenness_centrality(G, k=min(90, max(2, G.number_of_nodes())), seed=SEED) except Exception: betweenness = {n: 0.0 for n in G.nodes()} UG = G.to_undirected() try: communities = list(nx.community.greedy_modularity_communities(UG, weight="event_count")) except Exception: communities = [set(c) for c in nx.connected_components(UG)] community_map = {} for cid, members in enumerate(communities): for n in members: community_map[n] = cid nodes_df["degree_w"] = nodes_df["id"].map(degree_w).fillna(0.0) nodes_df["in_degree_w"] = nodes_df["id"].map(in_degree_w).fillna(0.0) nodes_df["out_degree_w"] = nodes_df["id"].map(out_degree_w).fillna(0.0) nodes_df["pagerank"] = nodes_df["id"].map(pagerank).fillna(0.0) nodes_df["betweenness"] = nodes_df["id"].map(betweenness).fillna(0.0) nodes_df["community"] = nodes_df["id"].map(community_map).fillna(-1).astype(int) risk_bins = [-0.001, 0.35, 0.65, 0.85, 1.001] risk_labels = ["low", "medium", "high", "critical"] nodes_df["risk_band"] = pd.cut(nodes_df["max_risk"], bins=risk_bins, labels=risk_labels).astype(str) feature_cols = [ "touched_events", "max_risk", "avg_risk", "failed_touches", "off_hours_touches", "impossible_travel_touches", "amount_touched", "degree_w", "in_degree_w", "out_degree_w", "pagerank", "betweenness" ] X_num = 
nodes_df[feature_cols].replace([np.inf, -np.inf], 0).fillna(0.0) X_scaled = StandardScaler().fit_transform(X_num) iso = IsolationForest( n_estimators=250, contamination=0.10, random_state=SEED ) iso.fit(X_scaled) nodes_df["anomaly_score"] = -iso.score_samples(X_scaled) nodes_df["is_anomaly"] = iso.predict(X_scaled) == -1 type_color_map = { "user": "#1f77b4", "device": "#ff7f0e", "ip": "#2ca02c", "service": "#9467bd", "role": "#8c564b", "geo": "#17becf", "event": "#7f7f7f" } nodes_df["node_color"] = nodes_df["entity_type"].map(type_color_map).fillna("#999999") nodes_df.loc[nodes_df["risk_band"].eq("critical"), "node_color"] = "#d62728" nodes_df.loc[nodes_df["is_anomaly"], "node_color"] = "#000000" size_raw = ( 8 + 6 * np.log1p(nodes_df["degree_w"].astype(float)) + 10 * nodes_df["pagerank"].astype(float) / max(nodes_df["pagerank"].max(), 1e-9) + 8 * nodes_df["is_anomaly"].astype(int) ) nodes_df["node_size"] = np.clip(size_raw, 5, 60) model_features = pd.concat([ nodes_df[feature_cols + ["anomaly_score"]].replace([np.inf, -np.inf], 0).fillna(0.0), pd.get_dummies(nodes_df[["entity_type", "risk_band"]], dtype=float) ], axis=1) try: import umap reducer = umap.UMAP( n_components=2, n_neighbors=min(18, max(2, len(nodes_df) - 1)), min_dist=0.08, metric="euclidean", random_state=SEED ) emb = reducer.fit_transform(StandardScaler().fit_transform(model_features)) layout_name = "UMAP" except Exception: reducer = PCA(n_components=2, random_state=SEED) emb = reducer.fit_transform(StandardScaler().fit_transform(model_features)) layout_name = "PCA fallback" nodes_df["x"] = emb[:, 0].astype(float) nodes_df["y"] = emb[:, 1].astype(float
[truncated for AI cost control]