app
🧩 Syntax:
# # import requests
# # import json
# # import time
# # import os
# # import socket # For basic network connectivity check
# # from dotenv import load_dotenv
# # def has_internet():
# # """
# # Checks for internet connectivity by trying to resolve a well-known host.
# # """
# # try:
# # socket.create_connection(("8.8.8.8", 53), timeout=5) # Google's public DNS
# # return True
# # except OSError:
# # return False
# # def fetch_data_with_retry(url, headers=None, params=None, method='GET', max_retries=float('inf'), retry_delay=1):
# # """
# # Fetches data from a URL with indefinite retries until a network connection is available.
# # Args:
# # url (str): The URL to fetch.
# # headers (dict, optional): HTTP headers. Defaults to None.
# # params (dict, optional): Query parameters for the request. Defaults to None.
# # method (str, optional): HTTP method ('GET', 'POST', etc.). Defaults to 'GET'.
# # max_retries (int, optional): Maximum number of retries (inf for indefinite). Defaults to float('inf').
# # retry_delay (int, optional): Delay in seconds between retries. Defaults to 1.
# # Returns:
# # requests.Response or None: The response object if successful, None otherwise (after max retries).
# # """
# # retries = 0
# # while retries < max_retries:
# # if has_internet():
# # try:
# # if method.upper() == 'GET':
# # response = requests.get(url, headers=headers, params=params)
# # else:
# # print(f"Unsupported HTTP method: {method}")
# # return None
# # response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
# # return response
# # except requests.exceptions.RequestException as e:
# # print(f"Network error during request to {url}: {e}")
# # print(f"Retrying in {retry_delay} seconds...")
# # time.sleep(retry_delay)
# # retries += 1
# # else:
# # print("No internet connection. Waiting for connection...")
# # time.sleep(retry_delay)
# # print(f"Max retries reached. Unable to fetch data from {url}.")
# # return None
# # def make_moxfield_api_request(endpoint, user_agent, params=None, method='GET', use_delay=True):
# # """
# # Makes a request to the Moxfield API with network retry.
# # Args:
# # endpoint (str): The API endpoint to call (e.g., "/v2/decks/search/").
# # user_agent (str): The user agent string to use.
# # params (dict, optional): Query parameters for the request. Defaults to None.
# # method (str, optional): HTTP method ('GET', 'POST', etc.). Defaults to 'GET'.
# # use_delay (bool, optional): Whether to wait for 1 second before making the request. Defaults to False.
# # Returns:
# # dict or list or None: The JSON response as a dictionary or list,
# # or None if an error occurred.
# # """
# # base_url = "https://api2.moxfield.com"
# # default_user_agent = "Mozilla/5.0 (Windows NT 10.0; rv:115.0) Gecko/20100101 Firefox/115.0"
# # url = f"{base_url}{endpoint}"
# # headers = {
# # 'User-Agent': f"{default_user_agent} {user_agent}",
# # }
# # if use_delay:
# # time.sleep(1)
# # response = fetch_data_with_retry(url, headers=headers, params=params, method=method)
# # if response:
# # try:
# # return response.json()
# # except json.JSONDecodeError:
# # print(f"Error decoding JSON response from {url}: {response.text}")
# # return None
# # return None
# # def get_all_deck_objects(search_endpoint, user_agent, max_pages=400): #added max_pages
# # """
# # Retrieves all deck objects from the given search endpoint, iterating through pages,
# # and filters for decks with "format": "commander" and a "mainboardCount" of 100.
# # Args:
# # search_endpoint (str): The Moxfield API endpoint to search for decks (e.g., "/v2/decks/search/").
# # user_agent (str): The user agent string to use for the API requests.
# # max_pages (int, optional): The maximum number of pages to retrieve. Defaults to 400.
# # Returns:
# # list or None: A list of dictionaries, where each dictionary represents a Commander deck object
# # with a mainboardCount of 100, or None if an error occurred or no such decks were found.
# # """
# # commander_decks = []
# # total_decks_found = 0
# # page_number = 1
# # has_more_pages = True #changed from false to true
# # while has_more_pages and page_number <= max_pages: #added page_number check
# # paged_endpoint = f"{search_endpoint}?pageNumber={page_number}"
# # decks_data = make_moxfield_api_request(paged_endpoint, user_agent, use_delay=True)
# # if isinstance(decks_data, dict) and 'data' in decks_data and isinstance(decks_data['data'], list):
# # if not decks_data['data']:
# # has_more_pages = False # Stop if a page returns no data
# # print(f"Page {page_number} returned no data. Stopping iteration.")
# # else:
# # retrieved_count = len(decks_data['data'])
# # total_decks_found += retrieved_count
# # for deck in decks_data['data']:
# # format = deck.get("format")
# # mainboard_count = deck.get("mainboardCount")
# # if format == "commander" and mainboard_count == 100:
# # commander_decks.append(deck)
# # percentage = (page_number / max_pages) * 100 if max_pages > 0 else 0
# # print(f"Page: {page_number}/{max_pages} | Commander: {len(commander_decks)}/{total_decks_found} ({percentage:.2f}%)", end='\r')
# # page_number += 1
# # else:
# # print(f"Error: Unexpected response structure on page {page_number}.")
# # has_more_pages = False # Stop on error, prevent infinite loop
# # return None
# # if page_number > max_pages:
# # print(f"\nReached maximum page limit ({max_pages}). Stopping iteration.")
# # break
# # print(f"\nTotal Commander decks found with 100 mainboard cards: {len(commander_decks)}")
# # return commander_decks
# # def find_commander_decks_by_name(search_endpoint, user_agent, commander_name, max_pages=4):
# # """
# # Retrieves deck objects from the given search endpoint, filtering for Commander
# # decks where the commander's name matches the user input using the API's
# # 'commanderCardName' parameter.
# # Args:
# # search_endpoint (str): The Moxfield API endpoint to search for decks.
# # user_agent (str): The user agent string for API requests.
# # commander_name (str): The exact name of the commander to search for.
# # max_pages (int, optional): The maximum number of pages to retrieve. Defaults to 4.
# # Returns:
# # list or None: A list of dictionaries, where each dictionary represents a Commander
# # deck object with the specified commander, or None if an error occurred.
# # """
# # matching_commander_decks = []
# # total_decks_found = 0
# # page_number = 1
# # has_more_pages = True
# # while has_more_pages and page_number <= max_pages:
# # params = {
# # 'pageNumber': page_number,
# # 'fmt': 'commander',
# # 'commanderCardName': commander_name,
# # # 'commanderCardId': '61M29' # Consider making this dynamic if needed
# # }
# # decks_data = make_moxfield_api_request(search_endpoint, user_agent, params=params, use_delay=True)
# # if isinstance(decks_data, dict) and 'data' in decks_data and isinstance(decks_data['data'], list):
# # if not decks_data['data']:
# # has_more_pages = False
# # print(f"Page {page_number} returned no data for commander '{commander_name}'. Stopping iteration.")
# # else:
# # retrieved_count = len(decks_data['data'])
# # total_decks_found += retrieved_count
# # matching_commander_decks.extend(decks_data['data']) # All decks on this page should match
# # percentage = (page_number / max_pages) * 100 if max_pages > 0 else 0
# # print(f"Page: {page_number}/{max_pages} | Found matching Commanders: {len(matching_commander_decks)}/{total_decks_found} ({percentage:.2f}%)", end='\r')
# # page_number += 1
# # else:
# # print(f"Error: Unexpected response structure on page {page_number}.")
# # has_more_pages = False
# # return None
# # if page_number > max_pages:
# # print(f"\nReached maximum page limit ({max_pages}). Stopping iteration.")
# # break
# # print(f"\nTotal Commander decks found with commander '{commander_name}': {len(matching_commander_decks)}")
# # return matching_commander_decks
# # def get_deck_card_names(deck_details):
# # """
# # Extracts the card names (Commander and Mainboard) from a single deck details dictionary,
# # correctly identifying the designated commander from the single element within the 'cards' dictionary.
# # Args:
# # deck_details (dict): The full details of a single Moxfield deck.
# # Returns:
# # dict: A dictionary containing the public ID and a list of card names, or None if no cards found.
# # """
# # card_names = []
# # public_id = deck_details.get('publicId')
# # if not public_id:
# # print("\nWarning: Deck object does not contain 'publicId'.")
# # return None
# # # Extract designated commander name from the single element in the 'cards' dictionary
# # commander_cards_dict = deck_details.get('boards', {}).get('commanders', {}).get('cards', {})
# # if isinstance(commander_cards_dict, dict):
# # if commander_cards_dict: # Check if the dictionary is not empty
# # # Get the first (and assumedly only) value in the dictionary
# # commander_info = next(iter(commander_cards_dict.values()), None)
# # if isinstance(commander_info, dict) and 'card' in commander_info and isinstance(commander_info['card'], dict) and 'name' in commander_info['card']:
# # commander_name = commander_info['card']['name']
# # if commander_name:
# # card_names.append(f"Commander:\n{commander_name}")
# # else:
# # print(f"\nWarning: Commander name not found within the single card info for deck ID: {public_id}.")
# # else:
# # print(f"\nWarning: Unexpected structure for the single commander card info for deck ID: {public_id}.")
# # else:
# # print(f"\nWarning: No entries found in 'boards.commanders.cards' for deck ID: {public_id}.")
# # else:
# # print(f"\nWarning: 'boards.commanders.cards' is not a dictionary for deck ID: {public_id}.")
# # # Extract other card names from mainboard
# # mainboard_cards = deck_details.get('boards', {}).get('mainboard', {}).get('cards', {})
# # main_cards = [] #create a list to hold the mainboard cards
# # for card_data in mainboard_cards.values():
# # quantity = card_data.get('quantity', 1) # Default to 1 if quantity is missing
# # card_name = card_data.get('card', {}).get('name')
# # if card_name:
# # main_cards.append(f"{quantity} {card_name}") #append the mainboard cards to the list
# # else:
# # print(f"\nWarning: Card name not found in mainboard for deck ID: {public_id}.")
# # if main_cards:
# # card_names.append("\nMain:") #add main: only if there are cards
# # card_names.extend(main_cards)
# # if card_names:
# # return {public_id: card_names}
# # else:
# # print(f"\nWarning: No cards found for deck ID: {public_id}. Skipping.")
# # return None
# # def process_and_save_deck_names(deck_objects, user_agent, output_folder='DeckLists', save_interval=25):
# # """
# # Processes a list of deck objects, retrieves card names, and saves them to TXT files
# # in batches to manage memory usage.
# # Args:
# # deck_objects (list): A list of deck objects (returned by find_commander_decks_by_name).
# # user_agent (str): The user agent string for API requests.
# # output_folder (str, optional): The folder to save TXT files. Defaults to 'DeckLists'.
# # save_interval (int, optional): The number of decks to process before saving. Defaults to 25.
# # Returns:
# # list or None: A list of filepaths of the saved TXT files, or None if no files were saved.
# # """
# # if not deck_objects:
# # print("No deck objects to process.")
# # return None
# # os.makedirs(output_folder, exist_ok=True)
# # saved_files = []
# # processed_decks = {}
# # total_decks = len(deck_objects)
# # processed_count = 0
# # for deck in deck_objects:
# # public_id = deck.get('publicId')
# # if public_id:
# # details_endpoint = f"/v3/decks/all/{public_id}"
# # deck_details = make_moxfield_api_request(details_endpoint, user_agent, use_delay=True)
# # if deck_details:
# # card_names_data = get_deck_card_names(deck_details)
# # if card_names_data:
# # processed_decks.update(card_names_data)
# # processed_count += 1
# # percentage = (processed_count / total_decks) * 100
# # print(f"Processed: {processed_count}/{total_decks} decks ({percentage:.2f}%)", end='\r')
# # if processed_count % save_interval == 0 or processed_count == total_decks:
# # print(f"\nSaving batch of {len(processed_decks)} deck lists...")
# # for pid, names in processed_decks.items():
# # filename = f"{pid}.txt"
# # filepath = os.path.join(output_folder, filename)
# # try:
# # with open(filepath, 'w') as f:
# # for card_info in names:
# # f.write(card_info + '\n')
# # saved_files.append(filepath)
# # except Exception as e:
# # print(f"\nError saving deck list to file {filepath}: {e}")
# # processed_decks = {} # Clear processed decks after saving
# # else:
# # print("\nWarning: Deck object does not contain 'publicId'. Skipping.")
# # print(f"\nSuccessfully processed and saved {len(saved_files)} deck lists.")
# # return saved_files
# # if __name__ == "__main__":
# # load_dotenv()
# # user_agent = os.getenv("user_agent")
# # search_endpoint = "/v2/decks/search/"
# # save_frequency = 10
# # commander_to_find = 'Braids, Cabal Minion'
# # output_directory = f'{commander_to_find} Decks'
# # # Find Commander decks by name using API's commanderCardName parameter
# # matching_decks = find_commander_decks_by_name(search_endpoint, user_agent, commander_to_find)
# # if matching_decks:
# # print(f"\nFound {len(matching_decks)} Commander decks with '{commander_to_find}'. Processing...")
# # # Process the matching deck objects, get card names, and save in batches
# # saved_files = process_and_save_deck_names(matching_decks, user_agent, output_directory, save_frequency)
# # if saved_files:
# # print(f"\nSuccessfully saved deck lists for '{commander_to_find}' to the following files:")
# # for filepath in saved_files:
# # print(filepath)
# # else:
# # print(f"\nNo deck lists were saved for '{commander_to_find}'.")
# # else:
# # print(f"\nNo Commander decks found with '{commander_to_find}'.")
# import requests
# import json
# import time
# import os
# import socket # For basic network connectivity check
# from dotenv import load_dotenv
# def has_internet():
# """
# Checks for internet connectivity by trying to resolve a well-known host.
# """
# try:
# socket.create_connection(("8.8.8.8", 53), timeout=5) # Google's public DNS
# return True
# except OSError:
# return False
# def fetch_data_with_retry(url, headers=None, params=None, method='GET', max_retries=float('inf'), retry_delay=1):
# """
# Fetches data from a URL with indefinite retries until a network connection is available.
# Args:
# url (str): The URL to fetch.
# headers (dict, optional): HTTP headers. Defaults to None.
# params (dict, optional): Query parameters for the request. Defaults to None.
# method (str, optional): HTTP method ('GET', 'POST', etc.). Defaults to 'GET'.
# max_retries (int, optional): Maximum number of retries (inf for indefinite). Defaults to float('inf').
# retry_delay (int, optional): Delay in seconds between retries. Defaults to 1.
# Returns:
# requests.Response or None: The response object if successful, None otherwise (after max retries).
# """
# retries = 0
# while retries < max_retries:
# if has_internet():
# try:
# if method.upper() == 'GET':
# response = requests.get(url, headers=headers, params=params)
# else:
# print(f"Unsupported HTTP method: {method}")
# return None
# response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
# return response
# except requests.exceptions.RequestException as e:
# print(f"Network error during request to {url}: {e}")
# print(f"Retrying in {retry_delay} seconds...")
# time.sleep(retry_delay)
# retries += 1
# else:
# print("No internet connection. Waiting for connection...")
# time.sleep(retry_delay)
# print(f"Max retries reached. Unable to fetch data from {url}.")
# return None
# def make_moxfield_api_request(endpoint, user_agent, params=None, method='GET', use_delay=True):
# """
# Makes a request to the Moxfield API with network retry.
# Args:
# endpoint (str): The API endpoint to call (e.g., "/v2/decks/search/").
# user_agent (str): The user agent string to use.
# params (dict, optional): Query parameters for the request. Defaults to None.
# method (str, optional): HTTP method ('GET', 'POST', etc.). Defaults to 'GET'.
# use_delay (bool, optional): Whether to wait for 1 second before making the request. Defaults to False.
# Returns:
# dict or list or None: The JSON response as a dictionary or list,
# or None if an error occurred.
# """
# base_url = "https://api2.moxfield.com"
# default_user_agent = "Mozilla/5.0 (Windows NT 10.0; rv:115.0) Gecko/20100101 Firefox/115.0"
# url = f"{base_url}{endpoint}"
# headers = {
# 'User-Agent': f"{default_user_agent} {user_agent}",
# }
# if use_delay:
# time.sleep(1)
# response = fetch_data_with_retry(url, headers=headers, params=params, method=method)
# if response:
# try:
# return response.json()
# except json.JSONDecodeError:
# print(f"Error decoding JSON response from {url}: {response.text}")
# return None
# return None
# def get_card_id_by_name(card_name, user_agent):
# """
# Retrieves the Moxfield card id from the 'data' list in the card search response.
# """
# search_card_endpoint = "/v2/cards/search/"
# query = f'(({card_name}))'
# params = {'q': query}
# card_data_raw = make_moxfield_api_request(search_card_endpoint, user_agent, params=params)
# print(f"Raw card search data for '{card_name}': {card_data_raw}") # Log the raw response
# if isinstance(card_data_raw, dict) and 'data' in card_data_raw and isinstance(card_data_raw['data'], list) and card_data_raw['data']:
# first_card = card_data_raw['data'][0]
# card_id = first_card.get('id')
# print(f"Extracted card id for '{card_name}': {card_id}") # Log the extracted ID
# return card_id
# else:
# print(f"Warning: Could not find card id for '{card_name}'. Response: {card_data_raw}")
# return None
# def find_commander_decks_by_name(search_endpoint, user_agent, commander_name, max_pages=400):
# """
# Retrieves deck objects, filtering for Commander decks where the commander's
# name matches the user input, dynamically fetching the commander's card ID.
# """
# commander_card_id = get_card_id_by_name(commander_name, user_agent)
# if not commander_card_id:
# print(f"Could not retrieve card ID for '{commander_name}'. Searching by name only.")
# params = {'pageNumber': 1, 'fmt': 'commander', 'commanderCardName': commander_name}
# else:
# params = {'pageNumber': 1, 'fmt': 'commander', 'commanderCardName': commander_name, 'commanderCardId': commander_card_id}
# all_matching_commander_decks = []
# total_decks_found = 0
# page_number = 1
# has_more_pages = True
# while has_more_pages and page_number <= max_pages:
# params['pageNumber'] = page_number
# decks_data = make_moxfield_api_request(search_endpoint, user_agent, params=params, use_delay=True)
# if isinstance(decks_data, dict) and 'data' in decks_data and isinstance(decks_data['data'], list):
# if not decks_data['data']:
# has_more_pages = False
# print(f"Page {page_number} returned no data for commander '{commander_name}'. Stopping.")
# else:
# retrieved_count = len(decks_data['data'])
# total_decks_found += retrieved_count
# all_matching_commander_decks.extend(decks_data['data'])
# percentage = (page_number / max_pages) * 100 if max_pages > 0 else 0
# print(f"Page: {page_number}/{max_pages} | Found matching Commanders: {len(all_matching_commander_decks)}/{total_decks_found} ({percentage:.2f}%)", end='\r')
# page_number += 1
# else:
# print(f"Error: Unexpected response structure on page {page_number}.")
# has_more_pages = False
# return None
# if page_number > max_pages:
# print(f"\nReached maximum page limit ({max_pages}). Stopping.")
# break
# print(f"\nTotal Commander decks found with commander '{commander_name}': {len(all_matching_commander_decks)}")
# return all_matching_commander_decks
# def get_deck_card_names(deck_details):
# """
# Extracts the card names (Commander and Mainboard) from a single deck details dictionary,
# correctly identifying the designated commander from the single element within the 'cards' dictionary.
# Args:
# deck_details (dict): The full details of a single Moxfield deck.
# Returns:
# dict: A dictionary containing the public ID and a list of card names, or None if no cards found.
# """
# card_names = []
# public_id = deck_details.get('publicId')
# if not public_id:
# print("\nWarning: Deck object does not contain 'publicId'.")
# return None
# # Extract designated commander name from the single element in the 'cards' dictionary
# commander_cards_dict = deck_details.get('boards', {}).get('commanders', {}).get('cards', {})
# if isinstance(commander_cards_dict, dict):
# if commander_cards_dict: # Check if the dictionary is not empty
# # Get the first (and assumedly only) value in the dictionary
# commander_info = next(iter(commander_cards_dict.values()), None)
# if isinstance(commander_info, dict) and 'card' in commander_info and isinstance(commander_info['card'], dict) and 'name' in commander_info['card']:
# commander_name = commander_info['card']['name']
# if commander_name:
# card_names.append(f"Commander:\n{commander_name}")
# else:
# print(f"\nWarning: Commander name not found within the single card info for deck ID: {public_id}.")
# else:
# print(f"\nWarning: Unexpected structure for the single commander card info for deck ID: {public_id}.")
# else:
# print(f"\nWarning: No entries found in 'boards.commanders.cards' for deck ID: {public_id}.")
# else:
# print(f"\nWarning: 'boards.commanders.cards' is not a dictionary for deck ID: {public_id}.")
# # Extract other card names from mainboard
# mainboard_cards = deck_details.get('boards', {}).get('mainboard', {}).get('cards', {})
# main_cards = [] #create a list to hold the mainboard cards
# for card_data in mainboard_cards.values():
# quantity = card_data.get('quantity', 1) # Default to 1 if quantity is missing
# card_name = card_data.get('card', {}).get('name')
# if card_name:
# main_cards.append(f"{quantity} {card_name}") #append the mainboard cards to the list
# else:
# print(f"\nWarning: Card name not found in mainboard for deck ID: {public_id}.")
# if main_cards:
# card_names.append("\nMain:") #add main: only if there are cards
# card_names.extend(main_cards)
# if card_names:
# return {public_id: card_names}
# else:
# print(f"\nWarning: No cards found for deck ID: {public_id}. Skipping.")
# return None
# def process_and_save_deck_names(deck_objects, user_agent, output_folder='DeckLists', save_interval=25):
# """
# Processes a list of deck objects, retrieves card names, and saves them to TXT files
# in batches to manage memory usage.
# Args:
# deck_objects (list): A list of deck objects (returned by find_commander_decks_by_name).
# user_agent (str): The user agent string for API requests.
# output_folder (str, optional): The folder to save TXT files. Defaults to 'DeckLists'.
# save_interval (int, optional): The number of decks to process before saving. Defaults to 25.
# Returns:
# list or None: A list of filepaths of the saved TXT files, or None if no files were saved.
# """
# if not deck_objects:
# print("No deck objects to process.")
# return None
# os.makedirs(output_folder, exist_ok=True)
# saved_files = []
# processed_decks = {}
# total_decks = len(deck_objects)
# processed_count = 0
# for deck in deck_objects:
# public_id = deck.get('publicId')
# if public_id:
# details_endpoint = f"/v3/decks/all/{public_id}"
# deck_details = make_moxfield_api_request(details_endpoint, user_agent, use_delay=True)
# if deck_details:
# card_names_data = get_deck_card_names(deck_details)
# if card_names_data:
# processed_decks.update(card_names_data)
# processed_count += 1
# percentage = (processed_count / total_decks) * 100
# print(f"Processed: {processed_count}/{total_decks} decks ({percentage:.2f}%)", end='\r')
# if processed_count % save_interval == 0 or processed_count == total_decks:
# print(f"\nSaving batch of {len(processed_decks)} deck lists...")
# for pid, names in processed_decks.items():
# filename = f"{pid}.txt"
# filepath = os.path.join(output_folder, filename)
# try:
# with open(filepath, 'w') as f:
# for card_info in names:
# f.write(card_info + '\n')
# saved_files.append(filepath)
# except Exception as e:
# print(f"\nError saving deck list to file {filepath}: {e}")
# processed_decks = {} # Clear processed decks after saving
# else:
# print("\nWarning: Deck object does not contain 'publicId'. Skipping.")
# print(f"\nSuccessfully processed and saved {len(saved_files)} deck lists.")
# return saved_files
# if __name__ == "__main__":
# load_dotenv()
# user_agent = os.getenv("user_agent")
# search_endpoint = "/v2/decks/search/"
# save_frequency = 10
# commander_to_find = input('Which commander decks are you looking for? ')
# output_directory = f'{commander_to_find} Decks'
# # Find Commander decks by name using API's commanderCardName parameter
# matching_decks = find_commander_decks_by_name(search_endpoint, user_agent, commander_to_find)
# if matching_decks:
# print(f"\nFound {len(matching_decks)} Commander decks with '{commander_to_find}'. Processing...")
# # Process the matching deck objects, get card names, and save in batches
# saved_files = process_and_save_deck_names(matching_decks, user_agent, output_directory, save_frequency)
# if saved_files:
# print(f"\nSuccessfully saved deck lists for '{commander_to_find}' to the following files:")
# for filepath in saved_files:
# print(filepath)
# else:
# print(f"\nNo deck lists were saved for '{commander_to_find}'.")
# else:
# print(f"\nNo Commander decks found with '{commander_to_find}'.")
import requests
import json
import time
import os
import socket # For basic network connectivity check
from dotenv import load_dotenv
def has_internet():
"""
Checks for internet connectivity by trying to resolve a well-known host.
"""
try:
socket.create_connection(("8.8.8.8", 53), timeout=5) # Google's public DNS
return True
except OSError:
return False
def fetch_data_with_retry(url, headers=None, params=None, method='GET', max_retries=float('inf'), retry_delay=1):
"""
Fetches data from a URL with indefinite retries until a network connection is available.
Args:
url (str): The URL to fetch.
headers (dict, optional): HTTP headers. Defaults to None.
params (dict, optional): Query parameters for the request. Defaults to None.
method (str, optional): HTTP method ('GET', 'POST', etc.). Defaults to 'GET'.
max_retries (int, optional): Maximum number of retries (inf for indefinite). Defaults to float('inf').
retry_delay (int, optional): Delay in seconds between retries. Defaults to 1.
Returns:
requests.Response or None: The response object if successful, None otherwise (after max retries).
"""
retries = 0
while retries < max_retries:
if has_internet():
try:
if method.upper() == 'GET':
response = requests.get(url, headers=headers, params=params)
else:
print(f"Unsupported HTTP method: {method}")
return None
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
return response
except requests.exceptions.RequestException as e:
print(f"Network error during request to {url}: {e}")
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retries += 1
else:
print("No internet connection. Waiting for connection...")
time.sleep(retry_delay)
print(f"Max retries reached. Unable to fetch data from {url}.")
return None
def make_moxfield_api_request(endpoint, user_agent, params=None, method='GET', use_delay=True):
"""
Makes a request to the Moxfield API with network retry.
Args:
endpoint (str): The API endpoint to call (e.g., "/v2/decks/search/").
user_agent (str): The user agent string to use.
params (dict, optional): Query parameters for the request. Defaults to None.
method (str, optional): HTTP method ('GET', 'POST', etc.). Defaults to 'GET'.
use_delay (bool, optional): Whether to wait for 1 second before making the request. Defaults to False.
Returns:
dict or list or None: The JSON response as a dictionary or list,
or None if an error occurred.
"""
base_url = "https://api2.moxfield.com"
default_user_agent = "Mozilla/5.0 (Windows NT 10.0; rv:115.0) Gecko/20100101 Firefox/115.0"
url = f"{base_url}{endpoint}"
headers = {
'User-Agent': f"{default_user_agent} {user_agent}",
}
if use_delay:
time.sleep(1)
response = fetch_data_with_retry(url, headers=headers, params=params, method=method)
if response:
try:
return response.json()
except json.JSONDecodeError:
print(f"Error decoding JSON response from {url}: {response.text}")
return None
return None
def get_all_deck_objects(search_endpoint, user_agent, max_pages=400): #added max_pages
"""
Retrieves all deck objects from the given search endpoint, iterating through pages,
and filters for decks with "format": "commander" and a "mainboardCount" of 100.
Args:
search_endpoint (str): The Moxfield API endpoint to search for decks (e.g., "/v2/decks/search/").
user_agent (str): The user agent string to use for the API requests.
max_pages (int, optional): The maximum number of pages to retrieve. Defaults to 400.
Returns:
list or None: A list of dictionaries, where each dictionary represents a Commander deck object
with a mainboardCount of 100, or None if a critical error occurred during the process.
"""
commander_decks = []
invalid_decks_found = 0
total_decks_found = 0
page_number = 1
has_more_pages = True #changed from false to true
while has_more_pages and page_number <= max_pages: #added page_number check
paged_endpoint = f"{search_endpoint}?pageNumber={page_number}"
decks_data = make_moxfield_api_request(paged_endpoint, user_agent, use_delay=True)
if isinstance(decks_data, dict) and 'data' in decks_data and isinstance(decks_data['data'], list):
if not decks_data['data']:
has_more_pages = False # Stop if a page returns no data
print(f"Page {page_number} returned no data. Stopping iteration.")
else:
retrieved_count = len(decks_data['data'])
total_decks_found += retrieved_count
for deck in decks_data['data']:
format = deck.get("format")
mainboard_count = deck.get("mainboardCount")
if format == "commander" and mainboard_count == 100:
commander_decks.append(deck)
else:
invalid_decks_found += 1
percentage = (page_number / max_pages) * 100 if max_pages > 0 else 0
print(f"Page: {page_number}/{max_pages} | Commander: {len(commander_decks)}/{total_decks_found} ({percentage:.2f}%)", end='\r')
page_number += 1
else:
print(f"Error: Unexpected response structure on page {page_number}.")
has_more_pages = False # Stop on error, prevent potential infinite loop
return None
if page_number > max_pages:
print(f"\nReached maximum page limit ({max_pages}). Stopping iteration.")
break
print(f"\nTotal Commander decks found with 100 mainboard cards: {len(commander_decks)}")
return commander_decks
def find_commander_decks_by_name(search_endpoint, user_agent, commander_name, max_pages=2):
"""
Retrieves deck objects from the given search endpoint, filtering for Commander
decks where the commander's name matches the user input using the API's
'commanderCardName' parameter.
Args:
search_endpoint (str): The Moxfield API endpoint to search for decks.
user_agent (str): The user agent string for API requests.
commander_name (str): The exact name of the commander to search for.
max_pages (int, optional): The maximum number of pages to retrieve. Defaults to 4.
Returns:
list or None: A list of dictionaries, where each dictionary represents a Commander
deck object with the specified commander, or None if an error occurred.
"""
matching_commander_decks = []
total_decks_found = 0
page_number = 1
has_more_pages = True
while has_more_pages and page_number <= max_pages:
params = {
'pageNumber': page_number,
'fmt': 'commander',
'commanderCardName': commander_name,
}
decks_data = make_moxfield_api_request(search_endpoint, user_agent, params=params, use_delay=True)
if isinstance(decks_data, dict) and 'data' in decks_data and isinstance(decks_data['data'], list):
if not decks_data['data']:
has_more_pages = False
print(f"Page {page_number} returned no data for commander '{commander_name}'. Stopping iteration.")
else:
retrieved_count = len(decks_data['data'])
total_decks_found += retrieved_count
matching_commander_decks.extend(decks_data['data']) # All decks on this page should match
percentage = (page_number / max_pages) * 100 if max_pages > 0 else 0
print(f"Page: {page_number}/{max_pages} | Found matching Commanders: {len(matching_commander_decks)}/{total_decks_found} ({percentage:.2f}%)", end='\r')
page_number += 1
else:
print(f"Error: Unexpected response structure on page {page_number}.")
has_more_pages = False
return None
if page_number > max_pages:
print(f"\nReached maximum page limit ({max_pages}). Stopping iteration.")
break
print(f"\nTotal Commander decks found with commander '{commander_name}': {len(matching_commander_decks)}")
return matching_commander_decks
def get_deck_card_names(deck_details):
"""
Extracts the card names (Commander and Mainboard) from a single deck details dictionary,
correctly identifying the designated commander from the single element within the 'cards' dictionary.
Args:
deck_details (dict): The full details of a single Moxfield deck.
Returns:
dict: A dictionary containing the public ID and a list of card names, or None if no cards found.
"""
card_names = []
public_id = deck_details.get('publicId')
if not public_id:
print("\nWarning: Deck object does not contain 'publicId'.")
return None
# Extract designated commander name from the single element in the 'cards' dictionary
commander_cards_dict = deck_details.get('boards', {}).get('commanders', {}).get('cards', {})
if isinstance(commander_cards_dict, dict):
if commander_cards_dict: # Check if the dictionary is not empty
# Get the first (and assumedly only) value in the dictionary
commander_info = next(iter(commander_cards_dict.values()), None)
if isinstance(commander_info, dict) and 'card' in commander_info and isinstance(commander_info['card'], dict) and 'name' in commander_info['card']:
commander_name = commander_info['card']['name']
if commander_name:
card_names.append(f"Commander:\n{commander_name}")
else:
print(f"\nWarning: Commander name not found within the single card info for deck ID: {public_id}.")
else:
print(f"\nWarning: Unexpected structure for the single commander card info for deck ID: {public_id}.")
else:
print(f"\nWarning: No entries found in 'boards.commanders.cards' for deck ID: {public_id}.")
else:
print(f"\nWarning: 'boards.commanders.cards' is not a dictionary for deck ID: {public_id}.")
# Extract other card names from mainboard
mainboard_cards = deck_details.get('boards', {}).get('mainboard', {}).get('cards', {})
main_cards = [] #create a list to hold the mainboard cards
for card_data in mainboard_cards.values():
quantity = card_data.get('quantity', 1) # Default to 1 if quantity is missing
card_name = card_data.get('card', {}).get('name')
if card_name:
main_cards.append(f"{quantity} {card_name}") #append the mainboard cards to the list
else:
print(f"\nWarning: Card name not found in mainboard for deck ID: {public_id}.")
if main_cards:
card_names.append("\nMain:") #add main: only if there are cards
card_names.extend(main_cards)
if card_names:
return {public_id: card_names}
else:
print(f"\nWarning: No cards found for deck ID: {public_id}. Skipping.")
return None
def process_and_save_deck_names(deck_objects, user_agent, output_folder='DeckLists', save_interval=25):
"""
Processes a list of deck objects, retrieves card names, and saves them to TXT files
in batches to manage memory usage.
Args:
deck_objects (list): A list of deck objects (returned by find_commander_decks_by_name).
user_agent (str): The user agent string for API requests.
output_folder (str, optional): The folder to save TXT files. Defaults to 'DeckLists'.
save_interval (int, optional): The number of decks to process before saving. Defaults to 25.
Returns:
list or None: A list of filepaths of the saved TXT files, or None if no files were saved.
"""
if not deck_objects:
print("No deck objects to process.")
return None
os.makedirs(output_folder, exist_ok=True)
saved_files = []
processed_decks = {}
total_decks = len(deck_objects)
processed_count = 0
processed_log_file = os.path.join(output_folder, "processed_decks.log")
already_processed = set()
# Load already processed deck IDs if the log file exists
if os.path.exists(processed_log_file):
with open(processed_log_file, 'r') as f:
for line in f:
already_processed.add(line.strip())
print(f"Loaded {len(already_processed)} already processed decks.")
for deck in deck_objects:
public_id = deck.get('publicId')
if public_id and public_id not in already_processed:
details_endpoint = f"/v3/decks/all/{public_id}"
deck_details = make_moxfield_api_request(details_endpoint, user_agent, use_delay=True)
if deck_details:
card_names_data = get_deck_card_names(deck_details)
if card_names_data:
processed_decks.update(card_names_data)
processed_count += 1
percentage = (processed_count / total_decks) * 100
print(f"Processed: {processed_count}/{total_decks} decks ({percentage:.2f}%)", end='\r')
if processed_count % save_interval == 0 or processed_count == total_decks:
print(f"\nSaving batch of {len(processed_decks)} deck lists...")
for pid, names in processed_decks.items():
filename = f"{pid}.txt"
filepath = os.path.join(output_folder, filename)
try:
with open(filepath, 'w') as f:
for card_info in names:
f.write(card_info + '\n')
saved_files.append(filepath)
# Log the processed deck ID
with open(processed_log_file, 'a') as log_file:
log_file.write(f"{pid}\n")
except Exception as e:
print(f"\nError saving deck list to file {filepath}: {e}")
processed_decks = {} # Clear processed decks after saving
elif public_id in already_processed:
print(f"Skipping already processed deck: {public_id}", end='\r')
processed_count += 1 # Still count it for overall progress
else:
print("\nWarning: Deck object does not contain 'publicId'. Skipping.")
print(f"\nSuccessfully processed and saved {len(saved_files)} deck lists.")
return saved_files
if __name__ == "__main__":
load_dotenv()
user_agent = os.getenv("user_agent")
search_endpoint = "/v2/decks/search/"
save_frequency = 10
commander_to_find = input('Which commander decks are you looking for? ')
output_directory = f'{commander_to_find} Decks'
# Find Commander decks by name using API's commanderCardName parameter
matching_decks = find_commander_decks_by_name(search_endpoint, user_agent, commander_to_find)
if matching_decks:
print(f"\nFound {len(matching_decks)} Commander decks with '{commander_to_find}'. Processing...")
# Process the matching deck objects, get card names, and save in batches
saved_files = process_and_save_deck_names(matching_decks, user_agent, output_directory, save_frequency)
if saved_files:
print(f"\nSuccessfully saved deck lists for '{commander_to_find}' to the following files:")
for filepath in saved_files:
print(filepath)
else:
print(f"\nNo deck lists were saved for '{commander_to_find}'.")
else:
print(f"\nNo Commander decks found with '{commander_to_find}'.")