Source code for langgraph_compare.jsons_to_csv

import os
import json
import csv
from typing import Dict, List, Optional, Any, Union, Tuple
from dataclasses import dataclass
from glob import glob
from .experiment import ExperimentPaths


@dataclass
class SupervisorConfig:
    """
    Configuration for a supervisor node.

    :param name: Name of the supervisor.
    :type name: str
    :param supervisor_type: Type of the supervisor ('graph' or 'subgraph').
    :type supervisor_type: str
    """
    name: str
    supervisor_type: str

@dataclass
class SubgraphConfig:
    """
    Configuration for a subgraph and its components.

    :param name: Name of the subgraph.
    :type name: str
    :param nodes: List of nodes within the subgraph.
    :type nodes: List[str]
    :param supervisor: Configuration of the subgraph's supervisor.
    :type supervisor: SupervisorConfig
    """
    name: str
    nodes: List[str]
    supervisor: SupervisorConfig  # Subgraph's supervisor

@dataclass
class GraphConfig:
    """
    Overall graph configuration.

    :param supervisors: Optional list of graph-level supervisors.
    :type supervisors: Optional[List[SupervisorConfig]]
    :param subgraphs: Optional list of subgraphs.
    :type subgraphs: Optional[List[SubgraphConfig]]
    :param nodes: Optional list of nodes for non-subgraph mode.
    :type nodes: Optional[List[str]]
    """
    # Optional graph-level supervisors
    supervisors: Optional[List[SupervisorConfig]] = None
    # Optional subgraphs
    subgraphs: Optional[List[SubgraphConfig]] = None
    # Optional nodes list for non-subgraph mode
    nodes: Optional[List[str]] = None

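# Example (not part of the module): a minimal sketch of how these dataclasses
# compose. The names "supervisor", "ResearchTeam", "ResearchSupervisor" and
# "Search" are illustrative; "chatbot_node" mirrors the docstring example below.

# Subgraph mode: a graph-level supervisor routing to one subgraph.
hierarchical_config = GraphConfig(
    supervisors=[SupervisorConfig(name="supervisor", supervisor_type="graph")],
    subgraphs=[
        SubgraphConfig(
            name="ResearchTeam",
            nodes=["Search"],
            supervisor=SupervisorConfig(name="ResearchSupervisor",
                                        supervisor_type="subgraph"),
        )
    ],
)

# Non-subgraph mode: a flat list of nodes.
flat_config = GraphConfig(nodes=["chatbot_node"])
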
def _build_config_mappings(graph_config: GraphConfig) -> Dict[str, Any]:
    """
    Build lookup structures from graph configuration.

    :param graph_config: The graph configuration object.
    :type graph_config: GraphConfig
    :return: A dictionary with mappings for supervisors, subgraphs, and nodes.
    :rtype: Dict
    """
    # Initialize all mappings
    # Set of graph-level supervisors (filled in later)
    graph_supervisors = set()
    # Dictionary -> key: subgraph name, value: SubgraphConfig object
    subgraph_map = {}
    # Dictionary -> key: subgraph supervisor name, value: subgraph name
    # For quick lookup of which subgraph a supervisor belongs to
    subgraph_supervisors = {}
    # Dictionary -> key: node name, value: subgraph name - for assigning nodes to subgraphs
    node_to_subgraph = {}
    # Set of valid nodes - helps to figure out if activities are valid
    valid_nodes = set()

    # Handle supervisors if present at the graph level in GraphConfig
    if graph_config.supervisors:
        # Fill the set with the names of all graph-level supervisors
        graph_supervisors = {sup.name for sup in graph_config.supervisors}

    # Handle subgraphs if present in GraphConfig
    if graph_config.subgraphs:
        # Build mappings for subgraphs (name of subgraph -> SubgraphConfig object)
        subgraph_map = {sg.name: sg for sg in graph_config.subgraphs}
        # Build mappings for subgraph supervisors (name of supervisor -> name of subgraph)
        subgraph_supervisors = {sg.supervisor.name: sg.name for sg in graph_config.subgraphs}
        # Collect nodes from subgraphs
        for sg in graph_config.subgraphs:
            for node in sg.nodes:
                # Assign nodes to subgraphs
                node_to_subgraph[node] = sg.name
                # Add nodes to the valid nodes set - ready for processing
                valid_nodes.add(node)
    # Handle the non-subgraph case
    elif graph_config.nodes:
        # If there are no subgraphs, check whether there are nodes to process in non-subgraph mode
        # Add all nodes to the valid nodes set
        valid_nodes.update(graph_config.nodes)
    else:
        raise ValueError("Either subgraphs or nodes must be provided in the configuration")

    return {
        'graph_supervisors': graph_supervisors,
        'subgraph_map': subgraph_map,
        'subgraph_supervisors': subgraph_supervisors,
        'node_to_subgraph': node_to_subgraph,
        'valid_nodes': valid_nodes
    }


def _is_graph_supervisor(activity: str, config: Dict[str, Any]) -> bool:
    """
    Check if an activity is a graph-level supervisor.

    :param activity: Name of the activity.
    :type activity: str
    :param config: Precomputed configuration mappings.
    :type config: Dict
    :return: True if the activity is a graph-level supervisor.
    :rtype: bool
    """
    return activity in config['graph_supervisors']

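# For the illustrative hierarchical_config sketched above,
# _build_config_mappings(hierarchical_config) would return (a sketch;
# the SubgraphConfig repr is abbreviated):
#
# {
#     'graph_supervisors': {'supervisor'},
#     'subgraph_map': {'ResearchTeam': SubgraphConfig(name='ResearchTeam', ...)},
#     'subgraph_supervisors': {'ResearchSupervisor': 'ResearchTeam'},
#     'node_to_subgraph': {'Search': 'ResearchTeam'},
#     'valid_nodes': {'Search'},
# }
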
def _get_subgraph_context(json_entry: Dict, config: Dict[str, Any], has_subgraphs: bool) -> Optional[str]:
    """
    Determine the current subgraph context from a JSON entry.

    :param json_entry: A single JSON entry to evaluate.
    :type json_entry: Dict
    :param config: Precomputed configuration mappings.
    :type config: Dict
    :param has_subgraphs: Indicates whether the graph config has subgraphs.
    :type has_subgraphs: bool
    :return: Name of the subgraph, if applicable.
    :rtype: Optional[str]
    """
    # If there are no subgraphs, we can't determine the subgraph context
    if not has_subgraphs:
        return None

    # Fetch the metadata field from the JSON entry
    metadata = json_entry.get('metadata', {})

    # langgraph_checkpoint_ns can encode which subgraph we're in, e.g.:
    # "langgraph_checkpoint_ns": "ResearchTeam:513d809b-96a1-a27b-6e39-d31ac60f8c30|Search:a5527da0-52df-17fe-4e51-e7073b748101"
    if 'langgraph_checkpoint_ns' in metadata:
        checkpoint_ns = metadata['langgraph_checkpoint_ns']
        # Split the checkpoint_ns by '|' and take the first part
        parts = checkpoint_ns.split('|')
        if parts:
            # Split the first part by ':' and take its first component (e.g. "ResearchTeam")
            subgraph_name = parts[0].split(':')[0]
            # Check if the subgraph_name is in the subgraph_map
            if subgraph_name in config['subgraph_map']:
                return subgraph_name

    # If langgraph_checkpoint_ns is not present, fall back to the langgraph_node field, e.g.:
    # "langgraph_node": "ResearchTeam"
    node = metadata.get('langgraph_node')
    # If the node is in the node_to_subgraph mapping, return the subgraph name
    if node in config['node_to_subgraph']:
        return config['node_to_subgraph'][node]

    return None


def _is_valid_activity(activity: str, config: Dict[str, Any]) -> bool:
    """
    Check if an activity should be included in the CSV output.

    :param activity: Name of the activity
    :param config: Precomputed configuration mappings
    :return: True if the activity should be included
    """
    return (
        activity == '__start__' or
        _is_graph_supervisor(activity, config) or
        activity in config['subgraph_supervisors'] or
        activity in config['valid_nodes']
    )


def _find_end_timestamp(json_data: List[Dict], start_index: int, case_id: Any, config: Dict[str, Any]) -> Optional[Any]:
    """
    Look ahead in the JSON entries to find the next timestamp for the same case_id,
    but only from activities that will be saved to the CSV.

    :param json_data: List of all JSON entries
    :param start_index: Current index in the JSON data
    :param case_id: The thread_ID/case_id we are processing
    :param config: Precomputed configuration mappings
    :return: The next valid timestamp or None if not found
    """
    # Look ahead for the next timestamp
    for j in range(start_index + 1, len(json_data)):
        # Get the next JSON entry
        next_json = json_data[j]
        # Check for the same thread_ID
        if next_json.get('thread_ID') != case_id:
            continue
        # Get writes from metadata
        writes = next_json.get('metadata', {}).get('writes', {})
        if not writes:
            continue
        # Find the next activity (first key that isn't 'messages')
        next_activity = next((key for key in writes.keys() if key != 'messages'), None)
        if not next_activity:
            continue
        # Only use the timestamp if the activity would be saved to the CSV
        if _is_valid_activity(next_activity, config):
            return next_json['checkpoint'].get('ts')
    return None

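# For orientation, the shape of a single checkpoint entry that the helpers in
# this module read, reconstructed from the fields accessed above. Field values
# are illustrative; real entries carry more keys.
example_entry = {
    "thread_ID": 1,
    "checkpoint": {"ts": "2024-11-05T12:00:00.000000+00:00"},
    "metadata": {
        # The first key in 'writes' that isn't 'messages' is the activity name
        "writes": {"ResearchTeam": {}},
        # Namespace used by _get_subgraph_context to resolve the subgraph
        "langgraph_checkpoint_ns": "ResearchTeam:513d809b-96a1-a27b-6e39-d31ac60f8c30|Search:a5527da0-52df-17fe-4e51-e7073b748101",
        # Fallback field when the namespace is absent
        "langgraph_node": "ResearchTeam",
    },
}
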
def _insert_subgraph_start_if_needed(
        json_data: List[Dict],
        current_index: int,
        case_id: Any,
        entries_to_write: List[Dict[str, Any]],
        config: Dict[str, Any]
) -> None:
    """
    Look ahead for a subgraph supervisor activity when we're currently on a graph
    supervisor and insert a '__start__' entry for it, if found.

    :param json_data: List of all JSON entries.
    :type json_data: List[Dict]
    :param current_index: Current index in the JSON data.
    :type current_index: int
    :param case_id: The thread_ID/case_id we are processing.
    :type case_id: Any
    :param entries_to_write: The list of entries to eventually be written to CSV.
    :type entries_to_write: List[Dict[str, Any]]
    :param config: Precomputed configuration mappings.
    :type config: Dict
    """
    # Look ahead for the next subgraph supervisor activity
    for j in range(current_index + 1, len(json_data)):
        next_json = json_data[j]
        # If the thread_ID is different, skip
        if next_json.get('thread_ID') != case_id:
            continue
        # Look for the next JSON entry with writes
        next_metadata = next_json.get('metadata', {})
        next_writes = next_metadata.get('writes', {})
        if not next_writes:
            continue
        # Find the next key that isn't 'messages' - this is the next activity
        next_activity = next((key for key in next_writes.keys() if key != 'messages'), None)
        # If the next activity is a subgraph supervisor, write a '__start__' entry
        if next_activity in config['subgraph_supervisors']:
            # Get information about the next subgraph supervisor (the subgraph name)
            next_context = config['subgraph_supervisors'][next_activity]
            # Write a '__start__' entry for the next subgraph supervisor
            entries_to_write.append({
                'case_id': case_id,
                'timestamp': next_json['checkpoint'].get('ts'),
                'end_timestamp': next_json['checkpoint'].get('ts'),
                'cost': 0,
                'activity': '__start__',
                # org:resource is the next subgraph's name
                'org:resource': next_context
            })
            break


def _handle_subgraph_mode(
        activity: str,
        visited_global_start: bool,
        i: int,
        json_data: List[Dict],
        case_id: Any,
        config: Dict[str, Any],
        context: Optional[str],
        entries_to_write: List[Dict[str, Any]]
) -> Tuple[bool, Optional[str], bool]:
    """
    Handle the processing logic for subgraph mode.

    :param activity: The identified activity from a JSON entry.
    :type activity: str
    :param visited_global_start: Whether we've visited the global '__start__'.
    :type visited_global_start: bool
    :param i: The current index of the JSON entry in the json_data list.
    :type i: int
    :param json_data: The entire list of JSON entries.
    :type json_data: List[Dict]
    :param case_id: The thread_ID/case_id we are processing.
    :type case_id: Any
    :param config: Precomputed configuration mappings.
    :type config: Dict
    :param context: The subgraph context if available.
    :type context: Optional[str]
    :param entries_to_write: The list where processed entries are accumulated.
    :type entries_to_write: List[Dict]
    :return: A tuple of (should_write, org_resource, visited_global_start).
    :rtype: Tuple[bool, Optional[str], bool]
    """
    should_write = False
    org_resource = None

    # Visiting the global '__start__' activity for the first time
    if activity == '__start__' and not visited_global_start:
        should_write = True
        org_resource = '__start__'
        visited_global_start = True
    # If the activity is a graph supervisor
    elif _is_graph_supervisor(activity, config):
        should_write = True
        # org_resource is the name of the graph supervisor
        org_resource = activity
        # Look ahead for the next subgraph supervisor activity
        _insert_subgraph_start_if_needed(json_data, i, case_id, entries_to_write, config)
    # If the activity is a subgraph supervisor
    elif activity in config['subgraph_supervisors']:
        should_write = True
        # org_resource is the name of the subgraph
        org_resource = config['subgraph_supervisors'][activity]
    # If the activity is a node in a subgraph
    elif activity in config['valid_nodes'] and context:
        should_write = True
        # org_resource is the subgraph name
        org_resource = config['node_to_subgraph'][activity]

    return should_write, org_resource, visited_global_start

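# A minimal sketch (reusing the illustrative hierarchical_config from above) of
# how _handle_subgraph_mode attributes a plain subgraph node to its subgraph:
cfg = _build_config_mappings(hierarchical_config)
should_write, org_resource, seen_start = _handle_subgraph_mode(
    activity="Search",
    visited_global_start=True,
    i=0,
    json_data=[],        # no look-ahead is triggered for a plain node
    case_id=1,
    config=cfg,
    context="ResearchTeam",
    entries_to_write=[],
)
assert (should_write, org_resource) == (True, "ResearchTeam")
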
def _handle_non_subgraph_mode(
        activity: str,
        visited_global_start: bool,
        has_supervisors: bool,
        config: Dict[str, Any]
) -> Tuple[bool, Optional[str], bool]:
    """
    Handle the processing logic for non-subgraph mode.

    :param activity: The identified activity from a JSON entry.
    :type activity: str
    :param visited_global_start: Whether we've visited the global '__start__'.
    :type visited_global_start: bool
    :param has_supervisors: Indicates whether the graph config has supervisors.
    :type has_supervisors: bool
    :param config: Precomputed configuration mappings.
    :type config: Dict
    :return: A tuple of (should_write, org_resource, visited_global_start).
    :rtype: Tuple[bool, Optional[str], bool]
    """
    should_write = False
    org_resource = None

    # Visiting the global '__start__' activity for the first time
    if activity == '__start__' and not visited_global_start:
        should_write = True
        org_resource = '__start__'
        visited_global_start = True
    # If the activity is a graph supervisor
    elif has_supervisors and _is_graph_supervisor(activity, config):
        should_write = True
        # org_resource is the activity itself
        org_resource = activity
    # Only process explicitly listed nodes
    elif activity in config['valid_nodes']:
        should_write = True
        # org_resource is the activity itself
        org_resource = activity

    return should_write, org_resource, visited_global_start


def _process_single_json(json_data: List[Dict], graph_config: GraphConfig, config: Dict) -> List[Dict[str, Any]]:
    """
    Process a single JSON and return entries to write.

    :param json_data: List of JSON entries.
    :type json_data: List[Dict]
    :param graph_config: The graph configuration object.
    :type graph_config: GraphConfig
    :param config: Precomputed configuration mappings.
    :type config: Dict
    :return: List of processed entries to write.
    :rtype: List[Dict]
    """
    # Check if the graph configuration has subgraphs
    has_subgraphs = bool(graph_config.subgraphs)
    # Check if the graph configuration has supervisors
    has_supervisors = bool(graph_config.supervisors)
    # Have we visited the global '__start__' activity yet?
    visited_global_start = False

    # First collect all valid entries organized by case_id
    entries_by_case = {}

    # First pass to collect valid activities
    for i, json_entry in enumerate(json_data):
        # Extract the thread_ID from the JSON entry
        case_id = json_entry.get('thread_ID')
        # Extract the timestamp from the checkpoint field in the JSON entry
        timestamp = json_entry['checkpoint'].get('ts')
        # Extract the metadata field from the JSON entry
        metadata = json_entry.get('metadata', {})
        # Extract the writes field from metadata
        writes = metadata.get('writes', {})
        if not writes:
            continue

        # Look for the first key in writes that isn't 'messages'
        # This is the activity name
        activity = next((key for key in writes.keys() if key != 'messages'), None)
        if not activity:
            continue

        # If we haven't seen this case_id before, create an empty list
        if case_id not in entries_by_case:
            entries_by_case[case_id] = []

        # Get the subgraph context if applicable
        context = _get_subgraph_context(json_entry, config, has_subgraphs) if has_subgraphs else None

        # Determine whether we should write this entry and get its org_resource
        if has_subgraphs:
            should_write, org_resource, visited_global_start = _handle_subgraph_mode(
                activity=activity,
                visited_global_start=visited_global_start,
                i=i,
                json_data=json_data,
                case_id=case_id,
                config=config,
                context=context,
                # We'll handle '__start__' entries separately
                entries_to_write=[]
            )
        else:
            should_write, org_resource, visited_global_start = _handle_non_subgraph_mode(
                activity=activity,
                visited_global_start=visited_global_start,
                has_supervisors=has_supervisors,
                config=config
            )

        # If the activity should be written, record it
        if should_write:
            entries_by_case[case_id].append({
                'case_id': case_id,
                'timestamp': timestamp,
                'activity': activity,
                'org:resource': org_resource,
            })

    # Now process the collected entries, add subgraph '__start__' entries
    # where needed, and set end_timestamps
    final_entries = []
    for case_id, entries in entries_by_case.items():
        # Sort entries by timestamp
        entries.sort(key=lambda x: x['timestamp'])

        processed_entries = []
        # Process entries and add '__start__' entries where needed
        for i, entry in enumerate(entries):
            current_activity = entry['activity']

            # If this is a subgraph supervisor and the previous activity was a global
            # supervisor (or we're just starting), add a '__start__' entry
            if (current_activity in config['subgraph_supervisors'] and
                    (i == 0 or _is_graph_supervisor(entries[i - 1]['activity'], config))):
                subgraph_name = config['subgraph_supervisors'][current_activity]
                processed_entries.append({
                    'case_id': case_id,
                    'timestamp': entry['timestamp'],
                    'activity': '__start__',
                    'org:resource': subgraph_name,
                })

            processed_entries.append(entry)

        # Set end_timestamps
        for i, entry in enumerate(processed_entries):
            if i < len(processed_entries) - 1:
                # If not the last entry, end_timestamp is the next entry's timestamp
                end_timestamp = processed_entries[i + 1]['timestamp']
            else:
                # If it is the last entry, end_timestamp is its own timestamp
                end_timestamp = entry['timestamp']

            final_entries.append({
                'case_id': entry['case_id'],
                'timestamp': entry['timestamp'],
                'end_timestamp': end_timestamp,
                'cost': 0,
                'activity': entry['activity'],
                'org:resource': entry['org:resource']
            })

    return final_entries


def _validate_directory(directory_path: str) -> None:
    """
    Validate that the specified directory exists.

    :param directory_path: Path to the directory
    :type directory_path: str
    :raises FileNotFoundError: If the directory does not exist
    """
    if not os.path.exists(directory_path):
        raise FileNotFoundError(f"Directory does not exist: {directory_path}")

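# To make the second pass concrete: a sketch of the end_timestamp rule used by
# _process_single_json (timestamps are placeholder strings):
stamps = ["t1", "t2", "t3"]  # sorted start timestamps of one case's entries
ends = [stamps[i + 1] if i < len(stamps) - 1 else stamps[i]
        for i in range(len(stamps))]
assert ends == ["t2", "t3", "t3"]
# Each activity ends when the next one starts; the last ends at its own start.
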
def export_jsons_to_csv(source: Union[ExperimentPaths, str], graph_config: GraphConfig,
                        output_dir: Optional[str] = None) -> None:
    """
    Process all JSON files and export them to csv_output.csv in the specified directory.
    Can use either an ExperimentPaths instance or explicit paths.

    :param source: Either an ExperimentPaths instance or a path to the JSON directory
    :type source: Union[ExperimentPaths, str]
    :param graph_config: The graph configuration object
    :type graph_config: GraphConfig
    :param output_dir: Directory where csv_output.csv will be saved (required if source is a str)
    :type output_dir: Optional[str]

    **Examples:**

    >>> # Using ExperimentPaths:
    >>> exp = create_experiment("my_experiment")
    >>> graph_config = GraphConfig(nodes=["chatbot_node"])
    >>> export_jsons_to_csv(exp, graph_config)
    Processed: experiments/my_experiment/json/thread_1.json
    Processed: experiments/my_experiment/json/thread_2.json
    Processed: experiments/my_experiment/json/thread_3.json
    Successfully exported combined data to: experiments/my_experiment/csv/csv_output.csv

    >>> # Using direct paths:
    >>> export_jsons_to_csv("path/to/jsons", graph_config, "path/to/output_directory")
    Processed: path/to/jsons/thread_1.json
    Processed: path/to/jsons/thread_2.json
    Processed: path/to/jsons/thread_3.json
    Successfully exported combined data to: path/to/output_directory/csv_output.csv
    """
    csv_fields = ['case_id', 'timestamp', 'end_timestamp', 'cost', 'activity', 'org:resource']

    # Determine paths based on the input type
    if isinstance(source, ExperimentPaths):
        json_dir = source.json_dir
        output_path = source.get_csv_path()
    else:
        if output_dir is None:
            raise ValueError("output_dir must be provided when using a JSON directory path directly")
        # Validate that both directories exist
        _validate_directory(source)      # Validate the JSON directory
        _validate_directory(output_dir)  # Validate the output directory
        json_dir = source
        output_path = os.path.join(output_dir, 'csv_output.csv')

    # Build configuration mappings
    config = _build_config_mappings(graph_config)

    # Get all JSON files from the experiment's json directory
    json_files = glob(os.path.join(json_dir, '*.json'))
    if not json_files:
        raise ValueError(f"No JSON files found in {json_dir}")

    # Placeholder for all entries
    all_entries = []

    # Process each JSON file
    for json_file in json_files:
        try:
            with open(json_file, 'r') as f:
                jsons = json.load(f)
            # Process the current JSON file
            entries = _process_single_json(jsons, graph_config, config)
            # Append the processed entries to all_entries
            all_entries.extend(entries)
            print(f"Processed: {json_file}")
        except Exception as e:
            print(f"Error processing {json_file}: {str(e)}")

    # Sort all entries by timestamp
    all_entries.sort(key=lambda x: x['timestamp'])

    # Write the combined results to CSV
    with open(output_path, mode='w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=csv_fields)
        writer.writeheader()
        writer.writerows(all_entries)

    print(f"Successfully exported combined data to: {output_path}")
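
# The resulting csv_output.csv contains the six csv_fields columns; an
# illustrative row (values are hypothetical):
#
# case_id,timestamp,end_timestamp,cost,activity,org:resource
# 1,2024-11-05T12:00:00.000000+00:00,2024-11-05T12:00:03.000000+00:00,0,__start__,__start__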