diff --git a/AbletonMCP_AI/__init__.py b/AbletonMCP_AI/__init__.py new file mode 100644 index 0000000..df989b7 --- /dev/null +++ b/AbletonMCP_AI/__init__.py @@ -0,0 +1,2143 @@ +# AbletonMCP/init.py +from __future__ import absolute_import, print_function, unicode_literals + +from _Framework.ControlSurface import ControlSurface +import socket +import json +import threading +import time +import traceback + +# Change queue import for Python 2 +try: + import Queue as queue # Python 2 +except ImportError: + import queue # Python 3 + +try: + string_types = basestring # Python 2 +except NameError: + string_types = str # Python 3 + +# Constants for socket communication +DEFAULT_PORT = 9877 +HOST = "localhost" + +def create_instance(c_instance): + """Create and return the AbletonMCP script instance""" + return AbletonMCP(c_instance) + +class AbletonMCP(ControlSurface): + """AbletonMCP Remote Script for Ableton Live""" + + def __init__(self, c_instance): + """Initialize the control surface""" + ControlSurface.__init__(self, c_instance) + self.log_message("AbletonMCP Remote Script initializing...") + + # Socket server for communication + self.server = None + self.client_threads = [] + self.server_thread = None + self.running = False + + # Cache the song reference for easier access + self._song = self.song() + + # Start the socket server + self.start_server() + + self.log_message("AbletonMCP initialized") + + # Show a message in Ableton + self.show_message("AbletonMCP: Listening for commands on port " + str(DEFAULT_PORT)) + + def disconnect(self): + """Called when Ableton closes or the control surface is removed""" + self.log_message("AbletonMCP disconnecting...") + self.running = False + + # Stop the server + if self.server: + try: + self.server.close() + except: + pass + + # Wait for the server thread to exit + if self.server_thread and self.server_thread.is_alive(): + self.server_thread.join(1.0) + + # Clean up any client threads + for client_thread in self.client_threads[:]: + if client_thread.is_alive(): + # We don't join them as they might be stuck + self.log_message("Client thread still alive during disconnect") + + ControlSurface.disconnect(self) + self.log_message("AbletonMCP disconnected") + + def start_server(self): + """Start the socket server in a separate thread""" + try: + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server.bind((HOST, DEFAULT_PORT)) + self.server.listen(5) # Allow up to 5 pending connections + + self.running = True + self.server_thread = threading.Thread(target=self._server_thread) + self.server_thread.daemon = True + self.server_thread.start() + + self.log_message("Server started on port " + str(DEFAULT_PORT)) + except Exception as e: + self.log_message("Error starting server: " + str(e)) + self.show_message("AbletonMCP: Error starting server - " + str(e)) + + def _server_thread(self): + """Server thread implementation - handles client connections""" + try: + self.log_message("Server thread started") + # Set a timeout to allow regular checking of running flag + self.server.settimeout(1.0) + + while self.running: + try: + # Accept connections with timeout + client, address = self.server.accept() + self.log_message("Connection accepted from " + str(address)) + self.show_message("AbletonMCP: Client connected") + + # Handle client in a separate thread + client_thread = threading.Thread( + target=self._handle_client, + args=(client,) + ) + client_thread.daemon = True + client_thread.start() + + # Keep track of client threads + self.client_threads.append(client_thread) + + # Clean up finished client threads + self.client_threads = [t for t in self.client_threads if t.is_alive()] + + except socket.timeout: + # No connection yet, just continue + continue + except Exception as e: + if self.running: # Only log if still running + self.log_message("Server accept error: " + str(e)) + time.sleep(0.5) + + self.log_message("Server thread stopped") + except Exception as e: + self.log_message("Server thread error: " + str(e)) + + def _handle_client(self, client): + """Handle communication with a connected client""" + self.log_message("Client handler started") + client.settimeout(None) # No timeout for client socket + buffer = '' # Changed from b'' to '' for Python 2 + + try: + while self.running: + try: + # Receive data + data = client.recv(8192) + + if not data: + # Client disconnected + self.log_message("Client disconnected") + break + + # Accumulate data in buffer with explicit encoding/decoding + try: + # Python 3: data is bytes, decode to string + buffer += data.decode('utf-8') + except AttributeError: + # Python 2: data is already string + buffer += data + + try: + # Try to parse command from buffer + command = json.loads(buffer) # Removed decode('utf-8') + buffer = '' # Clear buffer after successful parse + + self.log_message("Received command: " + str(command.get("type", "unknown"))) + + # Process the command and get response + response = self._process_command(command) + + # Send the response with explicit encoding + try: + # Python 3: encode string to bytes + client.sendall(json.dumps(response).encode('utf-8')) + except AttributeError: + # Python 2: string is already bytes + client.sendall(json.dumps(response)) + except ValueError: + # Incomplete data, wait for more + continue + + except Exception as e: + self.log_message("Error handling client data: " + str(e)) + self.log_message(traceback.format_exc()) + + # Send error response if possible + error_response = { + "status": "error", + "message": str(e) + } + try: + # Python 3: encode string to bytes + client.sendall(json.dumps(error_response).encode('utf-8')) + except AttributeError: + # Python 2: string is already bytes + client.sendall(json.dumps(error_response)) + except: + # If we can't send the error, the connection is probably dead + break + + # For serious errors, break the loop + if not isinstance(e, ValueError): + break + except Exception as e: + self.log_message("Error in client handler: " + str(e)) + finally: + try: + client.close() + except: + pass + self.log_message("Client handler stopped") + + def _process_command(self, command): + """Process a command from the client and return a response""" + command_type = command.get("type", "") + params = command.get("params", {}) + + # Initialize response + response = { + "status": "success", + "result": {} + } + + try: + # Route the command to the appropriate handler + if command_type == "get_session_info": + response["result"] = self._get_session_info() + elif command_type == "get_track_info": + track_index = params.get("track_index", 0) + response["result"] = self._get_track_info(track_index) + # Commands that modify Live's state should be scheduled on the main thread + elif command_type in [ + "create_midi_track", "create_audio_track", "create_return_track", + "set_track_name", "set_track_mute", "set_track_solo", "set_track_arm", + "set_track_volume", "set_track_pan", "set_track_send", "set_track_color", + "set_track_monitoring", "set_master_volume", "set_master_pan", + "create_clip", "delete_clip", "add_notes_to_clip", "set_clip_name", + "set_clip_loop", "set_tempo", "set_signature", "set_current_song_time", + "set_loop", "set_loop_region", "set_metronome", "set_overdub", + "set_record_mode", "fire_clip", "stop_clip", "stop_all_clips", + "start_playback", "stop_playback", "fire_scene", "create_scene", + "set_scene_name", "delete_scene", "load_instrument_or_effect", + "load_browser_item", "load_browser_item_by_name", + "load_browser_item_at_path", "set_device_parameter", "set_device_on" + ]: + # Use a thread-safe approach with a response queue + response_queue = queue.Queue() + + # Define a function to execute on the main thread + def main_thread_task(): + try: + result = None + if command_type == "create_midi_track": + index = params.get("index", -1) + result = self._create_midi_track(index) + elif command_type == "create_audio_track": + index = params.get("index", -1) + result = self._create_audio_track(index) + elif command_type == "create_return_track": + result = self._create_return_track() + elif command_type == "set_track_name": + track_index = params.get("track_index", 0) + name = params.get("name", "") + result = self._set_track_name(track_index, name) + elif command_type == "set_track_mute": + track_index = params.get("track_index", 0) + mute = params.get("mute", False) + result = self._set_track_mute(track_index, mute) + elif command_type == "set_track_solo": + track_index = params.get("track_index", 0) + solo = params.get("solo", False) + result = self._set_track_solo(track_index, solo) + elif command_type == "set_track_arm": + track_index = params.get("track_index", 0) + arm = params.get("arm", False) + result = self._set_track_arm(track_index, arm) + elif command_type == "set_track_volume": + track_index = params.get("track_index", 0) + volume = params.get("volume", 0.85) + result = self._set_track_volume(track_index, volume) + elif command_type == "set_track_pan": + track_index = params.get("track_index", 0) + pan = params.get("pan", 0.0) + result = self._set_track_pan(track_index, pan) + elif command_type == "set_track_send": + track_index = params.get("track_index", 0) + send_index = params.get("send_index", 0) + value = params.get("value", 0.0) + result = self._set_track_send(track_index, send_index, value) + elif command_type == "set_track_color": + track_index = params.get("track_index", 0) + color = params.get("color", 0) + result = self._set_track_color(track_index, color) + elif command_type == "set_track_monitoring": + track_index = params.get("track_index", 0) + state = params.get("state", 0) + result = self._set_track_monitoring(track_index, state) + elif command_type == "set_master_volume": + volume = params.get("volume", 0.85) + result = self._set_master_volume(volume) + elif command_type == "set_master_pan": + pan = params.get("pan", 0.0) + result = self._set_master_pan(pan) + elif command_type == "create_clip": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + length = params.get("length", 4.0) + result = self._create_clip(track_index, clip_index, length) + elif command_type == "create_arrangement_clip": + track_index = params.get("track_index", 0) + start_time = params.get("start_time", 0.0) + length = params.get("length", 4.0) + result = self._create_arrangement_clip(track_index, start_time, length) + elif command_type == "delete_clip": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + result = self._delete_clip(track_index, clip_index) + elif command_type == "add_notes_to_clip": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + notes = params.get("notes", []) + result = self._add_notes_to_clip(track_index, clip_index, notes) + elif command_type == "add_notes_to_arrangement_clip": + track_index = params.get("track_index", 0) + start_time = params.get("start_time", 0.0) + notes = params.get("notes", []) + result = self._add_notes_to_arrangement_clip(track_index, start_time, notes) + elif command_type == "duplicate_clip_to_arrangement": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + start_time = params.get("start_time", 0.0) + result = self._duplicate_clip_to_arrangement(track_index, clip_index, start_time) + elif command_type == "set_clip_name": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + name = params.get("name", "") + result = self._set_clip_name(track_index, clip_index, name) + elif command_type == "set_clip_loop": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + loop_start = params.get("loop_start", None) + loop_end = params.get("loop_end", None) + loop_length = params.get("loop_length", None) + looping = params.get("looping", None) + result = self._set_clip_loop( + track_index, + clip_index, + loop_start, + loop_end, + loop_length, + looping + ) + elif command_type == "set_tempo": + tempo = params.get("tempo", 120.0) + result = self._set_tempo(tempo) + elif command_type == "set_signature": + numerator = params.get("numerator", 4) + denominator = params.get("denominator", 4) + result = self._set_signature(numerator, denominator) + elif command_type == "set_current_song_time": + time_value = params.get("time", 0.0) + result = self._set_current_song_time(time_value) + elif command_type == "set_loop": + enabled = params.get("enabled", False) + result = self._set_loop(enabled) + elif command_type == "set_loop_region": + start = params.get("start", 0.0) + length = params.get("length", 4.0) + result = self._set_loop_region(start, length) + elif command_type == "set_metronome": + enabled = params.get("enabled", False) + result = self._set_metronome(enabled) + elif command_type == "set_overdub": + enabled = params.get("enabled", False) + result = self._set_overdub(enabled) + elif command_type == "set_record_mode": + enabled = params.get("enabled", False) + result = self._set_record_mode(enabled) + elif command_type == "fire_clip": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + result = self._fire_clip(track_index, clip_index) + elif command_type == "stop_clip": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + result = self._stop_clip(track_index, clip_index) + elif command_type == "stop_all_clips": + result = self._stop_all_clips() + elif command_type == "start_playback": + result = self._start_playback() + elif command_type == "stop_playback": + result = self._stop_playback() + elif command_type == "fire_scene": + scene_index = params.get("scene_index", 0) + result = self._fire_scene(scene_index) + elif command_type == "create_scene": + index = params.get("index", -1) + result = self._create_scene(index) + elif command_type == "set_scene_name": + scene_index = params.get("scene_index", 0) + name = params.get("name", "") + result = self._set_scene_name(scene_index, name) + elif command_type == "delete_scene": + scene_index = params.get("scene_index", 0) + result = self._delete_scene(scene_index) + elif command_type == "load_instrument_or_effect": + track_index = params.get("track_index", 0) + uri = params.get("uri", "") + result = self._load_instrument_or_effect(track_index, uri) + elif command_type == "load_browser_item": + track_index = params.get("track_index", 0) + item_uri = params.get("item_uri", "") + result = self._load_browser_item(track_index, item_uri) + elif command_type == "load_browser_item_by_name": + track_index = params.get("track_index", 0) + query = params.get("query", "") + category_type = params.get("category_type", "all") + max_depth = params.get("max_depth", 5) + result = self._load_browser_item_by_name( + track_index, + query, + category_type, + max_depth + ) + elif command_type == "load_browser_item_at_path": + track_index = params.get("track_index", 0) + path = params.get("path", "") + item_name = params.get("item_name", None) + result = self._load_browser_item_at_path( + track_index, + path, + item_name + ) + elif command_type == "set_device_parameter": + track_index = params.get("track_index", 0) + device_index = params.get("device_index", 0) + parameter_index = params.get("parameter_index", None) + parameter_name = params.get("parameter_name", None) + value = params.get("value", 0.0) + result = self._set_device_parameter( + track_index, + device_index, + parameter_index, + parameter_name, + value + ) + elif command_type == "set_device_on": + track_index = params.get("track_index", 0) + device_index = params.get("device_index", 0) + enabled = params.get("enabled", True) + result = self._set_device_on(track_index, device_index, enabled) + + # Put the result in the queue + response_queue.put({"status": "success", "result": result}) + except Exception as e: + self.log_message("Error in main thread task: " + str(e)) + self.log_message(traceback.format_exc()) + response_queue.put({"status": "error", "message": str(e)}) + + # Schedule the task to run on the main thread + try: + self.schedule_message(0, main_thread_task) + except AssertionError: + # If we're already on the main thread, execute directly + main_thread_task() + + # Wait for the response with a timeout + try: + task_response = response_queue.get(timeout=10.0) + if task_response.get("status") == "error": + response["status"] = "error" + response["message"] = task_response.get("message", "Unknown error") + else: + response["result"] = task_response.get("result", {}) + except queue.Empty: + response["status"] = "error" + response["message"] = "Timeout waiting for operation to complete" + elif command_type == "get_tracks": + response["result"] = self._get_tracks() + elif command_type == "get_clip_info": + track_index = params.get("track_index", 0) + clip_index = params.get("clip_index", 0) + response["result"] = self._get_clip_info(track_index, clip_index) + elif command_type == "get_scenes": + response["result"] = self._get_scenes() + elif command_type == "get_track_devices": + track_index = params.get("track_index", 0) + response["result"] = self._get_track_devices(track_index) + elif command_type == "get_device_parameters": + track_index = params.get("track_index", 0) + device_index = params.get("device_index", 0) + response["result"] = self._get_device_parameters(track_index, device_index) + elif command_type == "search_browser_items": + query = params.get("query", "") + category_type = params.get("category_type", "all") + max_results = params.get("max_results", 25) + max_depth = params.get("max_depth", 5) + loadable_only = params.get("loadable_only", False) + response["result"] = self._search_browser_items( + query, + category_type, + max_results, + max_depth, + loadable_only + ) + elif command_type == "get_browser_item": + uri = params.get("uri", None) + path = params.get("path", None) + response["result"] = self._get_browser_item(uri, path) + elif command_type == "get_browser_categories": + category_type = params.get("category_type", "all") + response["result"] = self._get_browser_categories(category_type) + elif command_type == "get_browser_items": + path = params.get("path", "") + item_type = params.get("item_type", "all") + response["result"] = self._get_browser_items(path, item_type) + # Add the new browser commands + elif command_type == "get_browser_tree": + category_type = params.get("category_type", "all") + max_depth = params.get("max_depth", 2) + response["result"] = self.get_browser_tree(category_type, max_depth) + elif command_type == "get_browser_items_at_path": + path = params.get("path", "") + response["result"] = self.get_browser_items_at_path(path) + else: + response["status"] = "error" + response["message"] = "Unknown command: " + command_type + except Exception as e: + self.log_message("Error processing command: " + str(e)) + self.log_message(traceback.format_exc()) + response["status"] = "error" + response["message"] = str(e) + + return response + + # Command implementations + + def _get_session_info(self): + """Get information about the current session""" + try: + result = { + "tempo": self._song.tempo, + "signature_numerator": self._song.signature_numerator, + "signature_denominator": self._song.signature_denominator, + "is_playing": self._song.is_playing, + "current_song_time": self._song.current_song_time, + "loop": self._song.loop, + "loop_start": self._song.loop_start, + "loop_length": self._song.loop_length, + "metronome": self._song.metronome, + "overdub": self._song.overdub, + "track_count": len(self._song.tracks), + "return_track_count": len(self._song.return_tracks), + "scene_count": len(self._song.scenes), + "master_track": { + "name": "Master", + "volume": self._song.master_track.mixer_device.volume.value, + "panning": self._song.master_track.mixer_device.panning.value + } + } + if hasattr(self._song, "record_mode"): + result["record_mode"] = self._song.record_mode + elif hasattr(self._song, "session_record"): + result["record_mode"] = self._song.session_record + return result + except Exception as e: + self.log_message("Error getting session info: " + str(e)) + raise + + def _get_track_info(self, track_index): + """Get information about a track""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + track_type = "midi" if track.has_midi_input else "audio" if track.has_audio_input else "unknown" + + # Get clip slots + clip_slots = [] + for slot_index, slot in enumerate(track.clip_slots): + clip_info = None + if slot.has_clip: + clip = slot.clip + clip_info = { + "name": clip.name, + "length": clip.length, + "is_playing": clip.is_playing, + "is_recording": clip.is_recording + } + + clip_slots.append({ + "index": slot_index, + "has_clip": slot.has_clip, + "clip": clip_info + }) + + # Get devices + devices = [] + for device_index, device in enumerate(track.devices): + devices.append({ + "index": device_index, + "name": device.name, + "class_name": device.class_name, + "type": self._get_device_type(device) + }) + + sends = [] + if hasattr(track.mixer_device, "sends"): + for send in track.mixer_device.sends: + sends.append(send.value) + + color_value = None + if hasattr(track, "color"): + color_value = track.color + elif hasattr(track, "color_index"): + color_value = track.color_index + + result = { + "index": track_index, + "name": track.name, + "track_type": track_type, + "is_audio_track": track.has_audio_input, + "is_midi_track": track.has_midi_input, + "mute": track.mute, + "solo": track.solo, + "arm": track.arm, + "volume": track.mixer_device.volume.value, + "panning": track.mixer_device.panning.value, + "sends": sends, + "clip_slots": clip_slots, + "devices": devices, + "device_count": len(track.devices) + } + if color_value is not None: + result["color"] = color_value + return result + except Exception as e: + self.log_message("Error getting track info: " + str(e)) + raise + + def _summarize_track(self, track, index, track_type): + """Summarize a track for listing.""" + info = { + "index": index, + "name": track.name, + "type": track_type + } + if hasattr(track, "mute"): + info["mute"] = track.mute + if hasattr(track, "solo"): + info["solo"] = track.solo + if track_type == "track": + try: + info["arm"] = track.arm + except Exception: + pass + if hasattr(track, "mixer_device"): + info["volume"] = track.mixer_device.volume.value + info["panning"] = track.mixer_device.panning.value + if hasattr(track, "has_audio_input"): + info["is_audio_track"] = track.has_audio_input + if hasattr(track, "has_midi_input"): + info["is_midi_track"] = track.has_midi_input + if hasattr(track, "devices"): + info["device_count"] = len(track.devices) + if hasattr(track, "color"): + info["color"] = track.color + elif hasattr(track, "color_index"): + info["color"] = track.color_index + return info + + def _get_tracks(self): + """Get summary info for all tracks, return tracks, and master.""" + try: + tracks = [] + for index, track in enumerate(self._song.tracks): + tracks.append(self._summarize_track(track, index, "track")) + + return_tracks = [] + for index, track in enumerate(self._song.return_tracks): + return_tracks.append(self._summarize_track(track, index, "return")) + + master = self._summarize_track(self._song.master_track, -1, "master") + + return { + "tracks": tracks, + "return_tracks": return_tracks, + "master_track": master + } + except Exception as e: + self.log_message("Error getting tracks: " + str(e)) + raise + + def _create_midi_track(self, index): + """Create a new MIDI track at the specified index""" + try: + # Create the track + self._song.create_midi_track(index) + + # Get the new track + new_track_index = len(self._song.tracks) - 1 if index == -1 else index + new_track = self._song.tracks[new_track_index] + + result = { + "index": new_track_index, + "name": new_track.name + } + return result + except Exception as e: + self.log_message("Error creating MIDI track: " + str(e)) + raise + + def _create_audio_track(self, index): + """Create a new audio track at the specified index""" + try: + self._song.create_audio_track(index) + new_track_index = len(self._song.tracks) - 1 if index == -1 else index + new_track = self._song.tracks[new_track_index] + return { + "index": new_track_index, + "name": new_track.name + } + except Exception as e: + self.log_message("Error creating audio track: " + str(e)) + raise + + def _create_return_track(self): + """Create a new return track""" + try: + if not hasattr(self._song, "create_return_track"): + raise RuntimeError("Return tracks are not available in this Live version") + self._song.create_return_track() + new_index = len(self._song.return_tracks) - 1 + new_track = self._song.return_tracks[new_index] + return { + "index": new_index, + "name": new_track.name + } + except Exception as e: + self.log_message("Error creating return track: " + str(e)) + raise + + def _set_track_mute(self, track_index, mute): + """Set track mute state""" + try: + track = self._song.tracks[track_index] + track.mute = bool(mute) + return {"mute": track.mute} + except Exception as e: + self.log_message("Error setting track mute: " + str(e)) + raise + + def _set_track_solo(self, track_index, solo): + """Set track solo state""" + try: + track = self._song.tracks[track_index] + track.solo = bool(solo) + return {"solo": track.solo} + except Exception as e: + self.log_message("Error setting track solo: " + str(e)) + raise + + def _set_track_arm(self, track_index, arm): + """Set track arm state""" + try: + track = self._song.tracks[track_index] + if not hasattr(track, "arm"): + raise RuntimeError("Track does not support arm") + track.arm = bool(arm) + return {"arm": track.arm} + except Exception as e: + self.log_message("Error setting track arm: " + str(e)) + raise + + def _set_track_volume(self, track_index, volume): + """Set track volume""" + try: + track = self._song.tracks[track_index] + track.mixer_device.volume.value = float(volume) + return {"volume": track.mixer_device.volume.value} + except Exception as e: + self.log_message("Error setting track volume: " + str(e)) + raise + + def _set_track_pan(self, track_index, pan): + """Set track panning""" + try: + track = self._song.tracks[track_index] + track.mixer_device.panning.value = float(pan) + return {"panning": track.mixer_device.panning.value} + except Exception as e: + self.log_message("Error setting track pan: " + str(e)) + raise + + def _set_track_send(self, track_index, send_index, value): + """Set track send level""" + try: + track = self._song.tracks[track_index] + sends = track.mixer_device.sends + if send_index < 0 or send_index >= len(sends): + raise IndexError("Send index out of range") + sends[send_index].value = float(value) + return {"send_index": send_index, "value": sends[send_index].value} + except Exception as e: + self.log_message("Error setting track send: " + str(e)) + raise + + def _set_track_color(self, track_index, color): + """Set track color index or value""" + try: + track = self._song.tracks[track_index] + if hasattr(track, "color"): + track.color = int(color) + return {"color": track.color} + if hasattr(track, "color_index"): + track.color_index = int(color) + return {"color": track.color_index} + raise RuntimeError("Track color is not supported") + except Exception as e: + self.log_message("Error setting track color: " + str(e)) + raise + + def _set_track_monitoring(self, track_index, state): + """Set track monitoring state (0=off,1=auto,2=in)""" + try: + track = self._song.tracks[track_index] + if not hasattr(track, "current_monitoring_state"): + raise RuntimeError("Track does not support monitoring state") + track.current_monitoring_state = int(state) + return {"current_monitoring_state": track.current_monitoring_state} + except Exception as e: + self.log_message("Error setting track monitoring: " + str(e)) + raise + + def _set_master_volume(self, volume): + """Set master volume""" + try: + self._song.master_track.mixer_device.volume.value = float(volume) + return {"volume": self._song.master_track.mixer_device.volume.value} + except Exception as e: + self.log_message("Error setting master volume: " + str(e)) + raise + + def _set_master_pan(self, pan): + """Set master panning""" + try: + self._song.master_track.mixer_device.panning.value = float(pan) + return {"panning": self._song.master_track.mixer_device.panning.value} + except Exception as e: + self.log_message("Error setting master pan: " + str(e)) + raise + + + def _set_track_name(self, track_index, name): + """Set the name of a track""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + # Set the name + track = self._song.tracks[track_index] + track.name = name + + result = { + "name": track.name + } + return result + except Exception as e: + self.log_message("Error setting track name: " + str(e)) + raise + + def _create_clip(self, track_index, clip_index, length): + """Create a new MIDI clip in the specified track and clip slot""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + + clip_slot = track.clip_slots[clip_index] + + # Check if the clip slot already has a clip + if clip_slot.has_clip: + raise Exception("Clip slot already has a clip") + + # Create the clip + clip_slot.create_clip(length) + + result = { + "name": clip_slot.clip.name, + "length": clip_slot.clip.length + } + return result + except Exception as e: + self.log_message("Error creating clip: " + str(e)) + raise + + def _create_arrangement_clip(self, track_index, start_time, length): + """Create a new MIDI clip in Arrangement View at the specified time""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + # Create clip in arrangement view + clip = track.create_clip(start_time, length) + + result = { + "name": clip.name, + "length": clip.length, + "start_time": start_time + } + return result + except Exception as e: + self.log_message("Error creating arrangement clip: " + str(e)) + raise + + def _get_clip_info(self, track_index, clip_index): + """Get information about a clip in a track""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + clip_slot = track.clip_slots[clip_index] + if not clip_slot.has_clip: + raise Exception("No clip in slot") + clip = clip_slot.clip + result = { + "name": clip.name, + "length": clip.length, + "is_playing": clip.is_playing, + "is_recording": clip.is_recording + } + if hasattr(clip, "is_audio_clip"): + result["is_audio_clip"] = clip.is_audio_clip + if hasattr(clip, "is_midi_clip"): + result["is_midi_clip"] = clip.is_midi_clip + if hasattr(clip, "looping"): + result["looping"] = clip.looping + if hasattr(clip, "loop_start"): + result["loop_start"] = clip.loop_start + if hasattr(clip, "loop_end"): + result["loop_end"] = clip.loop_end + if hasattr(clip, "loop_length"): + result["loop_length"] = clip.loop_length + if hasattr(clip, "start_marker"): + result["start_marker"] = clip.start_marker + if hasattr(clip, "end_marker"): + result["end_marker"] = clip.end_marker + return result + except Exception as e: + self.log_message("Error getting clip info: " + str(e)) + raise + + def _delete_clip(self, track_index, clip_index): + """Delete a clip from a slot""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + clip_slot = track.clip_slots[clip_index] + if not clip_slot.has_clip: + raise Exception("No clip in slot") + clip_slot.delete_clip() + return {"deleted": True} + except Exception as e: + self.log_message("Error deleting clip: " + str(e)) + raise + + def _set_clip_loop(self, track_index, clip_index, loop_start, loop_end, loop_length, looping): + """Set clip loop settings""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + clip_slot = track.clip_slots[clip_index] + if not clip_slot.has_clip: + raise Exception("No clip in slot") + clip = clip_slot.clip + if loop_start is not None and hasattr(clip, "loop_start"): + clip.loop_start = float(loop_start) + if loop_end is not None and hasattr(clip, "loop_end"): + clip.loop_end = float(loop_end) + if loop_length is not None and hasattr(clip, "loop_length") and loop_end is None: + clip.loop_length = float(loop_length) + if looping is not None and hasattr(clip, "looping"): + clip.looping = bool(looping) + return { + "looping": clip.looping if hasattr(clip, "looping") else None, + "loop_start": clip.loop_start if hasattr(clip, "loop_start") else None, + "loop_end": clip.loop_end if hasattr(clip, "loop_end") else None, + "loop_length": clip.loop_length if hasattr(clip, "loop_length") else None + } + except Exception as e: + self.log_message("Error setting clip loop: " + str(e)) + raise + + def _add_notes_to_clip(self, track_index, clip_index, notes): + """Add MIDI notes to a clip""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + + clip_slot = track.clip_slots[clip_index] + + if not clip_slot.has_clip: + raise Exception("No clip in slot") + + clip = clip_slot.clip + + # Convert note data to Live's format + live_notes = [] + for note in notes: + pitch = note.get("pitch", 60) + start_time = note.get("start_time", 0.0) + duration = note.get("duration", 0.25) + velocity = note.get("velocity", 100) + mute = note.get("mute", False) + + live_notes.append((pitch, start_time, duration, velocity, mute)) + + # Add the notes + clip.set_notes(tuple(live_notes)) + + result = { + "note_count": len(notes) + } + return result + except Exception as e: + self.log_message("Error adding notes to clip: " + str(e)) + raise + + def _add_notes_to_arrangement_clip(self, track_index, start_time, notes): + """Add MIDI notes to an Arrangement View clip at the specified start time""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + # Find clip in arrangement by start time + # In Ableton Live API, arrangement clips are accessed via track.clips + target_clip = None + for clip in track.clips: + if hasattr(clip, 'start_time') and abs(clip.start_time - start_time) < 0.01: + target_clip = clip + break + + if target_clip is None: + raise Exception(f"No clip found at start_time {start_time}") + + # Convert note data to Live's format + live_notes = [] + for note in notes: + pitch = note.get("pitch", 60) + note_start = note.get("start_time", 0.0) + duration = note.get("duration", 0.25) + velocity = note.get("velocity", 100) + mute = note.get("mute", False) + + live_notes.append((pitch, note_start, duration, velocity, mute)) + + # Add the notes + target_clip.set_notes(tuple(live_notes)) + + result = { + "note_count": len(notes), + "clip_name": target_clip.name + } + return result + except Exception as e: + self.log_message("Error adding notes to arrangement clip: " + str(e)) + raise + + def _set_clip_name(self, track_index, clip_index, name): + """Set the name of a clip""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + + clip_slot = track.clip_slots[clip_index] + + if not clip_slot.has_clip: + raise Exception("No clip in slot") + + clip = clip_slot.clip + clip.name = name + + result = { + "name": clip.name + } + return result + except Exception as e: + self.log_message("Error setting clip name: " + str(e)) + raise + + def _set_tempo(self, tempo): + """Set the tempo of the session""" + try: + self._song.tempo = tempo + + result = { + "tempo": self._song.tempo + } + return result + except Exception as e: + self.log_message("Error setting tempo: " + str(e)) + raise + + def _set_signature(self, numerator, denominator): + """Set the time signature""" + try: + self._song.signature_numerator = int(numerator) + self._song.signature_denominator = int(denominator) + return { + "signature_numerator": self._song.signature_numerator, + "signature_denominator": self._song.signature_denominator + } + except Exception as e: + self.log_message("Error setting signature: " + str(e)) + raise + + def _set_current_song_time(self, time_value): + """Set the current song time""" + try: + self._song.current_song_time = float(time_value) + return {"current_song_time": self._song.current_song_time} + except Exception as e: + self.log_message("Error setting song time: " + str(e)) + raise + + def _set_loop(self, enabled): + """Enable or disable loop""" + try: + self._song.loop = bool(enabled) + return {"loop": self._song.loop} + except Exception as e: + self.log_message("Error setting loop: " + str(e)) + raise + + def _set_loop_region(self, start, length): + """Set loop start and length""" + try: + self._song.loop_start = float(start) + self._song.loop_length = float(length) + return { + "loop_start": self._song.loop_start, + "loop_length": self._song.loop_length + } + except Exception as e: + self.log_message("Error setting loop region: " + str(e)) + raise + + def _set_metronome(self, enabled): + """Enable or disable metronome""" + try: + self._song.metronome = bool(enabled) + return {"metronome": self._song.metronome} + except Exception as e: + self.log_message("Error setting metronome: " + str(e)) + raise + + def _set_overdub(self, enabled): + """Enable or disable overdub""" + try: + self._song.overdub = bool(enabled) + return {"overdub": self._song.overdub} + except Exception as e: + self.log_message("Error setting overdub: " + str(e)) + raise + + def _set_record_mode(self, enabled): + """Enable or disable record mode""" + try: + if hasattr(self._song, "record_mode"): + self._song.record_mode = bool(enabled) + return {"record_mode": self._song.record_mode} + if hasattr(self._song, "session_record"): + self._song.session_record = bool(enabled) + return {"record_mode": self._song.session_record} + raise RuntimeError("Record mode is not supported") + except Exception as e: + self.log_message("Error setting record mode: " + str(e)) + raise + + def _duplicate_clip_to_arrangement(self, track_index, clip_index, start_time): + """Duplicate a Session View clip to Arrangement View at the specified start time""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + + clip_slot = track.clip_slots[clip_index] + + if not clip_slot.has_clip: + raise Exception("No clip in slot") + + source_clip = clip_slot.clip + + # Create a new clip in arrangement at the specified start time + arrangement_clip = track.create_clip(start_time, source_clip.length) + + # Copy all notes from source clip to arrangement clip + if hasattr(source_clip, 'get_notes'): + # Get notes from source clip + source_notes = source_clip.get_notes(1, 1) # Get all notes + arrangement_clip.set_notes(source_notes) + + # Copy other properties + if hasattr(source_clip, 'name') and source_clip.name: + try: + arrangement_clip.name = source_clip.name + except: + pass + + if hasattr(source_clip, 'looping'): + try: + arrangement_clip.looping = source_clip.looping + except: + pass + + result = { + "track_index": track_index, + "start_time": start_time, + "length": arrangement_clip.length, + "name": arrangement_clip.name + } + return result + except Exception as e: + self.log_message("Error duplicating clip to arrangement: " + str(e)) + raise + + def _fire_clip(self, track_index, clip_index): + """Fire a clip""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + + clip_slot = track.clip_slots[clip_index] + + if not clip_slot.has_clip: + raise Exception("No clip in slot") + + clip_slot.fire() + + result = { + "fired": True + } + return result + except Exception as e: + self.log_message("Error firing clip: " + str(e)) + raise + + def _stop_clip(self, track_index, clip_index): + """Stop a clip""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + if clip_index < 0 or clip_index >= len(track.clip_slots): + raise IndexError("Clip index out of range") + + clip_slot = track.clip_slots[clip_index] + + clip_slot.stop() + + result = { + "stopped": True + } + return result + except Exception as e: + self.log_message("Error stopping clip: " + str(e)) + raise + + def _stop_all_clips(self): + """Stop all clips in the session""" + try: + self._song.stop_all_clips() + return {"stopped": True} + except Exception as e: + self.log_message("Error stopping all clips: " + str(e)) + raise + + def _get_scenes(self): + """Get list of scenes""" + try: + scenes = [] + for index, scene in enumerate(self._song.scenes): + scenes.append({ + "index": index, + "name": scene.name + }) + return {"scenes": scenes} + except Exception as e: + self.log_message("Error getting scenes: " + str(e)) + raise + + def _create_scene(self, index): + """Create a new scene at index""" + try: + scene_index = len(self._song.scenes) if index == -1 else index + self._song.create_scene(scene_index) + scene = self._song.scenes[scene_index] + return {"index": scene_index, "name": scene.name} + except Exception as e: + self.log_message("Error creating scene: " + str(e)) + raise + + def _set_scene_name(self, scene_index, name): + """Set a scene name""" + try: + if scene_index < 0 or scene_index >= len(self._song.scenes): + raise IndexError("Scene index out of range") + scene = self._song.scenes[scene_index] + scene.name = name + return {"name": scene.name} + except Exception as e: + self.log_message("Error setting scene name: " + str(e)) + raise + + def _fire_scene(self, scene_index): + """Fire a scene""" + try: + if scene_index < 0 or scene_index >= len(self._song.scenes): + raise IndexError("Scene index out of range") + scene = self._song.scenes[scene_index] + scene.fire() + return {"fired": True} + except Exception as e: + self.log_message("Error firing scene: " + str(e)) + raise + + def _delete_scene(self, scene_index): + """Delete a scene""" + try: + if scene_index < 0 or scene_index >= len(self._song.scenes): + raise IndexError("Scene index out of range") + if hasattr(self._song, "delete_scene"): + self._song.delete_scene(scene_index) + else: + raise RuntimeError("Scene deletion is not supported") + return {"deleted": True} + except Exception as e: + self.log_message("Error deleting scene: " + str(e)) + raise + + + def _start_playback(self): + """Start playing the session""" + try: + self._song.start_playing() + + result = { + "playing": self._song.is_playing + } + return result + except Exception as e: + self.log_message("Error starting playback: " + str(e)) + raise + + def _stop_playback(self): + """Stop playing the session""" + try: + self._song.stop_playing() + + result = { + "playing": self._song.is_playing + } + return result + except Exception as e: + self.log_message("Error stopping playback: " + str(e)) + raise + + def _get_track_devices(self, track_index): + """Get devices on a track""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + devices = [] + for device_index, device in enumerate(track.devices): + devices.append({ + "index": device_index, + "name": device.name, + "class_name": device.class_name, + "type": self._get_device_type(device), + "parameter_count": len(device.parameters) + }) + return {"devices": devices} + except Exception as e: + self.log_message("Error getting track devices: " + str(e)) + raise + + def _get_device_parameters(self, track_index, device_index): + """Get device parameters""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + if device_index < 0 or device_index >= len(track.devices): + raise IndexError("Device index out of range") + device = track.devices[device_index] + parameters = [] + for index, param in enumerate(device.parameters): + param_info = { + "index": index, + "name": param.name, + "value": param.value, + "min": param.min, + "max": param.max, + "is_quantized": param.is_quantized + } + if hasattr(param, "value_items") and param.is_quantized: + param_info["value_items"] = list(param.value_items) + parameters.append(param_info) + return { + "device_name": device.name, + "parameters": parameters + } + except Exception as e: + self.log_message("Error getting device parameters: " + str(e)) + raise + + def _set_device_parameter(self, track_index, device_index, parameter_index, parameter_name, value): + """Set a device parameter by index or name""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + if device_index < 0 or device_index >= len(track.devices): + raise IndexError("Device index out of range") + device = track.devices[device_index] + + param = None + if parameter_index is not None: + if parameter_index < 0 or parameter_index >= len(device.parameters): + raise IndexError("Parameter index out of range") + param = device.parameters[parameter_index] + elif parameter_name: + name_lower = parameter_name.lower() + for candidate in device.parameters: + if candidate.name.lower() == name_lower: + param = candidate + break + if param is None: + raise ValueError("Parameter not found") + + if isinstance(value, string_types): + try: + value = float(value) + except Exception: + if hasattr(param, "value_items") and param.is_quantized: + items = list(param.value_items) + if value in items: + value = float(items.index(value)) + else: + raise ValueError("Parameter value is not valid") + else: + raise + + if isinstance(value, (int, float)): + if value < param.min: + value = param.min + if value > param.max: + value = param.max + param.value = value + + return { + "name": param.name, + "value": param.value + } + except Exception as e: + self.log_message("Error setting device parameter: " + str(e)) + raise + + def _set_device_on(self, track_index, device_index, enabled): + """Enable or disable a device""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + track = self._song.tracks[track_index] + if device_index < 0 or device_index >= len(track.devices): + raise IndexError("Device index out of range") + device = track.devices[device_index] + + if hasattr(device, "is_enabled"): + device.is_enabled = bool(enabled) + return {"enabled": device.is_enabled} + if hasattr(device, "is_active"): + device.is_active = bool(enabled) + return {"enabled": device.is_active} + + for param in device.parameters: + if param.name.lower() in ["device on", "on", "power"]: + param.value = 1.0 if enabled else 0.0 + return {"enabled": bool(param.value)} + + raise RuntimeError("Device on/off is not supported") + except Exception as e: + self.log_message("Error setting device on: " + str(e)) + raise + + def _get_browser_categories(self, category_type): + """Get browser categories (shallow tree).""" + try: + return self.get_browser_tree(category_type, 0) + except Exception as e: + self.log_message("Error getting browser categories: " + str(e)) + raise + + def _get_browser_items(self, path, item_type): + """Get browser items at path with optional filtering.""" + try: + result = self.get_browser_items_at_path(path) + items = result.get("items", []) + if item_type == "loadable": + items = [item for item in items if item.get("is_loadable")] + elif item_type == "folders": + items = [item for item in items if item.get("is_folder")] + result["items"] = items + return result + except Exception as e: + self.log_message("Error getting browser items: " + str(e)) + raise + + def _get_browser_item(self, uri, path): + """Get a browser item by URI or path""" + try: + # Access the application's browser instance instead of creating a new one + app = self.application() + if not app: + raise RuntimeError("Could not access Live application") + + result = { + "uri": uri, + "path": path, + "found": False + } + + # Try to find by URI first if provided + if uri: + item = self._find_browser_item_by_uri(app.browser, uri) + if item: + result["found"] = True + result["item"] = { + "name": item.name, + "is_folder": item.is_folder, + "is_device": item.is_device, + "is_loadable": item.is_loadable, + "uri": item.uri + } + return result + + # If URI not provided or not found, try by path + if path: + # Parse the path and navigate to the specified item + path_parts = path.split("/") + + # Determine the root based on the first part + current_item = None + if path_parts[0].lower() == "instruments": + current_item = app.browser.instruments + elif path_parts[0].lower() == "sounds": + current_item = app.browser.sounds + elif path_parts[0].lower() == "drums": + current_item = app.browser.drums + elif path_parts[0].lower() == "audio_effects": + current_item = app.browser.audio_effects + elif path_parts[0].lower() == "midi_effects": + current_item = app.browser.midi_effects + else: + # Default to instruments if not specified + current_item = app.browser.instruments + # Don't skip the first part in this case + path_parts = ["instruments"] + path_parts + + # Navigate through the path + for i in range(1, len(path_parts)): + part = path_parts[i] + if not part: # Skip empty parts + continue + + found = False + for child in current_item.children: + if child.name.lower() == part.lower(): + current_item = child + found = True + break + + if not found: + result["error"] = "Path part '{0}' not found".format(part) + return result + + # Found the item + result["found"] = True + result["item"] = { + "name": current_item.name, + "is_folder": current_item.is_folder, + "is_device": current_item.is_device, + "is_loadable": current_item.is_loadable, + "uri": current_item.uri + } + + return result + except Exception as e: + self.log_message("Error getting browser item: " + str(e)) + self.log_message(traceback.format_exc()) + raise + + + + def _load_browser_item(self, track_index, item_uri): + """Load a browser item onto a track by its URI""" + try: + if track_index < 0 or track_index >= len(self._song.tracks): + raise IndexError("Track index out of range") + + track = self._song.tracks[track_index] + + # Access the application's browser instance instead of creating a new one + app = self.application() + + # Find the browser item by URI + item = self._find_browser_item_by_uri(app.browser, item_uri) + + if not item: + raise ValueError("Browser item with URI '{0}' not found".format(item_uri)) + + # Select the track + self._song.view.selected_track = track + + # Load the item + app.browser.load_item(item) + + result = { + "loaded": True, + "item_name": item.name, + "track_name": track.name, + "uri": item_uri + } + return result + except Exception as e: + self.log_message("Error loading browser item: {0}".format(str(e))) + self.log_message(traceback.format_exc()) + raise + + def _load_instrument_or_effect(self, track_index, uri): + """Alias for loading a browser item by URI""" + return self._load_browser_item(track_index, uri) + + def _get_browser_roots(self, category_type): + """Get browser root items based on category type.""" + app = self.application() + if not app or not hasattr(app, "browser"): + raise RuntimeError("Could not access Live browser") + browser = app.browser + roots = [] + if category_type in ["all", "instruments"] and hasattr(browser, "instruments"): + roots.append(("Instruments", browser.instruments)) + if category_type in ["all", "sounds"] and hasattr(browser, "sounds"): + roots.append(("Sounds", browser.sounds)) + if category_type in ["all", "drums"] and hasattr(browser, "drums"): + roots.append(("Drums", browser.drums)) + if category_type in ["all", "audio_effects"] and hasattr(browser, "audio_effects"): + roots.append(("Audio Effects", browser.audio_effects)) + if category_type in ["all", "midi_effects"] and hasattr(browser, "midi_effects"): + roots.append(("MIDI Effects", browser.midi_effects)) + + if category_type == "all": + for attr in dir(browser): + if attr.startswith("_"): + continue + if attr in ["instruments", "sounds", "drums", "audio_effects", "midi_effects"]: + continue + try: + item = getattr(browser, attr) + except Exception: + continue + if hasattr(item, "children") or hasattr(item, "name"): + roots.append((attr.replace("_", " ").title(), item)) + return roots + + def _search_browser_items_internal(self, query, category_type, max_results, max_depth, loadable_only): + """Search browser items by name.""" + results = [] + query_lower = query.lower() + + def visit(item, path_parts, depth): + if len(results) >= max_results: + return + name = getattr(item, "name", None) + next_path_parts = path_parts + if name and (not path_parts or path_parts[-1] != name): + next_path_parts = path_parts + [name] + if name: + if query_lower in name.lower(): + is_loadable = hasattr(item, "is_loadable") and item.is_loadable + if not loadable_only or is_loadable: + results.append({ + "name": name, + "path": "/".join(next_path_parts), + "is_folder": hasattr(item, "children") and bool(item.children), + "is_device": hasattr(item, "is_device") and item.is_device, + "is_loadable": is_loadable, + "uri": item.uri if hasattr(item, "uri") else None + }) + if depth >= max_depth: + return + if hasattr(item, "children") and item.children: + for child in item.children: + visit(child, next_path_parts, depth + 1) + if len(results) >= max_results: + return + + roots = self._get_browser_roots(category_type) + for root_name, root in roots: + visit(root, [root_name], 0) + if len(results) >= max_results: + break + + return results + + def _search_browser_items(self, query, category_type, max_results, max_depth, loadable_only): + """Search for browser items by name and return matches.""" + try: + results = self._search_browser_items_internal( + query, + category_type, + max_results, + max_depth, + loadable_only + ) + return { + "query": query, + "category_type": category_type, + "max_results": max_results, + "items": results + } + except Exception as e: + self.log_message("Error searching browser items: {0}".format(str(e))) + self.log_message(traceback.format_exc()) + raise + + def _load_browser_item_by_name(self, track_index, query, category_type, max_depth): + """Search and load the first matching loadable browser item by name.""" + try: + results = self._search_browser_items_internal( + query, + category_type, + 1, + max_depth, + True + ) + if not results: + raise ValueError("No loadable item found for query '{0}'".format(query)) + item = results[0] + if not item.get("uri"): + raise ValueError("Item does not have a URI") + return self._load_browser_item(track_index, item.get("uri")) + except Exception as e: + self.log_message("Error loading browser item by name: {0}".format(str(e))) + self.log_message(traceback.format_exc()) + raise + + def _load_browser_item_at_path(self, track_index, path, item_name): + """Load a browser item from a path, optionally matching by name.""" + try: + path_result = self.get_browser_items_at_path(path) + items = path_result.get("items", []) + selected = None + if item_name: + name_lower = item_name.lower() + for item in items: + if item.get("name", "").lower() == name_lower and item.get("is_loadable"): + selected = item + break + else: + for item in items: + if item.get("is_loadable"): + selected = item + break + if not selected or not selected.get("uri"): + raise ValueError("No loadable item found at path") + return self._load_browser_item(track_index, selected.get("uri")) + except Exception as e: + self.log_message("Error loading browser item at path: {0}".format(str(e))) + self.log_message(traceback.format_exc()) + raise + + def _find_browser_item_by_uri(self, browser_or_item, uri, max_depth=10, current_depth=0): + """Find a browser item by its URI""" + try: + # Check if this is the item we're looking for + if hasattr(browser_or_item, 'uri') and browser_or_item.uri == uri: + return browser_or_item + + # Stop recursion if we've reached max depth + if current_depth >= max_depth: + return None + + # Check if this is a browser with root categories + if hasattr(browser_or_item, 'instruments'): + try: + roots = self._get_browser_roots("all") + except Exception: + roots = [] + + for _, category in roots: + item = self._find_browser_item_by_uri(category, uri, max_depth, current_depth + 1) + if item: + return item + + return None + + # Check if this item has children + if hasattr(browser_or_item, 'children') and browser_or_item.children: + for child in browser_or_item.children: + item = self._find_browser_item_by_uri(child, uri, max_depth, current_depth + 1) + if item: + return item + + return None + except Exception as e: + self.log_message("Error finding browser item by URI: {0}".format(str(e))) + return None + + # Helper methods + + def _get_device_type(self, device): + """Get the type of a device""" + try: + # Simple heuristic - in a real implementation you'd look at the device class + if device.can_have_drum_pads: + return "drum_machine" + elif device.can_have_chains: + return "rack" + elif "instrument" in device.class_display_name.lower(): + return "instrument" + elif "audio_effect" in device.class_name.lower(): + return "audio_effect" + elif "midi_effect" in device.class_name.lower(): + return "midi_effect" + else: + return "unknown" + except: + return "unknown" + + def get_browser_tree(self, category_type="all", max_depth=2): + """ + Get a simplified tree of browser categories. + + Args: + category_type: Type of categories to get ('all', 'instruments', 'sounds', etc.) + max_depth: Maximum depth to traverse + + Returns: + Dictionary with the browser tree structure + """ + try: + # Access the application's browser instance instead of creating a new one + app = self.application() + if not app: + raise RuntimeError("Could not access Live application") + + # Check if browser is available + if not hasattr(app, 'browser') or app.browser is None: + raise RuntimeError("Browser is not available in the Live application") + + # Log available browser attributes to help diagnose issues + browser_attrs = [attr for attr in dir(app.browser) if not attr.startswith('_')] + self.log_message("Available browser attributes: {0}".format(browser_attrs)) + + result = { + "type": category_type, + "categories": [], + "available_categories": browser_attrs, + "total_folders": 0 + } + folder_count = [0] + + # Helper function to process a browser item and its children + def process_item(item, depth=0, path_parts=None): + if not item: + return None + if path_parts is None: + path_parts = [] + + name = item.name if hasattr(item, 'name') else "Unknown" + node = { + "name": name, + "path": "/".join(path_parts + [name]), + "is_folder": hasattr(item, 'children') and bool(item.children), + "is_device": hasattr(item, 'is_device') and item.is_device, + "is_loadable": hasattr(item, 'is_loadable') and item.is_loadable, + "uri": item.uri if hasattr(item, 'uri') else None, + "children": [] + } + + if hasattr(item, 'children') and item.children: + if depth >= max_depth: + node["has_more"] = True + return node + for child in item.children: + child_node = process_item(child, depth + 1, path_parts + [name]) + if child_node: + node["children"].append(child_node) + folder_count[0] += 1 + + return node + + # Process based on category type and available attributes + if (category_type == "all" or category_type == "instruments") and hasattr(app.browser, 'instruments'): + try: + instruments = process_item(app.browser.instruments, 0, []) + if instruments: + instruments["name"] = "Instruments" # Ensure consistent naming + instruments["path"] = "Instruments" + result["categories"].append(instruments) + except Exception as e: + self.log_message("Error processing instruments: {0}".format(str(e))) + + if (category_type == "all" or category_type == "sounds") and hasattr(app.browser, 'sounds'): + try: + sounds = process_item(app.browser.sounds, 0, []) + if sounds: + sounds["name"] = "Sounds" # Ensure consistent naming + sounds["path"] = "Sounds" + result["categories"].append(sounds) + except Exception as e: + self.log_message("Error processing sounds: {0}".format(str(e))) + + if (category_type == "all" or category_type == "drums") and hasattr(app.browser, 'drums'): + try: + drums = process_item(app.browser.drums, 0, []) + if drums: + drums["name"] = "Drums" # Ensure consistent naming + drums["path"] = "Drums" + result["categories"].append(drums) + except Exception as e: + self.log_message("Error processing drums: {0}".format(str(e))) + + if (category_type == "all" or category_type == "audio_effects") and hasattr(app.browser, 'audio_effects'): + try: + audio_effects = process_item(app.browser.audio_effects, 0, []) + if audio_effects: + audio_effects["name"] = "Audio Effects" # Ensure consistent naming + audio_effects["path"] = "Audio Effects" + result["categories"].append(audio_effects) + except Exception as e: + self.log_message("Error processing audio_effects: {0}".format(str(e))) + + if (category_type == "all" or category_type == "midi_effects") and hasattr(app.browser, 'midi_effects'): + try: + midi_effects = process_item(app.browser.midi_effects, 0, []) + if midi_effects: + midi_effects["name"] = "MIDI Effects" + midi_effects["path"] = "MIDI Effects" + result["categories"].append(midi_effects) + except Exception as e: + self.log_message("Error processing midi_effects: {0}".format(str(e))) + + # Try to process other potentially available categories + for attr in browser_attrs: + if attr not in ['instruments', 'sounds', 'drums', 'audio_effects', 'midi_effects'] and \ + (category_type == "all" or category_type == attr): + try: + item = getattr(app.browser, attr) + if hasattr(item, 'children') or hasattr(item, 'name'): + category = process_item(item, 0, []) + if category: + category["name"] = attr.capitalize() + category["path"] = attr.capitalize() + result["categories"].append(category) + except Exception as e: + self.log_message("Error processing {0}: {1}".format(attr, str(e))) + result["total_folders"] = folder_count[0] + self.log_message("Browser tree generated for {0} with {1} root categories".format( + category_type, len(result['categories']))) + return result + + except Exception as e: + self.log_message("Error getting browser tree: {0}".format(str(e))) + self.log_message(traceback.format_exc()) + raise + + def get_browser_items_at_path(self, path): + """ + Get browser items at a specific path. + + Args: + path: Path in the format "category/folder/subfolder" + where category is one of: instruments, sounds, drums, audio_effects, midi_effects + or any other available browser category + + Returns: + Dictionary with items at the specified path + """ + try: + # Access the application's browser instance instead of creating a new one + app = self.application() + if not app: + raise RuntimeError("Could not access Live application") + + # Check if browser is available + if not hasattr(app, 'browser') or app.browser is None: + raise RuntimeError("Browser is not available in the Live application") + + # Log available browser attributes to help diagnose issues + browser_attrs = [attr for attr in dir(app.browser) if not attr.startswith('_')] + self.log_message("Available browser attributes: {0}".format(browser_attrs)) + + # Parse the path + path_parts = path.split("/") + if not path_parts: + raise ValueError("Invalid path") + + # Determine the root category + root_category = path_parts[0].lower() + current_item = None + + # Check standard categories first + if root_category == "instruments" and hasattr(app.browser, 'instruments'): + current_item = app.browser.instruments + elif root_category == "sounds" and hasattr(app.browser, 'sounds'): + current_item = app.browser.sounds + elif root_category == "drums" and hasattr(app.browser, 'drums'): + current_item = app.browser.drums + elif root_category == "audio_effects" and hasattr(app.browser, 'audio_effects'): + current_item = app.browser.audio_effects + elif root_category == "midi_effects" and hasattr(app.browser, 'midi_effects'): + current_item = app.browser.midi_effects + else: + # Try to find the category in other browser attributes + found = False + for attr in browser_attrs: + if attr.lower() == root_category: + try: + current_item = getattr(app.browser, attr) + found = True + break + except Exception as e: + self.log_message("Error accessing browser attribute {0}: {1}".format(attr, str(e))) + + if not found: + # If we still haven't found the category, return available categories + return { + "path": path, + "error": "Unknown or unavailable category: {0}".format(root_category), + "available_categories": browser_attrs, + "items": [] + } + + # Navigate through the path + for i in range(1, len(path_parts)): + part = path_parts[i] + if not part: # Skip empty parts + continue + + if not hasattr(current_item, 'children'): + return { + "path": path, + "error": "Item at '{0}' has no children".format('/'.join(path_parts[:i])), + "items": [] + } + + found = False + for child in current_item.children: + if hasattr(child, 'name') and child.name.lower() == part.lower(): + current_item = child + found = True + break + + if not found: + return { + "path": path, + "error": "Path part '{0}' not found".format(part), + "items": [] + } + + # Get items at the current path + items = [] + if hasattr(current_item, 'children'): + for child in current_item.children: + item_info = { + "name": child.name if hasattr(child, 'name') else "Unknown", + "is_folder": hasattr(child, 'children') and bool(child.children), + "is_device": hasattr(child, 'is_device') and child.is_device, + "is_loadable": hasattr(child, 'is_loadable') and child.is_loadable, + "uri": child.uri if hasattr(child, 'uri') else None + } + items.append(item_info) + + result = { + "path": path, + "name": current_item.name if hasattr(current_item, 'name') else "Unknown", + "uri": current_item.uri if hasattr(current_item, 'uri') else None, + "is_folder": hasattr(current_item, 'children') and bool(current_item.children), + "is_device": hasattr(current_item, 'is_device') and current_item.is_device, + "is_loadable": hasattr(current_item, 'is_loadable') and current_item.is_loadable, + "items": items + } + + self.log_message("Retrieved {0} items at path: {1}".format(len(items), path)) + return result + + except Exception as e: + self.log_message("Error getting browser items at path: {0}".format(str(e))) + self.log_message(traceback.format_exc()) + raise