import argparse import csv import io import json import os import re import sys from googleapiclient.discovery import build from googleapiclient.errors import HttpError sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from auth import get_credentials def _get_sheets_service(): creds = get_credentials() return build("sheets", "v4", credentials=creds) def _extract_spreadsheet_id(value): match = re.search(r"/spreadsheets/d/([a-zA-Z0-9_-]+)", value) if match: return match.group(1) return value def cmd_get_text(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) meta = service.spreadsheets().get(spreadsheetId=sid).execute() title = meta.get("properties", {}).get("title", "Untitled") output = io.StringIO() for sheet in meta.get("sheets", []): props = sheet.get("properties", {}) sheet_name = props.get("title", "Sheet1") output.write(f"\n=== {sheet_name} ===\n") result = service.spreadsheets().values().get( spreadsheetId=sid, range=sheet_name ).execute() rows = result.get("values", []) if not rows: output.write("(empty)\n") continue if args.format == "csv": writer = csv.writer(output) for row in rows: writer.writerow(row) output.write("\n") elif args.format == "json": headers = rows[0] if rows else [] data = [dict(zip(headers, row)) for row in rows[1:]] if args.format == "json" else rows output.write(json.dumps({title: data if args.format == "json" else rows}, indent=2)) output.write("\n") else: output.write(f"Sheet: {sheet_name}\n") for row in rows: output.write(" | ".join(row) + "\n") output.write("\n") print(output.getvalue().strip()) def cmd_get_range(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) result = service.spreadsheets().values().get( spreadsheetId=sid, range=args.range ).execute() rows = result.get("values", []) if not rows: print("(empty)") return for row in rows: print(" | ".join(row)) def cmd_find(args): service = _get_sheets_service() result = service.spreadsheets().get( spreadsheetId="me", fields="files(id,name,mimeType)" ).execute() drive_service = build("drive", "v3", credentials=get_credentials()) query = f"name contains '{args.query}' and mimeType = 'application/vnd.google-apps.spreadsheet'" results = drive_service.files().list( q=query, pageSize=args.limit, fields="files(id, name, modifiedTime)" ).execute() files = results.get("files", []) if not files: print(f"No spreadsheets found matching '{args.query}'") return for f in files: print(f"{f['name']:60s} {f['id']:50s} {f.get('modifiedTime', '')}") def cmd_get_metadata(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) meta = service.spreadsheets().get(spreadsheetId=sid).execute() props = meta.get("properties", {}) print(f"Title: {props.get('title')}") print(f"Locale: {props.get('locale')}") print(f"Timezone: {props.get('timeZone')}") print(f"URL: https://docs.google.com/spreadsheets/d/{sid}/edit") print() for sheet in meta.get("sheets", []): sp = sheet.get("properties", {}) print(f" Sheet: {sp.get('title')} (id={sp.get('sheetId')})") grid = sp.get("gridProperties", {}) print(f" Rows: {grid.get('rowCount')}, Cols: {grid.get('columnCount')}") def cmd_update_range(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) try: values = json.loads(args.values) except json.JSONDecodeError as e: print(f"Invalid JSON: {e}") sys.exit(1) body = { "range": args.range, "values": values, "majorDimension": "ROWS", } value_input = "RAW" if args.raw else "USER_ENTERED" result = service.spreadsheets().values().update( spreadsheetId=sid, range=args.range, valueInputOption=value_input, body=body, ).execute() print(f"Updated {result.get('updatedCells')} cells.") def cmd_append_rows(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) try: values = json.loads(args.values) except json.JSONDecodeError as e: print(f"Invalid JSON: {e}") sys.exit(1) body = { "range": args.range, "values": values, "majorDimension": "ROWS", } result = service.spreadsheets().values().append( spreadsheetId=sid, range=args.range, valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body, ).execute() print(f"Appended to {result.get('updates', {}).get('updatedRange')}") def cmd_clear_range(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) result = service.spreadsheets().values().clear( spreadsheetId=sid, range=args.range, body={} ).execute() print(f"Cleared range: {result.get('clearedRange')}") def cmd_batch_update(args): service = _get_sheets_service() sid = _extract_spreadsheet_id(args.spreadsheet_id) try: requests = json.loads(args.requests_json) except json.JSONDecodeError as e: print(f"Invalid JSON: {e}") sys.exit(1) body = {"requests": requests if isinstance(requests, list) else [requests]} result = service.spreadsheets().batchUpdate( spreadsheetId=sid, body=body ).execute() print(f"Batch update completed. Replies: {len(result.get('replies', []))}") def main(): parser = argparse.ArgumentParser(description="Google Sheets Operations") sub = parser.add_subparsers(dest="command") p = sub.add_parser("get-text", help="Get spreadsheet content as text/csv/json") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p.add_argument("--format", choices=["text", "csv", "json"], default="text") p = sub.add_parser("get-range", help="Get values from a specific range") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p.add_argument("range", help="Range in A1 notation (e.g. Sheet1!A1:D10)") p = sub.add_parser("find", help="Find spreadsheets by name") p.add_argument("query", help="Search query") p.add_argument("--limit", type=int, default=10) p = sub.add_parser("get-metadata", help="Get spreadsheet metadata") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p = sub.add_parser("update-range", help="Update a range of cells") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p.add_argument("range", help="Range in A1 notation") p.add_argument("values", help="JSON 2D array of values") p.add_argument("--raw", action="store_true", help="Use RAW input (no formula parsing)") p = sub.add_parser("append-rows", help="Append rows to a sheet") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p.add_argument("range", help="Range in A1 notation (e.g. Sheet1!A:Z)") p.add_argument("values", help="JSON 2D array of rows") p = sub.add_parser("clear-range", help="Clear values from a range") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p.add_argument("range", help="Range in A1 notation") p = sub.add_parser("batch-update", help="Advanced batch update") p.add_argument("spreadsheet_id", help="Spreadsheet ID or URL") p.add_argument("requests_json", help="JSON array of batch update requests") args = parser.parse_args() if args.command == "get-text": cmd_get_text(args) elif args.command == "get-range": cmd_get_range(args) elif args.command == "find": cmd_find(args) elif args.command == "get-metadata": cmd_get_metadata(args) elif args.command == "update-range": cmd_update_range(args) elif args.command == "append-rows": cmd_append_rows(args) elif args.command == "clear-range": cmd_clear_range(args) elif args.command == "batch-update": cmd_batch_update(args) else: parser.print_help() if __name__ == "__main__": main()