current-sec-script
🧩 Syntax:
import concurrent.futures, csv, gc, glob, hashlib, importlib, itertools, logging, os, platform, re, shutil, subprocess, sys, textwrap, threading, time, urllib.request, urllib.error, zipfile
from datetime import datetime, timedelta
from queue import PriorityQueue, Empty, Queue
from threading import Lock
from zipfile import ZipFile
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from urllib.error import HTTPError, URLError
# Native Python modules (standard library)
native_modules = [
'csv', 'gc', 'glob', 'hashlib', 'itertools', 'logging', 'os', 're', 'shutil',
'sys', 'textwrap', 'threading', 'time', 'urllib.request', 'urllib.error', 'zipfile',
'datetime', 'queue', 'pathlib'
]
# Non-native Python modules for third-party installation
third_party_modules = [
'chardet', 'pandas', 'requests', 'bs4', 'tqdm'
]
# Constants
ROOT_DIR = "./"
FILELIST = os.path.join(ROOT_DIR, "filelist.txt")
FORMD_SOURCE_DIR = os.path.join(ROOT_DIR, "SecFormD")
NCEN_SOURCE_DIR = os.path.join(ROOT_DIR, "SecNcen")
NPORT_SOURCE_DIR = os.path.join(ROOT_DIR, "SecNport")
THRTNF_SOURCE_DIR = os.path.join(ROOT_DIR, "Sec13F")
NMFP_SOURCE_DIR = os.path.join(ROOT_DIR, "SecNmfp")
CREDIT_SOURCE_DIR = os.path.join(ROOT_DIR, "CREDITS")
EQUITY_SOURCE_DIR = os.path.join(ROOT_DIR, "EQUITY")
CFTC_EQUITY_SOURCE_DIR = os.path.join(ROOT_DIR, "CFTC_EQ")
CFTC_CREDIT_SOURCE_DIR = os.path.join(ROOT_DIR, "CFTC_CR")
CFTC_COMMODITIES_SOURCE_DIR = os.path.join(ROOT_DIR, "CFTC_CO")
CFTC_FOREX_SOURCE_DIR = os.path.join(ROOT_DIR, "FOREX")
CFTC_RATES_SOURCE_DIR = os.path.join(ROOT_DIR, "CFTC_IR")
EDGAR_SOURCE_DIR = os.path.join(ROOT_DIR, "EDGAR")
EXCHANGE_SOURCE_DIR = os.path.join(ROOT_DIR, "EXCHANGE")
INSIDER_SOURCE_DIR = os.path.join(ROOT_DIR, "INSIDERS")
NCSR_DIR = os.path.join(EDGAR_SOURCE_DIR, "NCSR")  # Subdirectory that keeps N-CSR filings separate from other EDGAR downloads
directories = [
INSIDER_SOURCE_DIR,
EXCHANGE_SOURCE_DIR,
EDGAR_SOURCE_DIR,
EQUITY_SOURCE_DIR,
CREDIT_SOURCE_DIR,
CFTC_CREDIT_SOURCE_DIR,
CFTC_EQUITY_SOURCE_DIR,
CFTC_COMMODITIES_SOURCE_DIR,
CFTC_FOREX_SOURCE_DIR,
CFTC_RATES_SOURCE_DIR,
NMFP_SOURCE_DIR,
THRTNF_SOURCE_DIR,
NPORT_SOURCE_DIR,
NCEN_SOURCE_DIR,
FORMD_SOURCE_DIR,
NCSR_DIR,
]
for directory in directories:
os.makedirs(directory, exist_ok=True)
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# List of User-Agent strings for rotation
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0"
]
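# Illustrative sketch (assumption: this helper is not wired into the downloads below):
# one way the user_agents list above could be rotated to vary request headers.
def random_user_agent_headers():
    """Return a headers dict with a User-Agent picked at random from user_agents."""
    import random  # local import; 'random' is not otherwise needed by this script
    return {"User-Agent": random.choice(user_agents)}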
def check_and_install_modules():
os_name = platform.system()
if os_name == "Linux":
# Install pip if not already installed
try:
subprocess.check_call(["sudo", "apt", "-qq", "-y", "install", "python3-pip"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
print("Failed to install pip. Ensure you have sudo privileges.")
# For Windows and macOS, we'll rely on pip for Python packages
for module in third_party_modules:
try:
            importlib.import_module(module)  # import_module handles dotted module names directly
print(f"{module} is already installed.")
except ImportError:
print(f"{module} is not installed.")
pip_command = [sys.executable, '-m', 'pip', 'install', module]
try:
subprocess.check_call(pip_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"{module} installed successfully.")
except subprocess.CalledProcessError:
print(f"Failed to install {module}. Please install it manually.")
def import_modules():
global chardet, concurrent, requests, BeautifulSoup, tqdm, pd
# Third-party modules
import chardet
    import concurrent.futures  # keep 'concurrent' bound to the package so concurrent.futures.* calls elsewhere still work
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import pandas as pd
# Specific imports from concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
def gamecock_ascii():
print(r"""
__
_________ _____ ____ ____ ____ ____ | | __
/ ___\__ \ / \_/ __ \_/ ___\/ _ \_/ ___\| |/ /W
/ /_/ > __ \| Y Y \ ___/\ \__( <_> ) \___| <
\___ (____ /__|_| /\___ >\___ >____/ \___ >__|_ |
/_____/ \/ \/ \/ \/ \/ \|
""")
def gamecat_ascii():
print(r"""
_ /\_/\
( \ / `\
) ) __| G G|
/ / /` `'.= Y)=
( ( / `"`}
\ \| \ }
\ \ ), //
'._, /'-\ ( (
\, ,))\,),)
ASBT SAYS GAME ON.
""")
def codex():
"""Introductory function to clear the screen, display ASCII art, and prompt the user."""
# ANSI escape codes for colors
COLORS = [
'\033[31m', # Red
'\033[33m', # Yellow
'\033[32m', # Green
'\033[36m', # Cyan
'\033[34m', # Blue
'\033[35m', # Magenta
]
RESET = '\033[0m' # Reset to default color
def colorize_text(text):
"""Colorize the text with a rainbow gradient."""
color_cycle = itertools.cycle(COLORS)
colored_text = ''
for char in text:
if char == '\n':
colored_text += char
else:
colored_text += next(color_cycle) + char
return colored_text + RESET
def get_terminal_width():
"""Get the current width of the terminal window."""
try:
# Get terminal size (columns, lines)
columns, _ = os.get_terminal_size()
        except (AttributeError, OSError):
            # Fall back to a default width when the terminal size cannot be determined (e.g., output is piped)
            columns = 80
return columns
def display_text_normally(text, width=80):
"""Display the given text with word wrap and ensure newlines are preserved."""
# Split the text into lines and handle each line individually
lines = text.split('\n')
wrapped_lines = []
for line in lines:
# Wrap each line of text
wrapped_lines.append(textwrap.fill(line, width=width))
# Join the wrapped lines back together with newlines in between
wrapped_text = '\n'.join(wrapped_lines)
print(wrapped_text)
def display_hardcoded_ascii_art():
"""Display hardcoded ASCII art with rainbow gradient."""
ascii_art = """\
mmmmmmm m m mmmmmm mmm mmmm mmmm mmmmmm m m
# # # # m" " m" "m # "m # # # a
# #mmmm# #mmmmm # # # # # #mmmmm ##
# # # # # # # # # # m""m
# # # #mmmmm "mmm" #mm# #mmm" #mmmmm m" "m
"""
print(colorize_text(ascii_art))
time.sleep(3) # Show for 3 seconds
def prompt_user():
"""Prompt the user to choose between learning SEC forms, Market Instruments, or quitting."""
while True:
print("\nPlease choose an option:")
print("1. Learn about SEC forms pt. 6")
print("2. Learn about SEC forms pt. 9")
print("3. Learn about Market Instruments pt. 420")
print("Q. Quit")
choice = input("Enter 1, 2, or Q: ").strip().lower()
if choice == '1' or choice == 'sec forms':
text_content ="""
1. 10-K
- Description: The 10-K is an annual report filed by publicly traded companies to provide a comprehensive overview of the company's financial performance. It includes audited financial statements, management discussion and analysis, and details on operations, risk factors, and governance.
- Investopedia Link: https://www.investopedia.com/terms/1/10-k.asp
2. 10-K/A
- Description: The 10-K/A is an amendment to the annual 10-K report. It is used to correct or update information that was originally filed in the 10-K.
- Investopedia Link: https://www.investopedia.com/terms/1/10-k.asp
3. 10-Q
- Description: The 10-Q is a quarterly report that companies must file after the end of each of the first three quarters of their fiscal year. It provides an update on the company's financial performance, including unaudited financial statements and management discussion.
- Investopedia Link: https://www.investopedia.com/terms/1/10-q.asp
4. 10-Q/A
- Description: The 10-Q/A is an amendment to the quarterly 10-Q report. It is used to correct or update information that was originally filed in the 10-Q.
- Investopedia Link: https://www.investopedia.com/terms/1/10-q.asp
5. 8-K
- Description: The 8-K is used to report major events or corporate changes that are important to shareholders. These events can include mergers, acquisitions, bankruptcy, or changes in executives.
- Investopedia Link: https://www.investopedia.com/terms/1/8-k.asp
6. 8-K/A
- Description: The 8-K/A is an amendment to the 8-K report. It is filed to provide additional information or correct information originally reported in an 8-K.
- Investopedia Link: https://www.investopedia.com/terms/1/8-k.asp
7. DEF 14A
- Description: The DEF 14A, or Definitive Proxy Statement, provides information about matters to be voted on at a company’s annual meeting, including executive compensation, board nominees, and other significant proposals.
- Investopedia Link: https://www.investopedia.com/terms/d/definitive-proxy-statement.asp
8. DEF 14A/A
- Description: The DEF 14A/A is an amendment to the DEF 14A Proxy Statement. It is used to update or correct information originally filed in the DEF 14A.
- Investopedia Link: https://www.investopedia.com/terms/d/definitive-proxy-statement.asp
9. F-1
- Description: The F-1 is used by foreign companies seeking to list their shares on U.S. exchanges. It provides information similar to the S-1 but tailored for foreign entities.
- Investopedia Link: https://www.investopedia.com/terms/f/f-1.asp
10. F-1/A
- Description: The F-1/A is an amendment to the F-1 registration statement. It is used to update or correct information for foreign companies seeking to list their shares on U.S. exchanges.
- Investopedia Link: https://www.investopedia.com/terms/f/f-1.asp
11. Form 3
- Description: Form 3 is used by insiders of a company to report their ownership of the company's securities upon becoming an insider. It is required to be filed within 10 days of becoming an officer, director, or beneficial owner.
- Investopedia Link: https://www.investopedia.com/terms/f/form-3.asp
12. Form 3/A
- Description: The Form 3/A is an amendment to the original Form 3 filing. It is used to correct or update information regarding insider ownership.
- Investopedia Link: https://www.investopedia.com/terms/f/form-3.asp
13. Form 4
- Description: Form 4 is used to report changes in the holdings of company insiders. It must be filed within two business days of the transaction.
- Investopedia Link: https://www.investopedia.com/terms/f/form-4.asp
14. Form 4/A
- Description: The Form 4/A is an amendment to the original Form 4 filing. It is used to correct or update information regarding changes in insider holdings.
- Investopedia Link: https://www.investopedia.com/terms/f/form-4.asp
15. Form 5
- Description: Form 5 is an annual report used to disclose transactions that were not reported on Form 4, including certain gifts or changes in ownership.
- Investopedia Link: https://www.investopedia.com/terms/f/form-5.asp
16. Form 5/A
- Description: The Form 5/A is an amendment to the original Form 5 filing. It is used to correct or update information about transactions not previously reported.
- Investopedia Link: https://www.investopedia.com/terms/f/form-5.asp
17. Form ADV
- Description: Form ADV is filed by investment advisers to register with the SEC and state regulators. It provides details about the adviser’s business, services, and fees.
- Investopedia Link: https://www.investopedia.com/terms/f/form-adv.asp
18. Form ADV/A
- Description: Form ADV/A is an amendment to the original Form ADV filing. It is used to update or correct information about investment advisers.
- Investopedia Link: https://www.investopedia.com/terms/f/form-adv.asp
19. Form D
- Description: Form D is filed by companies offering securities that are exempt from registration under Regulation D. It includes information about the offering and the issuer.
- Investopedia Link: https://www.investopedia.com/terms/f/form-d.asp
"""
break
elif choice == '2' or choice == 'more sec forms':
text_content ="""
20. Form D/A
- Description: Form D/A is an amendment to the original Form D filing. It is used to update or correct information about securities offerings exempt from registration.
- Investopedia Link: https://www.investopedia.com/terms/f/form-d.asp
21. Form N-1A
- Description: Form N-1A is used by mutual funds to register with the SEC and provide information to investors about the fund’s investment objectives, strategies, and fees.
- Investopedia Link: https://www.investopedia.com/terms/f/form-n-1a.asp
22. Form N-1A/A
- Description: Form N-1A/A is an amendment to the original Form N-1A filing. It is used to update or correct information about mutual funds.
- Investopedia Link: https://www.investopedia.com/terms/f/form-n-1a.asp
23. Form N-CSR
- Description: Form N-CSR is filed by registered management investment companies to report their certified shareholder reports and other important financial statements.
- Investopedia Link: https://www.investopedia.com/terms/f/form-n-csr.asp
24. Form N-CSR/A
- Description: Form N-CSR/A is an amendment to the original Form N-CSR filing. It is used to update or correct information about certified shareholder reports.
- Investopedia Link: https://www.investopedia.com/terms/f/form-n-csr.asp
25. Form N-Q
- Description: Form N-Q is used by investment companies to report their portfolio holdings on a quarterly basis, providing details on the investments and their values.
- Investopedia Link: https://www.investopedia.com/terms/f/form-n-q.asp
26. Form N-Q/A
- Description: Form N-Q/A is an amendment to the original Form N-Q filing. It is used to update or correct information about investment company portfolio holdings.
- Investopedia Link: https://www.investopedia.com/terms/f/form-n-q.asp
27. 13D
- Description: Schedule 13D is filed by investors who acquire more than 5% of a company's outstanding shares. It includes information about the investor's intentions and background.
- Investopedia Link: https://www.investopedia.com/terms/s/schedule-13d.asp
28. 13D/A
- Description: Schedule 13D/A is an amendment to the original Schedule 13D filing. It is used to update or correct information about significant shareholders.
- Investopedia Link: https://www.investopedia.com/terms/s/schedule-13d.asp
29. 13G
- Description: Schedule 13G is an alternative to Schedule 13D for investors who acquire more than 5% of a company but do not intend to influence or control the company. It is typically used by passive investors.
- Investopedia Link: https://www.investopedia.com/terms/s/schedule-13g.asp
30. 13G/A
- Description: Schedule 13G/A is an amendment to the original Schedule 13G filing. It is used to update or correct information about passive investors who hold more than 5% of a company's shares.
- Investopedia Link: https://www.investopedia.com/terms/s/schedule-13g.asp
31. 13F
- Description: Form 13F is filed quarterly by institutional investment managers to disclose their holdings in publicly traded securities. It provides transparency into the investment activities of large institutional investors.
- Investopedia Link: https://www.investopedia.com/terms/1/13f.asp
32. 13F/A
- Description: Form 13F/A is an amendment to the original Form 13F filing. It is used to update or correct information regarding institutional investment holdings.
- Investopedia Link: https://www.investopedia.com/terms/1/13f.asp
33. S-1
- Description: The S-1 is a registration statement required by the SEC for companies intending to go public through an initial public offering (IPO). It includes detailed information about the company’s business model, financials, and risks.
- Investopedia Link: https://www.investopedia.com/terms/s/s-1.asp
34. S-1/A
- Description: The S-1/A is an amendment to the S-1 registration statement. It is used to update or correct information in the original S-1 filing.
- Investopedia Link: https://www.investopedia.com/terms/s/s-1.asp
35. S-3
- Description: The S-3 is a simplified registration form used by companies that already have a track record of compliance with SEC reporting requirements. It allows for faster and easier registration of securities for public sale.
- Investopedia Link: https://www.investopedia.com/terms/s/s-3.asp
36. S-3/A
- Description: The S-3/A is an amendment to the S-3 registration statement. It is used to update or correct information in the original S-3 filing.
- Investopedia Link: https://www.investopedia.com/terms/s/s-3.asp
37. S-4
- Description: The S-4 is used for registration of securities in connection with mergers, acquisitions, and other business combinations. It includes detailed information about the transaction and the companies involved.
- Investopedia Link: https://www.investopedia.com/terms/s/s-4.asp
38. S-4/A
- Description: The S-4/A is an amendment to the S-4 registration statement. It is used to update or correct information in the original S-4 filing.
- Investopedia Link: https://www.investopedia.com/terms/s/s-4.asp
"""
break
elif choice == '3' or choice == 'market instruments':
text_content ="""
Codex of Financial Instruments ver 1.42069
To avoid enslavement by the increasingly sophisticated and total control mechanisms outlined in the financial layers, free humans must adopt a multifaceted strategy that emphasizes education, decentralization, community resilience, regulatory reform, and technological empowerment. These moves collectively aim to empower individuals and communities, ensuring they retain autonomy and prevent the concentration of power that leads to total control.\n
Education and Awareness: The first line of defense against financial and societal enslavement is widespread education and awareness. People need to be informed about the complex financial instruments and control mechanisms that can potentially be used against them. This includes understanding basic financial literacy, the risks and benefits of various investment products, and the implications of emerging technologies like AI, blockchain, and quantum computing. By demystifying these elements, individuals can make informed decisions and resist manipulative financial practices.\n
Decentralization of Power: To counteract the concentration of control, promoting decentralized systems is crucial. This can be achieved through the adoption of decentralized financial technologies (DeFi), blockchain, and cryptocurrencies, which reduce reliance on centralized financial institutions and governments. Decentralized systems ensure transparency, enhance security, and empower individuals to manage their assets independently. Furthermore, supporting decentralized governance models can distribute decision-making power more evenly across society, preventing the monopolization of control by a few elites.\n
Strengthening Community Resilience: Building strong, resilient communities is essential to withstand external pressures and maintain autonomy. This involves fostering local economies through community banking, cooperative businesses, and local investment initiatives. Communities should invest in sustainable practices, such as local food production and renewable energy, to reduce dependency on external systems. Additionally, promoting social cohesion and mutual support networks can help communities collectively resist oppressive measures and support each other in times of crisis.\n
Advocacy for Regulatory Reform: Ensuring fair and transparent financial markets requires active advocacy for regulatory reforms. Individuals and communities must pressure governments to implement regulations that protect against financial manipulation, ensure corporate accountability, and promote transparency in all financial dealings. Strengthening anti-corruption measures and enhancing oversight of financial institutions can prevent abuses of power and protect the interests of the general public. Effective regulation can also mitigate the risks associated with advanced financial instruments and technologies.\n
Technological Empowerment: Embracing and harnessing technology in an ethical and controlled manner can empower individuals and communities. Investing in and promoting technologies that enhance privacy, security, and autonomy is critical. This includes using secure communication tools, privacy-focused financial platforms, and ethical AI systems that prioritize human well-being. Additionally, fostering innovation in these areas can create alternatives to the centralized technologies that may be used for control. By being proactive in technological adoption and development, free humans can stay ahead of potential threats and retain their freedom.\n
1. **Level 1 Instruments**
- **Stocks (Equities)**
- **Common Stock**: Represents ownership in a company and constitutes a claim on part of the company's profits. Common stockholders typically have voting rights.
- [Investopedia: Common Stock](https://www.investopedia.com/terms/c/commonstock.asp)\n
- **Preferred Stock**: A class of ownership with a fixed dividend, usually without voting rights. Preferred stockholders have priority over common stockholders in the event of liquidation.
- [Investopedia: Preferred Stock](https://www.investopedia.com/terms/p/preferredstock.asp)\n
- **Government Bonds**
- **Treasury Bills (T-Bills)**: Short-term government securities with maturities ranging from a few days to one year.
- [Investopedia: Treasury Bills](https://www.investopedia.com/terms/t/treasurybill.asp)\n
- **Treasury Notes (T-Notes)**: Government securities with maturities ranging from two to ten years, paying interest every six months.
- [Investopedia: Treasury Notes](https://www.investopedia.com/terms/t/treasurynote.asp)\n
- **Treasury Bonds (T-Bonds)**: Long-term government securities with maturities of 20 to 30 years, paying semiannual interest.
- [Investopedia: Treasury Bonds](https://www.investopedia.com/terms/t/treasurybond.asp)\n
- **Commodity Futures**: Contracts to buy or sell a commodity at a future date at a price agreed upon today.
- [Investopedia: Commodity Futures](https://www.investopedia.com/terms/f/futurescontract.asp)\n
- **Exchange-Traded Funds (ETFs)**: Investment funds traded on stock exchanges, much like stocks.
- [Investopedia: ETF](https://www.investopedia.com/terms/e/exchange-tradedfund-etf.asp)\n
2. **Level 2 Instruments**
- **Corporate Bonds**: Debt securities issued by corporations to raise capital. They offer higher yields but come with higher risk compared to government bonds.
- [Investopedia: Corporate Bonds](https://www.investopedia.com/terms/c/corporate-bond.asp)\n
- **Municipal Bonds**: Bonds issued by local governments or municipalities. Interest is often tax-exempt.
- [Investopedia: Municipal Bonds](https://www.investopedia.com/terms/m/municipal-bond.asp)\n
- **Interest Rate Swaps**: Contracts where parties exchange interest payments based on different interest rates.
- [Investopedia: Interest Rate Swap](https://www.investopedia.com/terms/i/interestrateswap.asp)\n
- **Currency Swaps**: Agreements to exchange principal and interest payments in different currencies.
- [Investopedia: Currency Swap](https://www.investopedia.com/terms/c/currency-swap.asp)\n
- **Credit Default Swaps (CDS)**: Contracts that provide protection against the default of a borrower.
- [Investopedia: Credit Default Swap (CDS)](https://www.investopedia.com/terms/c/creditdefaultswap.asp)\n
- **Money Market Instruments**
- **Certificates of Deposit (CDs)**: Time deposits offered by banks with a fixed interest rate and maturity date.
- [Investopedia: Certificate of Deposit (CD)](https://www.investopedia.com/terms/c/certificate-of-deposit.asp)\n
- **Commercial Paper**: Short-term unsecured promissory notes issued by corporations to raise funds.
- [Investopedia: Commercial Paper](https://www.investopedia.com/terms/c/commercialpaper.asp)\n
- **Repurchase Agreements (Repos)**: Short-term borrowing where one party sells securities to another with an agreement to repurchase them at a later date.
- [Investopedia: Repurchase Agreement (Repo)](https://www.investopedia.com/terms/r/repurchaseagreement.asp)\n
- **Spot Contracts (Forex)**: Agreements to buy or sell a currency at the current exchange rate with immediate settlement.
- [Investopedia: Spot Market](https://www.investopedia.com/terms/s/spotmarket.asp)\n
- **Forward Contracts (Forex)**: Agreements to buy or sell a currency at a specified future date at an agreed-upon rate.
- [Investopedia: Forward Contract](https://www.investopedia.com/terms/f/forwardcontract.asp)\n
3. **Level 3 Instruments**
- **Exotic Options**
- **Barrier Options**: Options that become active or void depending on whether the price of the underlying asset reaches a certain barrier level.
- [Investopedia: Barrier Option](https://www.investopedia.com/terms/b/barrier-option.asp)\n
- **Asian Options**: Options where the payoff is determined by the average price of the underlying asset over a certain period.
- [Investopedia: Asian Option](https://www.investopedia.com/terms/a/asian-option.asp)\n
- **Binary Options**: Options where the payoff is either a fixed amount or nothing at all, based on whether the underlying asset price is above or below a certain level.
- [Investopedia: Binary Option](https://www.investopedia.com/terms/b/binaryoption.asp)\n
- **Digital Options**: Similar to binary options, these offer a fixed payoff if a condition is met at expiration.
- [Investopedia: Digital Option](https://www.investopedia.com/terms/d/digital-option.asp)\n
- **Lookback Options**: Options where the payoff is based on the optimal price of the underlying asset over the life of the option.
- [Investopedia: Lookback Option](https://www.investopedia.com/terms/l/lookback-option.asp)\n
- **Chooser Options**: Options that give the holder the choice of whether to take a call or put option at a later date.
- [Investopedia: Chooser Option](https://www.investopedia.com/terms/c/chooser-option.asp)\n
- **Collateralized Debt Obligations (CDOs)**: Investment vehicles backed by a diversified pool of debt, including loans and bonds. The cash flows from the underlying assets are split into different tranches.
- [Investopedia: Collateralized Debt Obligation (CDO)](https://www.investopedia.com/terms/c/cdo.asp)\n
- **Credit-Linked Notes (CLNs)**: Debt instruments where payments are linked to the credit performance of a reference entity.
- [Investopedia: Credit-Linked Note](https://www.investopedia.com/terms/c/credit-linked-note.asp)\n
- **Mortgage-Backed Securities (MBS)**: Securities backed by a pool of mortgages. Investors receive payments derived from the underlying mortgage payments.
- [Investopedia: Mortgage-Backed Securities](https://www.investopedia.com/terms/m/mortgage-backed-securities-mbs.asp)\n
- **Structured Finance Products**
- **Asset-Backed Securities (ABS)**: Financial securities backed by a pool of assets, such as loans or receivables.
- [Investopedia: Asset-Backed Securities](https://www.investopedia.com/terms/a/asset-backed-securities-abs.asp)\n
- **Collateralized Loan Obligations (CLOs)**: A type of CDO that is backed by a pool of loans, often corporate loans.
- [Investopedia: Collateralized Loan Obligation (CLO)](https://www.investopedia.com/terms/c/collateralized-loan-obligation-clo.asp)\n
- **Longevity Swaps**: Contracts where one party pays a fixed amount in exchange for payments based on the longevity of a population or individual.
- [Investopedia: Longevity Swap](https://www.investopedia.com/terms/l/longevity-swap.asp)\n
4. **Specialty Instruments by Firm**
- **Salomon Instruments**: Instruments used by Salomon Brothers, including certain types of mortgage-backed securities and structured finance products.
- [Investopedia: Salomon Brothers](https://www.investopedia.com/terms/s/salomon-brothers.asp)\n
- **Citi Instruments**: Instruments utilized by Citigroup, including particular types of callable equity-linked notes and complex derivatives.
- [Investopedia: Citigroup](https://www.investopedia.com/terms/c/citigroup.asp)\n
- **Lehman Instruments**: Instruments used by Lehman Brothers, including specific types of collateralized debt obligations (CDOs) and bespoke derivatives.
- [Investopedia: Lehman Brothers](https://www.investopedia.com/terms/l/lehman-brothers.asp)\n
- **Bear Stearns Instruments**: Instruments utilized by Bear Stearns, including particular types of CDOs and bespoke derivatives.
- [Investopedia: Bear Stearns](https://www.investopedia.com/terms/b/bear-stearns.asp)\n"""
break
elif choice == 'q' or choice == 'quit':
print("Quitting the program.")
sys.exit() # Exit the program
else:
print("Invalid choice. Please enter 1, 2, or Q.")
return text_content
# Clear the screen before starting the display
os.system('clear' if os.name != 'nt' else 'cls')
# Display hardcoded ASCII art
display_hardcoded_ascii_art()
# Prompt the user and get the choice
text_content = prompt_user()
# Display the selected text content normally
display_text_normally(text_content)
def download_exchange_archives():
os.makedirs(EXCHANGE_SOURCE_DIR, exist_ok=True)
gamecat_ascii()
def generate_urls():
base_url = "https://www.sec.gov/files/opa/data/market-structure/metrics-individual-security-and-exchange/"
# List of specific file names
file_names = [
"individual_security_exchange_2012_q1.zip",
"individual_security_exchange_2012_q20.zip",
"individual_security_exchange_2012_q30.zip",
"individual_security_exchange_2012_q40.zip",
"individual_security_exchange_2013_q10.zip",
"individual_security_exchange_2013_q20.zip",
"individual_security_exchange_2013_q30.zip",
"individual_security_exchange_2013_q43.zip",
"individual_security_exchange_2014_q1.zip",
"individual_security_exchange_2014_q2.zip",
"individual_security_exchange_2014_q3.zip",
"individual_security_exchange_2014_q4.zip",
"individual_security_exchange_2015_q1.zip",
"individual_security_exchange_2015_q2.zip",
"individual_security_exchange_2015_q3.zip",
"individual_security_exchange_2015_q4.zip",
"individual_security_exchange_2016_q1-v2.zip",
"individual_security_exchange_2016_q2.zip",
"individual_security_exchange_2016_q3.zip",
"individual_security_exchange_2016_q4.zip",
"individual_security_exchange_2017_q1.zip",
"individual_security_exchange_2017_q2.zip",
"individual_security_exchange_2017_q3.zip",
"individual_security_exchange_2017_q4.zip",
"individual_security_exchange_2018_q1.zip",
"individual_security_exchange_2018_q2.zip",
"individual_security_exchange_2018_q3.zip",
"individual_security_exchange_2018_q4.zip",
"individual_security_exchange_2019_q1.zip",
"individual_security_exchange_2019_q2.zip",
"individual_security_exchange_2019_q3.zip",
"individual_security_exchange_2019_q4.zip",
"individual_security_exchange_2020_q1.zip",
"individual_security_exchange_2020_q2.zip",
"individual_security_exchange_2020_q3.zip",
"individual_security_exchange_2020_q4.zip",
"individual_security_exchange_2021_q1.zip",
"individual_security_exchange_2021_q2.zip",
"individual_security_exchange_2021_q3.zip",
"individual_security_exchange_2021_q4.zip",
"individual_security_exchange_2022_q1.zip",
"individual_security_exchange_2022_q2.zip",
"individual_security_exchange_2022_q3.zip",
"individual_security_exchange_2022_q4.zip",
"individual_security_exchange_2023_q1.zip",
"individual_security_exchange_2023_q2.zip",
"individual_security_exchange_2023_q3.zip",
"individual_security_exchange_2023_q4.zip",
"individual_security_exchange_2024_q1.zip",
"individual_security_exchange_2024_q2.zip",
"individual_security_exchange_2024_q3.zip"
]
        def sort_key(filename):
            # Extract year and quarter with a regex rather than fixed character positions,
            # e.g. "individual_security_exchange_2016_q1-v2.zip" -> (2016, 1)
            match = re.search(r'(\d{4})_q(\d)', filename)
            if match:
                return (int(match.group(1)), int(match.group(2)))
            return (0, 0)  # unparseable names sort first
sorted_file_names = sorted(file_names, key=sort_key)
url_list = [f"{base_url}{file_name}" for file_name in sorted_file_names]
return url_list
urls = generate_urls()
# Pass the generated URLs to download_archives
download_archives(EXCHANGE_SOURCE_DIR, FILELIST, urls)
print("Download of historical exchange volume archive completed.")
def download_insider_archives():
os.makedirs(INSIDER_SOURCE_DIR, exist_ok=True)
gamecat_ascii()
def generate_urls():
base_url = "https://www.sec.gov/files/structureddata/data/insider-transactions-data-sets/"
# List of specific file names
file_names = [
"2006q1_form345.zip",
"2006q2_form345.zip",
"2006q3_form345.zip",
"2006q4_form345.zip",
"2007q1_form345.zip",
"2007q2_form345.zip",
"2007q3_form345.zip",
"2007q4_form345.zip",
"2008q1_form345.zip",
"2008q2_form345.zip",
"2008q3_form345.zip",
"2008q4_form345.zip",
"2009q1_form345.zip",
"2009q2_form345.zip",
"2009q3_form345.zip",
"2009q4_form345.zip",
"2010q1_form345.zip",
"2010q2_form345.zip",
"2010q3_form345.zip",
"2010q4_form345.zip",
"2011q1_form345.zip",
"2011q2_form345.zip",
"2011q3_form345.zip",
"2011q4_form345.zip",
"2012q1_form345.zip",
"2012q2_form345.zip",
"2012q3_form345.zip",
"2012q4_form345.zip",
"2013q1_form345.zip",
"2013q2_form345.zip",
"2013q3_form345.zip",
"2013q4_form345.zip",
"2014q1_form345.zip",
"2014q2_form345.zip",
"2014q3_form345.zip",
"2014q4_form345.zip",
"2015q1_form345.zip",
"2015q2_form345.zip",
"2015q3_form345.zip",
"2015q4_form345.zip",
"2016q1_form345.zip",
"2016q2_form345.zip",
"2016q3_form345.zip",
"2016q4_form345.zip",
"2017q1_form345.zip",
"2017q2_form345.zip",
"2017q3_form345.zip",
"2017q4_form345.zip",
"2018q1_form345.zip",
"2018q2_form345.zip",
"2018q3_form345.zip",
"2018q4_form345.zip",
"2019q1_form345.zip",
"2019q2_form345.zip",
"2019q3_form345.zip",
"2019q4_form345.zip",
"2020q1_form345.zip",
"2020q2_form345.zip",
"2020q3_form345.zip",
"2020q4_form345.zip",
"2021q1_form345.zip",
"2021q2_form345.zip",
"2021q3_form345.zip",
"2021q4_form345.zip",
"2022q1_form345.zip",
"2022q2_form345.zip",
"2022q3_form345.zip",
"2022q4_form345.zip",
"2023q1_form345.zip",
"2023q2_form345.zip",
"2023q3_form345.zip",
"2023q4_form345.zip",
"2024q1_form345.zip",
"2024q2_form345.zip",
"2024q3_form345.zip",
]
        def sort_key(filename):
            # Extract year and quarter with a regex, e.g. "2006q1_form345.zip" -> (2006, 1)
            match = re.search(r'(\d{4})q(\d)', filename)
            if match:
                return (int(match.group(1)), int(match.group(2)))
            return (0, 0)  # unparseable names sort first
sorted_file_names = sorted(file_names, key=sort_key)
url_list = [f"{base_url}{file_name}" for file_name in sorted_file_names]
return url_list
urls = generate_urls()
# Pass the generated URLs to download_archives
download_archives(INSIDER_SOURCE_DIR, FILELIST, urls)
print("Download of historical exchange volume archive completed.")
def allyourbasearebelongtous():
file_queue = Queue()
idx_file = os.path.join(EDGAR_SOURCE_DIR, "master.idx")
log_file = os.path.join(EDGAR_SOURCE_DIR, "sec_download_log.txt")
gamecat_ascii()
    # Configure error logging to a file for this run (force=True overrides the module-level config)
    logging.basicConfig(
        level=logging.ERROR,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filename='error_log.txt',
        filemode='w',
        force=True
    )
    def log_progress(message, context=None, extra=None):
        """Write a timestamped progress message (with optional context and details) to the log and stdout."""
        prefix = f"[{context}] " if context else ""
        suffix = f" {extra}" if extra else ""
        with open(log_file, 'a') as log:
            log.write(f"{datetime.now()}: {prefix}{message}{suffix}\n")
        print(f"{prefix}{message}{suffix}")
def check_file_size(url):
try:
headers = {'User-Agent': "FORTHELULZ@anonops.com"} # Matching TheDoor.py
response = requests.head(url, headers=headers, timeout=10)
response.raise_for_status()
return int(response.headers.get('Content-Length', 0))
except requests.RequestException as e:
log_progress(f"Failed to get size for {url}: {e}", "FileSizeCheck")
return None
def download_file(url, download_directory):
try:
headers = {'User-Agent': "FORTHELULZ@anonops.com"} # Matching TheDoor.py
response = requests.get(url, headers=headers, timeout=60) # Longer timeout
response.raise_for_status()
content = response.content
if len(content) == 0:
log_progress(f"No content available for {url}", "Download")
return False
filename = url.split('/')[-1]
cik = url.split('/data/')[1].split('/')[0] if '/data/' in url else 'unknown'
dir_path = os.path.join(download_directory, cik)
os.makedirs(dir_path, exist_ok=True)
filepath = os.path.join(dir_path, filename)
if os.path.exists(filepath):
with open(filepath, 'rb') as file:
file_hash = hashlib.md5()
while chunk := file.read(8192):
file_hash.update(chunk)
current_md5 = file_hash.hexdigest()
log_file = os.path.join(download_directory, 'download_log.txt')
if os.path.exists(log_file):
with open(log_file, 'r') as log:
for line in log:
parts = line.strip().split(',')
if len(parts) == 4 and parts[2] == filepath:
logged_md5 = parts[3]
if current_md5 == logged_md5:
log_progress(f"FILE already downloaded. {current_md5} verified: {filepath}", "Download")
return True
with open(filepath, 'wb') as file:
file.write(content) # Full write like TheDoor.py
with open(filepath, 'rb') as file:
file_hash = hashlib.md5()
while chunk := file.read(8192):
file_hash.update(chunk)
md5_hash = file_hash.hexdigest()
log_file = os.path.join(download_directory, 'download_log.txt')
log_entry = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')},{url},{filepath},{md5_hash}\n"
with open(log_file, 'a') as log:
log.write(log_entry)
log_progress(f"Downloaded: {filepath}, Size: {os.path.getsize(filepath)} bytes, MD5: {md5_hash}", "Download")
return True
        except requests.RequestException as e:
            # Remove a zero-byte partial file if one was created before the failure
            if 'filepath' in locals() and os.path.exists(filepath) and os.path.getsize(filepath) == 0:
                os.remove(filepath)
            log_progress(f"Error downloading {url}: {e}", "Download")
            return False
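    # Illustrative sketch (assumption: not part of the original flow): how a previously
    # downloaded file could be re-verified against download_log.txt, whose rows are
    # written above as "timestamp,url,filepath,md5".
    def verify_logged_md5(filepath, log_path):
        """Return True if filepath's current MD5 matches the hash recorded in the download log."""
        file_hash = hashlib.md5()
        with open(filepath, 'rb') as fh:
            while chunk := fh.read(8192):
                file_hash.update(chunk)
        current_md5 = file_hash.hexdigest()
        if not os.path.exists(log_path):
            return False
        with open(log_path, 'r') as log:
            for line in log:
                parts = line.strip().split(',')
                if len(parts) == 4 and parts[2] == filepath:
                    return parts[3] == current_md5
        return False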
    def process_line(line):
        parts = line.split('|')
        if len(parts) >= 5:
            filename = parts[4].strip()
            # Skip the header row (whose fifth column is literally "Filename") and blank entries
            if filename and filename.lower() != 'filename':
                return f"https://www.sec.gov/Archives/{filename}"
        return None
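    # For reference, a data row in master.idx is pipe-delimited, roughly:
    #   CIK|Company Name|Form Type|Date Filed|Filename
    # e.g. "1234567|EXAMPLE CORP|10-K|2024-01-31|edgar/data/1234567/0001234567-24-000001.txt"
    # (hypothetical values), which process_line() maps to
    # "https://www.sec.gov/Archives/edgar/data/1234567/0001234567-24-000001.txt".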
def extract_idx_from_zip(zip_path):
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for file_name in zip_ref.namelist():
if file_name.endswith('.idx'):
idx_content = zip_ref.read(file_name).decode('utf-8', errors='ignore')
return '\n'.join(idx_content.split('\n')[12:])
raise FileNotFoundError("No IDX file found in ZIP archive.")
def get_user_selection(zip_files):
print("\nEnter a 4-digit year, 'qtr' for specific quarter, 'all' for all archives, or '0' to return to main menu:")
while True:
choice = input("Your choice: ").strip().lower()
if choice == '0':
return None
elif choice == 'all':
return zip_files
elif choice == 'qtr':
print("\nAvailable ZIP files:")
for i, file in enumerate(zip_files, 1):
print(f"{i}. {file}")
while True:
try:
choice = int(input("Enter the number of the ZIP file to process (or 0 to exit): "))
if choice == 0:
break
if 1 <= choice <= len(zip_files):
return [zip_files[choice - 1]]
print("Invalid choice. Please enter a number between 1 and", len(zip_files))
except ValueError:
print("Please enter a valid number.")
elif choice.isdigit() and len(choice) == 4:
year = choice
print(f"Processing files for year {year}. Enter a quarter (1-4) or press Enter for all quarters:")
quarter = input("Quarter (or press Enter for all): ").strip()
if quarter and quarter.isdigit() and 1 <= int(quarter) <= 4:
year_files = [f for f in zip_files if f.startswith(year) and f.endswith(f"_QTR{quarter}.zip")]
else:
year_files = [f for f in zip_files if f.startswith(year)]
if year_files:
print(f"Processing files for year {year}, quarter {quarter if quarter else 'all'}:")
return year_files
print(f"No files found for year {year}, quarter {quarter if quarter else 'all'}.")
else:
print("Only 4-digit year, 'qtr', 'all', or '0' accepted. For example: 1999, qtr, all")
def process_zip(zip_path):
log_progress(f"Processing {zip_path}")
idx_content = extract_idx_from_zip(zip_path)
urls = [process_line(line) for line in idx_content.split('\n') if process_line(line)]
downloaded = 0
failed = 0
total_files = len(urls)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(download_file, url, EDGAR_SOURCE_DIR) for url in urls]
for future in tqdm(concurrent.futures.as_completed(futures), total=total_files, desc=f"Processing {os.path.basename(zip_path)}"):
if future.result():
downloaded += 1
else:
failed += 1
log_progress(f"Progress: Downloaded {downloaded}/{total_files}, Failed {failed}")
log_progress(f"Finished processing {zip_path}. Downloaded {downloaded}/{total_files}, Failed {failed}")
def remove_top_lines(file_path, lines_to_remove=11):
"""Remove the top `lines_to_remove` lines from the given file."""
with open(file_path, 'r') as file:
lines = file.readlines()
with open(file_path, 'w') as file:
file.writelines(lines[lines_to_remove:])
    def compile_urls(zip_directory, idx_file):
        """Compile filing index entries from every downloaded ZIP archive into master.idx."""
        log_progress(f"Starting URL compilation from {zip_directory} into {idx_file}")
        zip_names = [f for f in os.listdir(zip_directory) if f.endswith('.zip')]
        total_zips = len(zip_names)
        with tqdm(total=total_zips, desc="Compiling URLs") as pbar:
            for file in zip_names:
                zip_path = os.path.join(zip_directory, file)
                # extract_idx_from_zip returns the .idx content with the header rows already stripped
                idx_content = extract_idx_from_zip(zip_path)
                with open(idx_file, 'a') as master_file:
                    master_file.write(idx_content)
                log_progress(f"Processed ZIP file: {file}")
                pbar.update(1)
        log_progress(f"URL compilation completed. Processed {total_zips} ZIP files")
def scrape_sec(idx_file, download_directory):
"""Begin scraping the entire SEC."""
log_progress(f"Starting SEC scraping from {idx_file} to {download_directory}")
os.makedirs(download_directory, exist_ok=True)
with open(idx_file, 'r', encoding='utf-8', errors='ignore') as file:
lines = file.readlines()
urls = [process_line(line) for line in lines if process_line(line) is not None]
total_urls = len(urls)
log_progress(f"Found {total_urls} URLs to scrape")
def download_file_task(url):
return download_file(url, download_directory)
failed_urls = []
with ThreadPoolExecutor(max_workers=4) as executor:
with tqdm(total=total_urls, desc="Scraping SEC") as pbar:
future_to_url = {executor.submit(download_file_task, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
success = future.result()
if not success:
failed_urls.append(url)
log_progress(f"Processed URL: {url} {'successfully' if success else 'with errors'}")
pbar.update(1)
downloaded = total_urls - len(failed_urls)
log_progress(f"Downloaded {downloaded} files successfully")
if failed_urls:
log_progress(f"Failed to download {len(failed_urls)} files", "Scraping", {"failed_urls": failed_urls})
try:
# Ensure the master.idx file is empty or create it
with open(idx_file, 'w') as master_file:
master_file.write("") # Clear the file if it exists
zip_files = [f for f in os.listdir(EDGAR_SOURCE_DIR) if f.endswith('.zip')]
while True:
            selected_zips = get_user_selection(zip_files)
            if not selected_zips:
                # User chose '0' (return) or there are no ZIP archives to process
                break
total_files = sum(len([process_line(line) for line in extract_idx_from_zip(os.path.join(EDGAR_SOURCE_DIR, zip)).split('\n') if process_line(line)]) for zip in selected_zips)
log_progress(f"Total files to process across {len(selected_zips)} ZIPs: {total_files}")
for zip_file in selected_zips:
zip_path = os.path.join(EDGAR_SOURCE_DIR, zip_file)
process_zip(zip_path)
log_progress("SEC processing pipeline completed")
except Exception as e:
log_progress(f"An error occurred: {e}")
try:
# Ensure the master.idx file is empty or create it
with open(idx_file, 'w') as master_file:
master_file.write("") # Clear the file if it exists
zip_files = [f for f in os.listdir(EDGAR_SOURCE_DIR) if f.endswith('.zip')]
for zip_file in zip_files:
zip_path = os.path.join(EDGAR_SOURCE_DIR, zip_file)
try:
log_progress(f"Processing ZIP file: {zip_file}")
                # extract_idx_from_zip returns the .idx content with header rows already stripped
                idx_content = extract_idx_from_zip(zip_path)
                file_queue.put(idx_content)
log_progress(f"Successfully processed ZIP file: {zip_file}")
except Exception as e:
log_progress(f"Error processing {zip_file}: {e}")
def write_to_master_file():
while not file_queue.empty():
content = file_queue.get()
with open(idx_file, 'a') as master_file:
master_file.write(content)
write_to_master_file()
log_progress("Compilation complete! uwu")
log_progress("Starting to compile URLs from ZIP files...")
start_time = time.time()
compile_urls(EDGAR_SOURCE_DIR, idx_file)
end_time = time.time()
log_progress(f"URL compilation completed in {end_time - start_time:.2f} seconds")
log_progress("Starting to scrape SEC data...")
start_time = time.time()
scrape_sec(idx_file, EDGAR_SOURCE_DIR)
end_time = time.time()
log_progress(f"SEC scraping completed in {end_time - start_time:.2f} seconds")
except Exception as e:
log_progress(f"An error occurred: {e}")
def download_credit_archives():
os.makedirs(CREDIT_SOURCE_DIR, exist_ok=True)
gamecat_ascii()
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/sec/SEC_CUMULATIVE_CREDITS_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
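    # For example, 2024-01-15 (a hypothetical date) would yield
    # ".../SEC_CUMULATIVE_CREDITS_2024_01_15.zip". One URL is generated per calendar day,
    # so some dates may not exist on the server and those downloads will simply be logged as failures.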
def download_zip_with_rate_limit(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(CREDIT_SOURCE_DIR, zip_filename)
print(f"Attempting to download: {zip_filename}")
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
print(f"Successfully downloaded: {zip_filename}")
# Add a small delay to avoid overwhelming the server
time.sleep(1) # Sleep for 1 second
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
print(f"Failed to download: {zip_filename}")
end_date = datetime.now().date()
    start_date = end_date - timedelta(days=2*365)  # Approximately two years back
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
futures = [executor.submit(download_zip_with_rate_limit, url) for url in urls]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
logging.error(f"Error in thread: {e}")
print("Downloads completed.")
    creditquery = input("Would you like to search the downloaded credit archives? (y)es or (n)o?: ").strip().lower()
    if creditquery == 'y':
        credits_second()
    else:
        print("'y' not entered. Exiting.")
        sys.exit(1)
def credits_second():
gamecat_ascii()
def parse_zips_in_batches(batch_size=100):
master = pd.DataFrame() # Start with an empty dataframe
zip_files = sorted(glob.glob(os.path.join(CREDIT_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
results_count = 0
print(f"\nStarting to process {total_files} zip files...")
for i in range(0, total_files, batch_size):
batch = zip_files[i:i+batch_size]
for index, zip_file in enumerate(batch, 1):
print(f"\nProcessing file {i + index}/{total_files}: {zip_file}")
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
print(f"Reading CSV file: {csv_filename}")
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
match_found = False
for column in df.columns:
if column in df.columns and df[column].astype(str).str.contains(search_term, case=False, na=False).any():
print(f"Matches found in column: {column}")
matching_rows = df[df[column].astype(str).str.contains(search_term, case=False, na=False)]
master = pd.concat([master, matching_rows], ignore_index=True)
results_count += len(matching_rows)
match_found = True
print(f"Added {len(matching_rows)} matching rows. Total matches so far: {results_count}")
break # We've found a match, no need to check other columns
if not match_found:
print(f"No matches found in {csv_filename}")
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
print(f"Error occurred while processing {zip_file}. Continuing to next file.")
print(f"Current matches count: {results_count}")
# Optionally, save or perform operations on 'master' here if it's getting too large
return master, results_count
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_term = input("Enter the search term: ").strip()
master, final_count = parse_zips_in_batches()
master_csv_path = os.path.join(CREDIT_SOURCE_DIR, f"filtered_{search_term.replace(' ', '_')}.csv")
master.to_csv(master_csv_path, index=False)
print(f"\nSaving results to: {master_csv_path}")
print(f"Total Matches Found: {final_count}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_equities_archives():
os.makedirs(EQUITY_SOURCE_DIR, exist_ok=True)
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/sec/SEC_CUMULATIVE_EQUITIES_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
def download_zip(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(EQUITY_SOURCE_DIR, zip_filename)
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
end_date = datetime.now().date()
    start_date = end_date - timedelta(days=2*365)  # Approximately two years back
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
list(executor.map(download_zip, urls)) # Use list() to ensure all tasks are completed before moving on
print("Downloads completed.")
    equityquery = input("Would you like to search the downloaded equity archives? (y)es or (n)o?: ").strip().lower()
    if equityquery == 'y':
        equities_second()
    else:
        print("'y' not entered. Exiting.")
        sys.exit(1)
def equities_second():
gamecat_ascii()
def parse_zips(search_terms):
master = pd.DataFrame() # Start with an empty dataframe
zip_files = sorted(glob.glob(os.path.join(EQUITY_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x)) # Sort by filename to keep dates in order
first_file_processed = False
first_headers = None
total_files = len(zip_files)
results_count = 0
print(f"\nStarting to process {total_files} zip files...")
for index, zip_file in enumerate(zip_files, 1):
print(f"\nProcessing file {index}/{total_files}: {zip_file}")
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
print(f"Reading CSV file: {csv_filename}")
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
if not first_file_processed:
first_headers = df.columns.tolist()
first_file_processed = True
# Combine all columns into a single string per row
df['combined'] = df.apply(lambda row: ' '.join(row.astype(str)), axis=1)
# Create a pattern for OR condition
pattern = '|'.join(search_terms)
# Find rows where any search term appears
mask = df['combined'].str.contains(pattern, case=False, na=False)
matching_rows = df[mask]
# Drop the 'combined' column if it exists
if 'combined' in matching_rows.columns:
matching_rows = matching_rows.drop(columns=['combined'])
# Add to master
master = pd.concat([master, matching_rows], ignore_index=True)
results_count += len(matching_rows)
print(f"Added {len(matching_rows)} matching rows. Total matches so far: {results_count}")
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
print(f"Error occurred while processing {zip_file}. Continuing to next file.")
print(f"Current matches count: {results_count}")
# If we have processed at least one file, ensure the CSV starts with the first file's headers
if first_headers:
master = master.reindex(columns=first_headers, fill_value=None)
return master, results_count
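    # Note: with search_terms such as ["TERM_A", "TERM_B"] (hypothetical), the pattern above
    # becomes "TERM_A|TERM_B". pandas' str.contains() treats the pattern as a regular
    # expression, so terms containing regex metacharacters (e.g. "." or "+") should be
    # escaped with re.escape() before joining.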
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_terms_input = input("Enter search terms separated by commas: ").strip()
        search_terms = [term.strip() for term in search_terms_input.split(',') if term.strip()]
if not search_terms:
print("No search terms provided. Exiting.")
return
master, final_count = parse_zips(search_terms)
safe_terms = [re.sub(r'\W+', '_', term) for term in search_terms]
master_csv_path = os.path.join(EQUITY_SOURCE_DIR, f"filtered_{'_'.join(safe_terms)}.csv")
master.to_csv(master_csv_path, index=False)
print(f"\nSaving results to: {master_csv_path}")
print(f"Total Matches Found: {final_count}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_cftc_credit_archives():
os.makedirs(CFTC_CREDIT_SOURCE_DIR, exist_ok=True)
gamecat_ascii()
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/cftc/CFTC_CUMULATIVE_CREDITS_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
def download_zip_with_rate_limit(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(CFTC_CREDIT_SOURCE_DIR, zip_filename)
print(f"Attempting to download: {zip_filename}")
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
print(f"Successfully downloaded: {zip_filename}")
# Add a small delay to avoid overwhelming the server
time.sleep(1) # Sleep for 1 second
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
print(f"Failed to download: {zip_filename}")
end_date = datetime.now().date()
    start_date = end_date - timedelta(days=2*365)  # Approximately two years back
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
futures = [executor.submit(download_zip_with_rate_limit, url) for url in urls]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
logging.error(f"Error in thread: {e}")
print("Downloads completed.")
    creditquery = input("Would you like to search the downloaded CFTC credit archives? (y)es or (n)o?: ").strip().lower()
    if creditquery == 'y':
        CFTC_credits_second()
    else:
        print("'y' not entered. Exiting.")
        sys.exit(1)
def CFTC_credits_second():
gamecat_ascii()
def parse_zips_in_batches(batch_size=100):
master = pd.DataFrame() # Start with an empty dataframe
zip_files = sorted(glob.glob(os.path.join(CFTC_CREDIT_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
results_count = 0
print(f"\nStarting to process {total_files} zip files...")
for i in range(0, total_files, batch_size):
batch = zip_files[i:i+batch_size]
for index, zip_file in enumerate(batch, 1):
print(f"\nProcessing file {i + index}/{total_files}: {zip_file}")
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
print(f"Reading CSV file: {csv_filename}")
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
match_found = False
for column in df.columns:
if df[column].astype(str).str.contains(search_term, case=False, na=False).any():
print(f"Matches found in column: {column}")
matching_rows = df[df[column].astype(str).str.contains(search_term, case=False, na=False)]
master = pd.concat([master, matching_rows], ignore_index=True)
results_count += len(matching_rows)
match_found = True
print(f"Added {len(matching_rows)} matching rows. Total matches so far: {results_count}")
break # We've found a match, no need to check other columns
if not match_found:
print(f"No matches found in {csv_filename}")
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
print(f"Error occurred while processing {zip_file}. Continuing to next file.")
print(f"Current matches count: {results_count}")
# Optionally, save or perform operations on 'master' here if it's getting too large
return master, results_count
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_term = input("Enter the search term: ").strip()
master, final_count = parse_zips_in_batches()
master_csv_path = os.path.join(CFTC_CREDIT_SOURCE_DIR, f"filtered_{search_term.replace(' ', '_')}.csv")
master.to_csv(master_csv_path, index=False)
print(f"\nSaving results to: {master_csv_path}")
print(f"Total Matches Found: {final_count}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_cftc_commodities_archives():
os.makedirs(CFTC_COMMODITIES_SOURCE_DIR, exist_ok=True)
gamecat_ascii()
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/cftc/CFTC_CUMULATIVE_COMMODITIES_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
def download_zip_with_rate_limit(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(CFTC_COMMODITIES_SOURCE_DIR, zip_filename)
print(f"Attempting to download: {zip_filename}")
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
print(f"Successfully downloaded: {zip_filename}")
# Add a small delay to avoid overwhelming the server
time.sleep(1) # Sleep for 1 second
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
print(f"Failed to download: {zip_filename}")
end_date = datetime.now().date()
start_date = end_date - timedelta(days=2*365) # Roughly two years back (leap days not accounted for)
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
futures = [executor.submit(download_zip_with_rate_limit, url) for url in urls]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
logging.error(f"Error in thread: {e}")
print("Downloads completed.")
# Prompt the user to run a search over the downloaded archives
creditquery = input("Would you like to search? (y)es or (n)o?: ").strip().lower()
if creditquery == 'y':
CFTC_commodities_second()
else:
print("'y' not selected. Exiting.")
exit(1)
def CFTC_commodities_second():
gamecat_ascii()
def parse_zips_in_batches(batch_size=100):
master = pd.DataFrame() # Start with an empty dataframe
zip_files = sorted(glob.glob(os.path.join(CFTC_COMMODITIES_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
results_count = 0
print(f"\nStarting to process {total_files} zip files...")
for i in range(0, total_files, batch_size):
batch = zip_files[i:i+batch_size]
for index, zip_file in enumerate(batch, 1):
print(f"\nProcessing file {i + index}/{total_files}: {zip_file}")
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
print(f"Reading CSV file: {csv_filename}")
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
match_found = False
for column in df.columns:
if df[column].astype(str).str.contains(search_term, case=False, na=False).any():
print(f"Matches found in column: {column}")
matching_rows = df[df[column].astype(str).str.contains(search_term, case=False, na=False)]
master = pd.concat([master, matching_rows], ignore_index=True)
results_count += len(matching_rows)
match_found = True
print(f"Added {len(matching_rows)} matching rows. Total matches so far: {results_count}")
break # We've found a match, no need to check other columns
if not match_found:
print(f"No matches found in {csv_filename}")
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
print(f"Error occurred while processing {zip_file}. Continuing to next file.")
print(f"Current matches count: {results_count}")
# Optionally, save or perform operations on 'master' here if it's getting too large
return master, results_count
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_term = input("Enter the search term: ").strip()
master, final_count = parse_zips_in_batches()
master_csv_path = os.path.join(CFTC_COMMODITIES_SOURCE_DIR, f"filtered_{search_term.replace(' ', '_')}.csv")
master.to_csv(master_csv_path, index=False)
print(f"\nSaving results to: {master_csv_path}")
print(f"Total Matches Found: {final_count}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_cftc_rates_archives():
os.makedirs(CFTC_RATES_SOURCE_DIR, exist_ok=True)
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/cftc/CFTC_CUMULATIVE_RATES_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
def download_zip(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(CFTC_RATES_SOURCE_DIR, zip_filename)
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
end_date = datetime.now().date()
start_date = end_date - timedelta(days=2*365) # Roughly two years back (leap days not accounted for)
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
list(executor.map(download_zip, urls)) # Use list() to ensure all tasks are completed before moving on
print("Downloads completed.")
ratesquery = input("Would you like to search? (y)es or (n)o?: ").strip().lower()
if ratesquery == 'y':
cftc_rates_second()
else:
print("'y' not selected. Exiting.")
exit(1)
def cftc_rates_second():
gamecat_ascii()
def process_zip(zip_file):
results = []
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
for column in df.columns:
if df[column].astype(str).str.contains(search_term, case=False, na=False).any():
matching_rows = df[df[column].astype(str).str.contains(search_term, case=False, na=False)]
results.extend(matching_rows.to_dict('records')) # Convert matching rows to list of dicts
break # We've found a match, no need to check other columns
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
return results
def write_to_csv(results, csv_writer):
for row in results:
csv_writer.writerow(row)
# Main execution
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_term = input("Enter the search term: ").strip()
zip_files = sorted(glob.glob(os.path.join(CFTC_RATES_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
print(f"\nStarting to process {total_files} zip files with 7 worker threads...")
master_csv_path = os.path.join(CFTC_RATES_SOURCE_DIR, f"filtered_{search_term.replace(' ', '_')}.csv")
with open(master_csv_path, 'w', newline='') as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=None) # We'll define fieldnames later
header_written = False
with ThreadPoolExecutor(max_workers=7) as executor:
# Process zip files
future_to_zip = {executor.submit(process_zip, zip_file): zip_file for zip_file in zip_files}
for future in as_completed(future_to_zip):
zip_file = future_to_zip[future]
try:
results = future.result()
if results and not header_written:
# Write header only once
csv_writer.fieldnames = results[0].keys()
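# Assumes every daily archive shares the same columns; a later file with extra columns would make DictWriter raise ValueError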
csv_writer.writeheader()
header_written = True
write_to_csv(results, csv_writer)
print(f"Processed: {zip_file}")
except Exception as e:
print(f"Exception in {zip_file}: {e}")
print(f"\nSaving results to: {master_csv_path}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_cftc_equities_archives():
os.makedirs(CFTC_EQUITY_SOURCE_DIR, exist_ok=True)
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/cftc/CFTC_CUMULATIVE_EQUITIES_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
def download_zip(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(CFTC_EQUITY_SOURCE_DIR, zip_filename)
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
end_date = datetime.now().date()
start_date = end_date - timedelta(days=2*365) # Roughly two years back (leap days not accounted for)
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
list(executor.map(download_zip, urls)) # Use list() to ensure all tasks are completed before moving on
print("Downloads completed.")
equityquery = input("Would you like to search? (y)es or (n)o?: ").strip().lower()
if equityquery == 'y':
cftc_equities_second()
else:
print("'y' not selected. Exiting.")
exit(1)
def cftc_equities_second():
gamecat_ascii()
def process_zip(zip_file):
results = []
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
for column in df.columns:
if df[column].astype(str).str.contains(search_term, case=False, na=False).any():
matching_rows = df[df[column].astype(str).str.contains(search_term, case=False, na=False)]
results.extend(matching_rows.to_dict('records')) # Convert matching rows to list of dicts
break # We've found a match, no need to check other columns
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
return results
def write_to_csv(results, csv_writer):
for row in results:
csv_writer.writerow(row)
# Main execution
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_term = input("Enter the search term: ").strip()
zip_files = sorted(glob.glob(os.path.join(CFTC_EQUITY_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
print(f"\nStarting to process {total_files} zip files with 7 worker threads...")
master_csv_path = os.path.join(CFTC_EQUITY_SOURCE_DIR, f"filtered_{search_term.replace(' ', '_')}.csv")
with open(master_csv_path, 'w', newline='') as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=None) # We'll define fieldnames later
header_written = False
with ThreadPoolExecutor(max_workers=7) as executor:
# Process zip files
future_to_zip = {executor.submit(process_zip, zip_file): zip_file for zip_file in zip_files}
for future in as_completed(future_to_zip):
zip_file = future_to_zip[future]
try:
results = future.result()
if results and not header_written:
# Write header only once
csv_writer.fieldnames = results[0].keys()
csv_writer.writeheader()
header_written = True
write_to_csv(results, csv_writer)
print(f"Processed: {zip_file}")
except Exception as e:
print(f"Exception in {zip_file}: {e}")
print(f"\nSaving results to: {master_csv_path}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_cftc_forex_archives():
os.makedirs(CFTC_FOREX_SOURCE_DIR, exist_ok=True)
def generate_urls(start_date, end_date):
url_list = []
current_date = start_date
base_url = "https://pddata.dtcc.com/ppd/api/report/cumulative/cftc/CFTC_CUMULATIVE_FOREX_"
while current_date <= end_date:
date_str = current_date.strftime('%Y_%m_%d')
url_list.append(f"{base_url}{date_str}.zip")
current_date += timedelta(days=1)
return url_list
def download_zip(url):
zip_filename = url.split('/')[-1]
temp_zip_path = os.path.join(CFTC_FOREX_SOURCE_DIR, zip_filename)
if os.path.exists(temp_zip_path):
logging.info(f"Skipping download of {zip_filename} as it already exists.")
return
try:
req = requests.get(url, stream=True)
req.raise_for_status() # Raise an exception for bad status codes
file_size = int(req.headers.get('Content-Length', 0))
with open(temp_zip_path, 'wb') as f:
for chunk in req.iter_content(chunk_size=8192):
f.write(chunk)
file_size_downloaded = os.path.getsize(temp_zip_path)
download_time = datetime.now()
logging.info(f"Downloaded: {url}")
logging.info(f"Destination: {temp_zip_path}")
logging.info(f"Timestamp: {download_time}")
logging.info(f"Size: {file_size_downloaded} bytes")
logging.info(f"Expected Size: {file_size} bytes")
logging.info(f"File size match: {file_size == file_size_downloaded}")
except requests.RequestException as e:
logging.error(f"Failed to download {url}: {e}")
end_date = datetime.now().date()
start_date = end_date - timedelta(days=2*365) # Roughly two years back (leap days not accounted for)
urls = generate_urls(start_date, end_date)
with ThreadPoolExecutor(max_workers=16) as executor:
list(executor.map(download_zip, urls)) # Use list() to ensure all tasks are completed before moving on
print("Downloads completed.")
forexquery = input("Would you like to search? (y)es or (n)o?: ").strip().lower()
if forexquery == 'y':
cftc_forex_second()
else:
print("'y' not selected. Exiting.")
exit(1)
def cftc_forex_second():
gamecat_ascii()
def process_zip(zip_file):
results = []
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming only one CSV per zip
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
for column in df.columns:
if df[column].astype(str).str.contains(search_term, case=False, na=False).any():
matching_rows = df[df[column].astype(str).str.contains(search_term, case=False, na=False)]
results.extend(matching_rows.to_dict('records')) # Convert matching rows to list of dicts
break # We've found a match, no need to check other columns
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
return results
def write_to_csv(results, csv_writer):
for row in results:
csv_writer.writerow(row)
# Main execution
print("Press Enter when you are ready to parse the files, or type 'q' to quit.")
user_input = input()
if user_input.lower() != 'q':
search_term = input("Enter the search term: ").strip()
zip_files = sorted(glob.glob(os.path.join(CFTC_FOREX_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
print(f"\nStarting to process {total_files} zip files with 7 worker threads...")
master_csv_path = os.path.join(CFTC_FOREX_SOURCE_DIR, f"filtered_{search_term.replace(' ', '_')}.csv")
with open(master_csv_path, 'w', newline='') as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=None) # We'll define fieldnames later
header_written = False
with ThreadPoolExecutor(max_workers=7) as executor:
# Process zip files
future_to_zip = {executor.submit(process_zip, zip_file): zip_file for zip_file in zip_files}
for future in as_completed(future_to_zip):
zip_file = future_to_zip[future]
try:
results = future.result()
if results and not header_written:
# Write header only once
csv_writer.fieldnames = results[0].keys()
csv_writer.writeheader()
header_written = True
write_to_csv(results, csv_writer)
print(f"Processed: {zip_file}")
except Exception as e:
print(f"Exception in {zip_file}: {e}")
print(f"\nSaving results to: {master_csv_path}")
logging.info(f"Parsing completed. Master file saved as {master_csv_path}")
else:
print("Exiting script.")
def download_ncen_archives():
BASE_URL = "https://www.sec.gov/files/dera/data/form-n-cen-data-sets/"
urls = [
BASE_URL + "2019q3_ncen.zip",
BASE_URL + "2019q4_ncen.zip",
BASE_URL + "2020q1_ncen.zip",
BASE_URL + "2020q2_ncen.zip",
BASE_URL + "2020q3_ncen.zip",
BASE_URL + "2020q4_ncen.zip",
BASE_URL + "2021q1_ncen.zip",
BASE_URL + "2021q2_ncen.zip",
BASE_URL + "2021q3_ncen.zip",
BASE_URL + "2021q4_ncen.zip",
BASE_URL + "2022q1_ncen.zip",
BASE_URL + "2022q2_ncen.zip",
BASE_URL + "2022q3_ncen.zip",
BASE_URL + "2022q4_ncen.zip",
BASE_URL + "2023q1_ncen.zip",
BASE_URL + "2023q2_ncen.zip",
BASE_URL + "2023q3_ncen.zip",
BASE_URL + "2023q4_ncen.zip",
BASE_URL + "2024q1_ncen.zip",
BASE_URL + "2024q2_ncen.zip",
BASE_URL + "2024q3_ncen.zip",
BASE_URL + "2024q4_ncen.zip",
BASE_URL + "2025q1_ncen.zip",
BASE_URL + "2025q2_ncen.zip",
]
download_archives(NCEN_SOURCE_DIR, FILELIST, urls)
def download_nport_archives():
BASE_URL = "https://www.sec.gov/files/dera/data/form-n-port-data-sets/"
urls = [
BASE_URL + "2019q4_nport.zip",
BASE_URL + "2020q1_nport.zip",
BASE_URL + "2020q2_nport.zip",
BASE_URL + "2020q3_nport.zip",
BASE_URL + "2020q4_nport.zip",
BASE_URL + "2021q1_nport.zip",
BASE_URL + "2021q2_nport.zip",
BASE_URL + "2021q3_nport.zip",
BASE_URL + "2021q4_nport.zip",
BASE_URL + "2022q1_nport.zip",
BASE_URL + "2022q2_nport.zip",
BASE_URL + "2022q3_nport.zip",
BASE_URL + "2022q4_nport.zip",
BASE_URL + "2023q1_nport.zip",
BASE_URL + "2023q2_nport.zip",
BASE_URL + "2023q3_nport.zip",
BASE_URL + "2023q4_nport.zip",
BASE_URL + "2024q1_nport.zip",
BASE_URL + "2024q2_nport.zip",
BASE_URL + "2024q3_nport.zip",
BASE_URL + "2024q4_nport.zip",
# BASE_URL + "2025q1_nport.zip",
# BASE_URL + "2025q2_nport.zip",
]
download_archives(NPORT_SOURCE_DIR, FILELIST, urls)
def download_13F_archives():
BASE_URL = "https://www.sec.gov/files/structureddata/data/form-13f-data-sets/"
urls = [
BASE_URL + "2013q2_form13f.zip",
BASE_URL + "2013q3_form13f.zip",
BASE_URL + "2013q4_form13f.zip",
BASE_URL + "2014q1_form13f.zip",
BASE_URL + "2014q2_form13f.zip",
BASE_URL + "2014q3_form13f.zip",
BASE_URL + "2014q4_form13f.zip",
BASE_URL + "2015q1_form13f.zip",
BASE_URL + "2015q2_form13f.zip",
BASE_URL + "2015q3_form13f.zip",
BASE_URL + "2015q4_form13f.zip",
BASE_URL + "2016q1_form13f.zip",
BASE_URL + "2016q2_form13f.zip",
BASE_URL + "2016q3_form13f.zip",
BASE_URL + "2016q4_form13f.zip",
BASE_URL + "2017q1_form13f.zip",
BASE_URL + "2017q2_form13f.zip",
BASE_URL + "2017q3_form13f.zip",
BASE_URL + "2017q4_form13f.zip",
BASE_URL + "2018q1_form13f.zip",
BASE_URL + "2018q2_form13f.zip",
BASE_URL + "2018q3_form13f.zip",
BASE_URL + "2018q4_form13f.zip",
BASE_URL + "2019q1_form13f.zip",
BASE_URL + "2019q2_form13f.zip",
BASE_URL + "2019q3_form13f.zip",
BASE_URL + "2019q4_form13f.zip",
BASE_URL + "2020q1_form13f.zip",
BASE_URL + "2020q2_form13f.zip",
BASE_URL + "2020q3_form13f.zip",
BASE_URL + "2020q4_form13f.zip",
BASE_URL + "2021q1_form13f.zip",
BASE_URL + "2021q2_form13f.zip",
BASE_URL + "2021q3_form13f.zip",
BASE_URL + "2021q4_form13f.zip",
BASE_URL + "2022q1_form13f.zip",
BASE_URL + "2022q2_form13f.zip",
BASE_URL + "2022q3_form13f.zip",
BASE_URL + "2022q4_form13f.zip",
BASE_URL + "2023q1_form13f.zip",
BASE_URL + "2023q2_form13f.zip",
BASE_URL + "2023q3_form13f.zip",
BASE_URL + "2023q4_form13f.zip",
BASE_URL + "01jan2024-29feb2024_form13f.zip",
BASE_URL + "01mar2024-31may2024_form13f.zip",
BASE_URL + "01jun2024-31aug2024_form13f.zip",
BASE_URL + "01sep2024-30nov2024_form13f.zip",
]
download_archives(THRTNF_SOURCE_DIR, FILELIST, urls)
def download_nmfp_archives():
BASE_URL = "https://www.sec.gov/files/dera/data/form-n-mfp-data-sets/"
urls = [
BASE_URL + "2010q4_nmfp.zip",
BASE_URL + "2011q1_nmfp.zip",
BASE_URL + "2011q2_nmfp.zip",
BASE_URL + "2011q3_nmfp.zip",
BASE_URL + "2011q4_nmfp.zip",
BASE_URL + "2012q1_nmfp.zip",
BASE_URL + "2012q2_nmfp.zip",
BASE_URL + "2012q3_nmfp.zip",
BASE_URL + "2012q4_nmfp.zip",
BASE_URL + "2013q1_nmfp.zip",
BASE_URL + "2013q2_nmfp.zip",
BASE_URL + "2013q3_nmfp.zip",
BASE_URL + "2013q4_nmfp.zip",
BASE_URL + "2014q1_nmfp.zip",
BASE_URL + "2014q2_nmfp.zip",
BASE_URL + "2014q3_nmfp.zip",
BASE_URL + "2014q4_nmfp.zip",
BASE_URL + "2015q1_nmfp.zip",
BASE_URL + "2015q2_nmfp.zip",
BASE_URL + "2015q3_nmfp.zip",
BASE_URL + "2015q4_nmfp.zip",
BASE_URL + "2016q1_nmfp.zip",
BASE_URL + "2016q2_nmfp.zip",
BASE_URL + "2016q3_nmfp.zip",
BASE_URL + "2016q4_nmfp.zip",
BASE_URL + "2017q1_nmfp.zip",
BASE_URL + "2017q2_nmfp.zip",
BASE_URL + "2017q3_nmfp.zip",
BASE_URL + "2017q4_nmfp.zip",
BASE_URL + "2018q1_nmfp.zip",
BASE_URL + "2018q2_nmfp.zip",
BASE_URL + "2018q3_nmfp.zip",
BASE_URL + "2018q4_nmfp.zip",
BASE_URL + "2019q1_nmfp.zip",
BASE_URL + "2019q2_nmfp.zip",
BASE_URL + "2019q3_nmfp.zip",
BASE_URL + "2019q4_nmfp.zip",
BASE_URL + "2020q1_nmfp.zip",
BASE_URL + "2020q2_nmfp.zip",
BASE_URL + "2020q3_nmfp.zip",
BASE_URL + "2020q4_nmfp.zip",
BASE_URL + "2021q1_nmfp.zip",
BASE_URL + "2021q2_nmfp.zip",
BASE_URL + "2021q3_nmfp.zip",
BASE_URL + "2021q4_nmfp.zip",
BASE_URL + "2022q1_nmfp.zip",
BASE_URL + "2022q2_nmfp.zip",
BASE_URL + "20221007_nmfp.zip",
BASE_URL + "20220701-20220710_nmfp",
BASE_URL + "20220808-20220908_nmfp.zip",
BASE_URL + "20221108-20221207_nmfp.zip",
BASE_URL + "20221208-20230109_nmfp.zip",
BASE_URL + "20230110-20230207_nmfp.zip",
BASE_URL + "20230208-20230307_nmfp.zip",
BASE_URL + "20230308-20230410_nmfp.zip",
BASE_URL + "20230411-20230505_nmfp.zip",
BASE_URL + "20230508-20230607_nmfp.zip",
BASE_URL + "20230608-20230711_nmfp.zip",
BASE_URL + "20230712-20230807_nmfp.zip",
BASE_URL + "20230808-20230911_nmfp.zip",
BASE_URL + "20230912-20231006_nmfp.zip",
BASE_URL + "20231010-20231107_nmfp.zip",
BASE_URL + "20231108-20231207_nmfp.zip",
BASE_URL + "20231208-20240108_nmfp.zip",
BASE_URL + "20240109-20240207_nmfp.zip",
BASE_URL + "20240208-20240307_nmfp.zip",
BASE_URL + "20240308-20240405_nmfp.zip",
BASE_URL + "20240408-20240507_nmfp.zip",
BASE_URL + "20240508-20240607_nmfp.zip",
]
download_archives(NMFP_SOURCE_DIR, FILELIST, urls)
def download_formd_archives():
BASE_URL = "https://www.sec.gov/files/structureddata/data/form-d-data-sets/"
urls = [
BASE_URL + "2008q1_d.zip",
BASE_URL + "2008q2_d_0.zip",
BASE_URL + "2008q3_d_0.zip",
BASE_URL + "2008q4_d_0.zip",
BASE_URL + "2009q1_d_0.zip",
BASE_URL + "2009q2_d_0.zip",
BASE_URL + "2009q3_d_0.zip",
BASE_URL + "2009q4_d_0.zip",
BASE_URL + "2010q1_d_0.zip",
BASE_URL + "2010q2_d_0.zip",
BASE_URL + "2010q3_d_0.zip",
BASE_URL + "2010q4_d_0.zip",
BASE_URL + "2011q1_d_0.zip",
BASE_URL + "2011q2_d_0.zip",
BASE_URL + "2011q3_d_0.zip",
BASE_URL + "2011q4_d_0.zip",
BASE_URL + "2012q1_d.zip",
BASE_URL + "2012q2_d_0.zip",
BASE_URL + "2012q3_d_0.zip",
BASE_URL + "2012q4_d_0.zip",
BASE_URL + "2013q1_d_0.zip",
BASE_URL + "2013q2_d_0.zip",
BASE_URL + "2013q3_d_0.zip",
BASE_URL + "2013q4_d_0.zip",
BASE_URL + "2014q1_d.zip",
BASE_URL + "2014q2_d.zip",
BASE_URL + "2014q3_d.zip",
BASE_URL + "2014q4_d.zip",
BASE_URL + "2015q1_d.zip",
BASE_URL + "2015q2_d.zip",
BASE_URL + "2015q3_d.zip",
BASE_URL + "2015q4_d.zip",
BASE_URL + "2016q1_d.zip",
BASE_URL + "2016q2_d.zip",
BASE_URL + "2016q3_d.zip",
BASE_URL + "2016q4_d.zip",
BASE_URL + "2017q1_d.zip",
BASE_URL + "2017q2_d.zip",
BASE_URL + "2017q3_d.zip",
BASE_URL + "2017q4_d.zip",
BASE_URL + "2018q1_d.zip",
BASE_URL + "2018q2_d.zip",
BASE_URL + "2018q3_d.zip",
BASE_URL + "2018q4_d.zip",
BASE_URL + "2019q1_d.zip",
BASE_URL + "2019q2_d.zip",
BASE_URL + "2019q3_d.zip",
BASE_URL + "2019q4_d.zip",
BASE_URL + "2020q1_d.zip",
BASE_URL + "2020q2_d.zip",
BASE_URL + "2020q3_d.zip",
BASE_URL + "2020q4_d.zip",
BASE_URL + "2021q1_d.zip",
BASE_URL + "2021q2_d.zip",
BASE_URL + "2021q3_d.zip",
BASE_URL + "2021q4_d.zip",
BASE_URL + "2022q1_d.zip",
BASE_URL + "2022q2_d.zip",
BASE_URL + "2022q3_d.zip",
BASE_URL + "2022q4_d.zip",
BASE_URL + "2023q1_d.zip",
BASE_URL + "2023q2_d.zip",
BASE_URL + "2023q3_d.zip",
BASE_URL + "2023q4_d.zip",
BASE_URL + "2024q1_d.zip",
BASE_URL + "2024q2_d.zip",
BASE_URL + "2024q3_d.zip",
]
download_archives(FORMD_SOURCE_DIR, FILELIST, urls)
def download_ncsr_filings(start_year=2004, end_year=2025, log_file=None):
"""
Parse master.idx from the ZIP indexes already downloaded into the EDGAR archive directory, filter for N-CSR,
and download only missing .txt files to prevent duplicates.
Uses sec_download_log.txt (or provided log_file) for logging new downloads.
"""
EDGAR_DIR = "./edgar"
# Define log file path inside the function (specific to this func)
# Assign first to avoid unbound issues, then override if needed
log_file = os.path.join(EDGAR_DIR, "sec_download_log.txt") if log_file is None else log_file
os.makedirs(os.path.dirname(log_file), exist_ok=True) # Ensure dir exists
logging.info(f"Parsing N-CSR from local indexes ({start_year}-{end_year})")
# Find all ZIP files
zip_pattern = os.path.join(EDGAR_DIR, "*_QTR*.zip")
zip_files = glob.glob(zip_pattern)
zip_files.sort() # Chronological order
if not zip_files:
logging.warning("No ZIP files found in %s", EDGAR_DIR)
return
all_ncsr_entries = []
for zip_path in tqdm(zip_files, desc="Parsing ZIP indexes"):
try:
with zipfile.ZipFile(zip_path, 'r') as z:
namelist = z.namelist()
if 'master.idx' not in namelist:
logging.debug(f"No master.idx in {os.path.basename(zip_path)}")
continue
# Read master.idx without extracting
with z.open('master.idx') as f:
content = f.read().decode('utf-8', errors='ignore')
lines = content.splitlines()
if not lines:
continue
# Skip header (first line)
lines = lines[1:]
# Parse ZIP name for year (handles _ or - separator)
basename = os.path.basename(zip_path)
match = re.match(r'(\d{4})[_-]QTR(\d)\.zip', basename)
if not match:
logging.debug(f"Could not parse year/QTR from {basename}")
continue
year = int(match.group(1))
if year < start_year or year > end_year:
continue
for line in lines:
parts = [p.strip() for p in line.split('|')]
if len(parts) < 5:
continue # Skip malformed lines
# Detect format and extract fields
if len(parts) == 6:
# Modern: Accession|Form|Company|CIK|Date|File Name
form_idx = 1
cik_idx = 3
date_idx = 4
path_idx = 5
accession = parts[0].replace('-', '/') # For URL construction
file_path = parts[path_idx]
file_name = os.path.basename(file_path) # Just the .txt filename
txt_url = f"https://www.sec.gov/Archives/edgar/data/{parts[cik_idx].zfill(10)}/{accession}/{file_name}"
elif len(parts) == 5:
# Historical: CIK|Company|Form|Date|File Name (full relative path)
form_idx = 2
cik_idx = 0
date_idx = 3
path_idx = 4
accession = None
file_path = parts[path_idx]
file_name = os.path.basename(file_path)
txt_url = f"https://www.sec.gov/Archives/{file_path}" # Uses full historical path
else:
continue # Unexpected format
form = parts[form_idx]
if form == "N-CSR":
cik = parts[cik_idx].zfill(10)
date_filed = parts[date_idx]
all_ncsr_entries.append({
'cik': cik,
'accession': accession or '', # Empty for historical
'file_name': file_name,
'date_filed': date_filed,
'txt_url': txt_url, # Pre-construct for easy download
'year': year
})
except Exception as e:
logging.warning(f"Error parsing {zip_path}: {e}")
continue
logging.info(f"Found {len(all_ncsr_entries)} N-CSR entries across indexes")
# Download only missing .txt files
downloaded_count = 0
for entry in tqdm(all_ncsr_entries, desc="Downloading missing N-CSR"):
txt_filename = f"{entry['cik']}_{entry['date_filed']}_{entry['file_name']}"
txt_path = os.path.join(NCSR_DIR, txt_filename)
if os.path.exists(txt_path):
logging.debug(f"Skipping (exists): {txt_filename}")
continue
# Robust download with retries and headers (adapted from your proven method)
url = entry['txt_url']
file_name = txt_filename # For logging
success = False
max_attempts = 3
for attempt in range(max_attempts):
try:
headers = {'User-Agent': "FORTHELULZ@anonops.com"} # Use your email/domain here
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=10) as response: # Increased timeout for large files
if response.getcode() == 200:
content = response.read()
file_size = len(content)
file_hash = hashlib.sha256(content).hexdigest()
logging.info(f"Successfully downloaded {file_name}. Size: {file_size} bytes. Hash: {file_hash}")
# Write content to file
with open(txt_path, 'wb') as f:
f.write(content)
downloaded_count += 1
success = True
# Log the download
with open(log_file, 'a', encoding='utf-8') as log:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.write(f"{timestamp},{file_name},{file_size},{file_hash},{url}\n")
logging.info(f"Logged download of {file_name}")
break
else:
logging.warning(f"Failed to download {file_name}. Status: {response.getcode()}")
if response.getcode() == 429:
retry_after = response.headers.get('Retry-After', '60')
try:
sleep_time = int(retry_after)
except ValueError:
sleep_time = 60 # Default 60s for 429
logging.warning(f"Rate limited (429). Sleeping {sleep_time}s before next attempt.")
time.sleep(sleep_time)
except urllib.error.HTTPError as e:
if e.code == 429:
retry_after = e.headers.get('Retry-After', '60') if e.headers else '60'
try:
sleep_time = int(retry_after)
except ValueError:
sleep_time = 60 # Default 60s for 429
logging.warning(f"Rate limited (429) on attempt {attempt + 1} for {file_name}. Sleeping {sleep_time}s.")
if attempt < max_attempts - 1:
time.sleep(sleep_time)
continue
else:
logging.warning(f"HTTP Error {e.code} on attempt {attempt + 1} for {file_name}: {e}")
if attempt < max_attempts - 1:
time.sleep(2 ** attempt) # Exponential backoff
except urllib.error.URLError as e:
logging.warning(f"URL Error on attempt {attempt + 1} for {file_name}: {e}")
if attempt < max_attempts - 1:
time.sleep(2 ** attempt) # Exponential backoff
except Exception as e:
logging.warning(f"Unexpected error on attempt {attempt + 1} for {file_name}: {e}")
if attempt < max_attempts - 1:
time.sleep(2 ** attempt) # Exponential backoff
if not success:
logging.error(f"Max attempts reached for {file_name}. Moving on.")
# Global rate limiting: sleep 0.1s between filings to stay under the SEC's 10 requests/sec guideline
time.sleep(0.1)
logging.info(f"N-CSR process complete. Downloaded {downloaded_count} new files to {NCSR_DIR}")
logging.info(f"Download history updated in {log_file}")
def download_edgar_archives():
global failed_downloads
global verbose
global edgar_url
global headers
global backup_headers
global files_found_count
global done
global base_path
gamecat_ascii()
# Create a list of all subdirectories from 1993 to 2024, including all four quarters
years = range(1993, 2025)
quarters = ["QTR1", "QTR2", "QTR3", "QTR4"]
base_url = "https://www.sec.gov/Archives/edgar/full-index"
subdirectories = [
f"{base_url}/{year}/{quarter}/master.zip"
for year in years
for quarter in quarters
if not (year == 2024 and quarter in ["QTR3", "QTR4"])
]
failed_downloads = []
processes = []
additional_urls = [
"https://raw.githubusercontent.com/ngshya/pfsm/master/data/sec_edgar_company_info.csv",
"https://www.sec.gov/Archives/edgar/cik-lookup-data.txt"
]
def check_free_space():
total_size = sum(os.path.getsize(os.path.join(EDGAR_SOURCE_DIR, f)) for f in os.listdir(EDGAR_SOURCE_DIR) if f.endswith('.zip'))
free_space = shutil.disk_usage(EDGAR_SOURCE_DIR).free
print(f"Total size needed: {total_size} bytes, Free space available: {free_space} bytes")
return free_space > total_size
def download_edgar_files():
# Download master index files
for url in tqdm(subdirectories, desc="Downloading EDGAR Master Index", unit="file"):
year, quarter = url.split('/')[-3:-1]
filename = f"{year}_{quarter}.zip"
output_path = os.path.join(EDGAR_SOURCE_DIR, filename)
if os.path.exists(output_path):
continue # Skip if already exists
for attempt in range(3):
try:
headers = {'User-Agent': "anonymous/FORTHELULZ@anonyops.com"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=10) as response:
with open(output_path, 'wb') as file:
file.write(response.read())
print(f"Downloaded {url} to {output_path}")
break
except (urllib.error.HTTPError, urllib.error.URLError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < 2:
time.sleep(1) # Small delay before retry
else:
print(f"Failed to download {url} after 3 attempts")
failed_downloads.append(url)
# Download additional static files
for url in tqdm(additional_urls, desc="Downloading Additional Files", unit="file"):
filename = url.split('/')[-1]
output_path = os.path.join(EDGAR_SOURCE_DIR, filename)
if os.path.exists(output_path):
continue # Skip if already exists
for attempt in range(3):
try:
headers = {'User-Agent': "anonymous/FORTHELULZ@anonyops.com"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=10) as response:
with open(output_path, 'wb') as file:
file.write(response.read())
print(f"Downloaded {url} to {output_path}")
break
except (urllib.error.HTTPError, urllib.error.URLError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < 2:
time.sleep(1) # Small delay before retry
else:
print(f"Failed to download {url} after 3 attempts")
failed_downloads.append(url)
# Daily index files download logic
daily_base_url = "https://www.sec.gov/Archives/edgar/daily-index/"
today = datetime.now()
end_date = today - timedelta(days=1)
daily_index_log = os.path.join(EDGAR_SOURCE_DIR, "daily-index-log.txt")
downloaded_files = {}
# Read existing log to check for downloaded files
if os.path.exists(daily_index_log) and os.path.getsize(daily_index_log) > 0:
try:
with open(daily_index_log, 'r') as log:
for line in log:
parts = line.strip().split(',')
if len(parts) == 4:
downloaded_files[parts[1]] = parts[3]
except IOError as e:
print(f"Error reading log file: {e}")
else:
print("Log file is empty or does not exist.")
# Determine current quarter and year
current_year = end_date.year
current_quarter = (end_date.month - 1) // 3 + 1
# Set start date for the current quarter
start_date = datetime(current_year, (current_quarter - 1) * 3 + 1, 1)
zip_directory = EDGAR_SOURCE_DIR
os.makedirs(zip_directory, exist_ok=True)
zip_path = os.path.join(zip_directory, f"{current_year}-QTR{current_quarter}.zip")
master_idx_file = f"{current_year}-QTR{current_quarter}.idx" # Name for the master index file
skip_dates = [datetime(2024, 7, 3), datetime(2024, 7, 4), datetime(2024, 9, 2)]
with zipfile.ZipFile(zip_path, 'w') as zipf:
master_idx_content = []
current_date = max(start_date, datetime(2024, 7, 1))
total_days = (end_date - current_date).days + 1
pbar = tqdm(total=total_days, desc="Downloading Daily Index", unit="files")
while current_date <= end_date:
if current_date.weekday() >= 5 or current_date in skip_dates:
current_date += timedelta(days=1)
pbar.update(1)
continue
file_name = f"master.{current_date.strftime('%Y%m%d')}.idx"
if file_name in downloaded_files:
current_date += timedelta(days=1)
pbar.update(1)
continue
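# Daily indexes live under daily-index/<year>/QTR<quarter>/master.YYYYMMDD.idx on the SEC site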
url = f"{daily_base_url}{current_date.year}/QTR{(current_date.month-1)//3+1}/{file_name}"
max_attempts = 3
print(f"Attempting to download {url}")
for attempt in range(max_attempts):
try:
headers = {'User-Agent': "FORTHELULZ@anonops.com"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=3) as response:
if response.getcode() == 200:
content = response.read()
file_size = len(content)
file_hash = hashlib.sha256(content).hexdigest()
print(f"Successfully downloaded {file_name}. Size: {file_size} bytes. Hash: {file_hash}")
# Decode content here to avoid reading twice
idx_content = content.decode('utf-8').split('\n')
if not master_idx_content:
print("Setting up master index header.")
master_idx_content = idx_content[:11]
master_idx_content.extend(idx_content[11:])
# Log the download
with open(daily_index_log, 'a') as log:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.write(f"{timestamp},{file_name},{file_size},{file_hash}\n")
print(f"Logged download of {file_name}")
break
else:
print(f"Failed to download {file_name}. Status: {response.getcode()}")
except (urllib.error.HTTPError, urllib.error.URLError) as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_attempts - 1:
time.sleep(1) # Delay before retry
else:
print(f"Max attempts reached for {file_name}. Moving on.")
current_date += timedelta(days=1)
pbar.update(1)
# Write the master index content to the zip file
if master_idx_content:
print("Writing master index to ZIP file...")
zipf.writestr(master_idx_file, '\n'.join(master_idx_content))
print(f"Master index file {master_idx_file} written to {zip_path}")
pbar.close()
print(f"\nDaily index files up to {end_date.strftime('%Y-%m-%d')} have been processed and saved to {zip_path}.")
# Main execution within download_edgar_archives
if check_free_space():
print("Enough disk space available. Proceeding with downloads.")
download_edgar_files()
else:
print("Not enough disk space. Downloads aborted.")
print("EDGAR archives download process completed.")
def edgar_second():
global failed_downloads, EDGAR_SOURCE_DIR
gamecat_ascii()
def search_master_archives(search_term, directory):
search_term = search_term.strip()
if not search_term or ' ' in search_term:
print("Invalid search term provided. Please enter a single term.")
return
# Ensure the output directory exists
Path(directory).mkdir(parents=True, exist_ok=True)
results_file = os.path.join(directory, f"{search_term}_edgar_results.csv")
zip_files = [os.path.join(root, file) for root, _, files in os.walk(directory) for file in files if file.endswith(".zip")]
with open(results_file, 'w', newline='') as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["CIK", "Company Name", "Form Type", "Date Filed", "Filename"])
# Wrap the iterable with tqdm for a progress bar
for zip_path in tqdm(zip_files, desc="Searching", unit="file"):
try:
with zipfile.ZipFile(zip_path, 'r') as zip_file:
for zip_info in zip_file.infolist():
if zip_info.filename.endswith(".idx"):
with zip_file.open(zip_info) as idx_file:
raw_data = idx_file.read()
encoding = chardet.detect(raw_data)['encoding']
lines = raw_data.decode(encoding, errors='replace').splitlines()
for line in lines:
parts = line.split('|')
if len(parts) < 5:
continue
company_name = parts[1].strip()
if search_term.lower() in company_name.lower():
csv_writer.writerow(parts)
except Exception as e:
print(f"Error processing file {zip_path}: {e}")
if os.path.exists(results_file) and os.path.getsize(results_file) > 0:
print(f"Search results saved to {results_file}")
else:
print(f"No results found for '{search_term}'")
if os.path.exists(results_file):
os.remove(results_file)
def get_valid_search_term():
forbidden_terms = {'a', 'b', 'c', 'edgar', 'www', 'https', '*', '**'}
special_terms = {'gamestop', 'cohen', 'chewy'}
deep_value_terms = {'citi', 'citigroup', 'salomon', 'lehman', 'stearns', 'barney',
'smith', 'stanley', 'traveler', 'wamu', 'jpm', 'buffet',
'goldman', 'ubs', 'suisse', 'nomura'}
while True:
search_term = input("Enter search term: ").strip().lower()
if len(search_term) == 1 or search_term.isdigit():
return None
if not search_term:
print("why did you enter a blank query? c'mon.")
continue
if (len(search_term) == 1 and search_term.isalnum()) or search_term in forbidden_terms:
print("anon, don't fucking search for that. c'mon.")
if search_term in forbidden_terms:
confirmation = input("THIS IS NOT A GOOD IDEA. YOU SURE? (y/n): ").strip().lower()
if confirmation == 'y':
return search_term
else:
continue
if search_term in deep_value_terms:
print("DOING SOME DEEP FUCKING VALUE DILIGENCE? CAN DO ANON.")
return search_term
if search_term in special_terms:
if search_term == 'gamestop' or search_term == 'cohen':
print("POWER TO THE PLAYERS!")
elif search_term == 'chewy':
print("CHEWY. INVESTMENT ADVICE THAT STICKS")
return search_term
if search_term == 'gill':
print("ONE GILL IS NOT LIKE THE OTHERS. ONE IS NOT A CAT.")
return search_term
return search_term
def search_and_prompt():
if not failed_downloads:
print("All files downloaded successfully.")
while True:
search_term = get_valid_search_term()
if search_term:
search_master_archives(search_term, EDGAR_SOURCE_DIR)
another_search = input("Would you like to search for another term? (yes/no): ").strip().lower()
if another_search not in ["yes", "y"]:
print("Game On Anon")
break
else:
print("Search term cannot be empty..")
else:
print("Some files failed to download. Please check the error list.")
# Run the search and prompt logic in a separate thread
search_thread = threading.Thread(target=search_and_prompt)
search_thread.start()
search_thread.join() # Wait for the thread to complete
def edgar_third(csv_file, method):
def download_from_csv(csv_file):
base_url = "https://www.sec.gov/Archives/"
base_download_dir = EDGAR_SOURCE_DIR
headers = {'User-Agent': "FORTHELULZ@anonops.com"} # From TheDoor.py
retries = 3
delay = 1
full_csv_path = os.path.join(base_download_dir, csv_file)
with open(full_csv_path, newline='', encoding='utf-8') as csvfile:
total_rows = sum(1 for row in csv.reader(csvfile)) - 1
with open(full_csv_path, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
header = next(reader, None)
rows = list(reader)
if 'Download Location' not in header:
header.append('Download Location')
pbar = tqdm(total=total_rows, desc="Downloading", unit="file")
failed_downloads = []
for row in rows:
if len(row) < 5:
pbar.update(1)
continue
cik = row[0]
relative_url = row[4].lstrip('/')
url = f"{base_url}{relative_url}"
filename = os.path.basename(url)
cik_dir = os.path.join(base_download_dir, cik)
os.makedirs(cik_dir, exist_ok=True)
full_path = os.path.join(cik_dir, filename)
download_success = False
for attempt in range(retries):
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as response:
if response.getcode() != 200:
raise urllib.error.HTTPError(url, response.getcode(), "Non-200 status", {}, None)
content = response.read()
if len(content) == 0:
raise ValueError("No content in response")
with open(full_path, 'wb') as file:
file.write(content)
if os.path.getsize(full_path) == 0:
os.remove(full_path)
raise ValueError("File size is 0 after write")
download_success = True
break
except (urllib.error.HTTPError, urllib.error.URLError, ValueError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < retries - 1:
time.sleep(delay * (2 ** attempt))
if download_success:
row.append(full_path)
else:
failed_downloads.append(url)
row.append('Failed')
pbar.update(1)
pbar.close()
with open(full_csv_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(header)
writer.writerows(rows)
base_name = os.path.splitext(os.path.basename(csv_file))[0]
html_file_name = f"{base_name}_index.html"
with open(html_file_name, 'w', encoding='utf-8') as htmlfile:
htmlfile.write('<!DOCTYPE html><html><head><title>Download Index</title></head><body><table border="1">')
htmlfile.write('<tr>' + ''.join(f'<th>{h}</th>' for h in header) + '</tr>')
for row in rows:
htmlfile.write('<tr>')
for item in row:
if item.startswith(EDGAR_SOURCE_DIR) or item == 'Failed':
htmlfile.write(f'<td><a href="file://{os.path.abspath(item)}">{item}</a></td>' if item != 'Failed' else f'<td>{item}</td>')
else:
htmlfile.write(f'<td>{item}</td>')
htmlfile.write('</tr>')
htmlfile.write('</table></body></html>')
print(f"HTML index created: {html_file_name}")
def download_from_crawling(csv_file):
base_download_dir = EDGAR_SOURCE_DIR
ciks = set()
full_csv_path = os.path.join(base_download_dir, csv_file)
print(f"Attempting to read CSV from: {full_csv_path}")
if not os.path.exists(full_csv_path):
print(f"CSV file not found: {full_csv_path}")
return
with open(full_csv_path, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
next(reader, None) # Skip header
for row in reader:
if len(row) > 0 and row[0].isdigit() and 1 <= len(row[0]) <= 10:
ciks.add(row[0])
if not ciks:
print(f"No valid CIKs found in {csv_file}. Check CSV format or content.")
return
print(f"Extracted CIKs: {ciks}")
def fetch_directory(url):
retries = 3
delay = 1
headers = {'User-Agent': "FORTHELULZ@anonops.com"}
for attempt in range(retries):
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as response:
if response.getcode() != 200:
raise urllib.error.HTTPError(url, response.getcode(), "Non-200 status", {}, None)
return BeautifulSoup(response.read(), 'html.parser')
except (urllib.error.HTTPError, urllib.error.URLError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < retries - 1:
time.sleep(delay * (2 ** attempt))
raise Exception(f"Failed to fetch {url} after {retries} retries")
def download_file(url, directory):
retries = 3
delay = 1
for attempt in range(retries):
try:
headers = {'User-Agent': "FORTHELULZ@anonops.com"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as response:
if response.getcode() != 200:
raise urllib.error.HTTPError(url, response.getcode(), "Non-200 status", {}, None)
content = response.read()
if len(content) == 0:
raise ValueError("No content in response")
filename = os.path.basename(url)
full_path = os.path.join(directory, filename)
with open(full_path, 'wb') as file:
file.write(content)
if os.path.getsize(full_path) == 0:
os.remove(full_path)
raise ValueError("File size is 0 after write")
print(f"Downloaded: {full_path}")
return True
except (urllib.error.HTTPError, urllib.error.URLError, ValueError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < retries - 1:
time.sleep(delay * (2 ** attempt))
print(f"Failed to download {url} after {retries} retries")
return False
def process_cik(cik):
sec_url_full = f"https://www.sec.gov/Archives/edgar/data/{cik}/"
print(f"Embarking on the quest for {sec_url_full}...")
folder_name = cik
full_download_directory = os.path.join(base_download_dir, folder_name)
os.makedirs(full_download_directory, exist_ok=True)
print(f"Full download directory: {full_download_directory}")
soup = fetch_directory(sec_url_full)
links = soup.find_all('a', href=True)
for link in links:
href = link['href']
if href.endswith('.txt') and href.startswith(f'/Archives/edgar/data/{cik}/'): # Strict .txt filter
full_url = f"https://www.sec.gov{href}"
print(f"Attempting to download: {full_url}")
download_file(full_url, full_download_directory)
os.makedirs(base_download_dir, exist_ok=True)
header = ['CIK', 'URL', 'Download Location', 'Status']
rows = []
with ThreadPoolExecutor(max_workers=4) as executor:
list(tqdm(executor.map(process_cik, ciks), total=len(ciks), desc="Processing CIKs"))
# Update CSV and HTML (simplified)
with open(csv_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(header)
for cik in ciks:
txt_matches = glob.glob(os.path.join(base_download_dir, cik, '*.txt'))
download_loc = os.path.join(base_download_dir, cik, '*.txt') if txt_matches else 'Failed'
rows.append([cik, f"https://www.sec.gov/Archives/edgar/data/{cik}/", download_loc, 'Success' if download_loc != 'Failed' else 'Failed'])
writer.writerows(rows)
html_file_name = os.path.splitext(csv_file)[0] + '_index.html'
with open(html_file_name, 'w', encoding='utf-8') as htmlfile:
htmlfile.write('<!DOCTYPE html><html><head><title>Download Index</title></head><body><table border="1">')
htmlfile.write('<tr>' + ''.join(f'<th>{h}</th>' for h in header) + '</tr>')
for row in rows:
htmlfile.write('<tr>')
for item in row:
htmlfile.write(f'<td>{item}</td>' if item == 'Failed' else f'<td><a href="file://{os.path.abspath(item)}">{item}</a></td>')
htmlfile.write('</tr>')
htmlfile.write('</table></body></html>')
print(f"Quest completed for {len(ciks)} CIKs. CSV updated and HTML index created.")
if method == 'url':
download_from_csv(csv_file)
elif method == 'crawl':
download_from_crawling(csv_file)
else:
print("Unknown method for CSV extraction.")
def process_cik(cik, rows=None, base_download_dir='./EDGAR/DATA'):
if rows is None:
rows = []
sec_url_full = f"https://www.sec.gov/Archives/edgar/data/{cik}/"
print(f"Embarking on the quest for {sec_url_full}...")
folder_name = sec_url_full.rstrip('/').split('/')[-1]
full_download_directory = os.path.join(base_download_dir, folder_name)
print(f"Full download directory: {full_download_directory} - Here lies our treasure vault")
subdirectories = scrape_subdirectories(sec_url_full)
if not subdirectories:
print(f"No hidden chambers found at {sec_url_full}. Exiting this quest.")
return
full_subdirectory_urls = [f"{sec_url_full.rstrip('/')}/{sub}" for sub in subdirectories]
sanitized_file_path = 'sanitized_subdirectories.txt'
with open(sanitized_file_path, 'w') as sanitized_file:
sanitized_file.write('\n'.join(full_subdirectory_urls))
print(f"Sanitized list created: {sanitized_file_path} - The map to hidden chambers is drawn")
output_file_path = 'completed_subdirectories.txt'
if os.path.exists(output_file_path):
with open(output_file_path, 'r') as file:
completed_subdirectories = [line.strip() for line in file]
else:
completed_subdirectories = []
os.makedirs(full_download_directory, exist_ok=True)
print(f"Download directory created: {full_download_directory} - The vault is ready to receive its riches")
total_subdirectories = len(full_subdirectory_urls)
processed_subdirectories = len(completed_subdirectories)
for subdirectory in full_subdirectory_urls:
if subdirectory in completed_subdirectories:
print(f"Skipping already plundered chamber: {subdirectory}")
continue
print(f"Venturing into the chamber: {subdirectory}")
try:
soup = fetch_directory(subdirectory)
links = soup.find_all('a')
txt_links = [link.get('href') for link in links if link.get('href') and link.get('href').endswith('.txt')]
print(f"Found txt links in {subdirectory}: {txt_links} - Scrolls of lore discovered")
for txt_link in txt_links:
txt_url = "https://www.sec.gov" + txt_link
print(f"Downloading txt file: {txt_url} - Securing the scroll")
download_success = download_file(txt_url, full_download_directory)
download_location = os.path.join(full_download_directory, os.path.basename(txt_url)) if download_success else 'Failed'
rows.append([cik, txt_url, download_location, 'Success' if download_success else 'Failed'])
if download_success:
with open(output_file_path, 'a') as completed_file:
completed_file.write(subdirectory + '\n')
break
time.sleep(0.1)
except Exception as e:
print(f"Failed to access {subdirectory}: {e} - Beware, for this path is cursed!")
with open('error_log.txt', 'a') as error_log_file:
error_log_file.write(f"Failed to access {subdirectory}: {e}\n")
processed_subdirectories += 1
print(f"Progress: {processed_subdirectories}/{total_subdirectories} chambers explored.")
remaining_subdirectories = [sub for sub in full_subdirectory_urls if sub not in completed_subdirectories]
with open(sanitized_file_path, 'w') as sanitized_file:
sanitized_file.write('\n'.join(remaining_subdirectories))
print("Download complete for current CIK - The quest for this treasure trove ends.")
def fetch_directory(url):
retries=3
delay=1
verbose=True
headers = {
'User-Agent': "anonymous/FORTHELULZ@anonyops.com" # Assuming you've defined this header elsewhere
}
for attempt in range(retries):
try:
print(f"Fetching URL: {url}")
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=10) as response:
if response.getcode() != 200:
raise HTTPError(url, response.getcode(), "Non-200 status code", headers, None)
time.sleep(delay) # Slow down to avoid rate limiting
# Here we read the content and then parse it with BeautifulSoup
content = response.read()
return BeautifulSoup(content, 'html.parser')
except (HTTPError, URLError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < retries - 1: # No sleep til brooklyn
time.sleep(delay * (attempt + 1)) # Linear backoff: wait a little longer after each failed attempt
raise Exception(f"Failed to fetch {url} after {retries} retries")
def scrape_subdirectories(sec_url):
soup = fetch_directory(sec_url)
rows = soup.find_all('a')
subdirectories = []
for row in rows:
href = row.get('href')
# Check if the href is a subdirectory link with 18-digit numeric names
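# (EDGAR stores each filing in a folder named after its accession number with dashes removed, which is 18 digits)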
if href and href.startswith('/Archives/edgar/data/') and len(href.strip('/').split('/')[-1]) == 18:
subdirectories.append(href.strip('/').split('/')[-1])
else:
print(f"Skipping non-matching href: {href}") # Log non-matching hrefs for debugging
print(f"Scraped subdirectories: {subdirectories}\n ")
return subdirectories
def extract_txt_links(soup):
links = soup.find_all('a')
txt_links = [link.get('href') for link in links if link.get('href') and link.get('href').endswith('.txt')]
return txt_links
def download_file(url, directory, retries=3, delay=1):
# The spell to conjure a file from the digital ether
for attempt in range(retries):
try:
headers = {
'User-Agent': "anonymous/FORTHELULZ@anonyops.com"
}
print(f"Attempting to download {url}...")
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=10) as response:
if response.getcode() != 200:
raise HTTPError(url, response.getcode(), "Non-200 status code", headers, None)
file_content = response.read() # Store the file content
filename = os.path.join(directory, os.path.basename(url))
with open(filename, 'wb') as file:
file.write(file_content) # Write the bytes already read; calling response.read() again would return nothing
print(f"Downloaded: {filename}")
md5_hash = hashlib.md5(file_content).hexdigest()
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
log_filename = os.path.join(directory, os.path.splitext(os.path.basename(url))[0] + '-legal-source-log.txt')
with open(log_filename, 'w') as log_file:
log_file.write(f"URL: {url}\nDownloaded at: {timestamp},\n{filename} with MD5 :{md5_hash}\n")
print(f"Logged download details to {log_filename}")
file_size = os.path.getsize(filename)
print(f"File size: {file_size} bytes - the weight of this digital artifact")
return True
except (HTTPError, URLError) as e:
print(f"Attempt {attempt + 1} failed for {url}: {e} - A dragon guards this treasure!")
if attempt < retries - 1: # No need to sleep after the last attempt
time.sleep(delay * (attempt + 1))
print(f"Failed to download {url} after {retries} retries - The treasure remains elusive")
return False
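# process_cik: end-to-end crawl for a single CIK. It scrapes the CIK's accession folders, tracks
# progress in sanitized_subdirectories.txt / completed_subdirectories.txt so a rerun can resume,
# downloads the .txt submission from each folder, and returns per-download result rows.
# Hypothetical usage example (the CIK value is illustrative, not taken from this script):
# rows = process_cik("0000320193")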
def process_cik(cik):
# The URL where our quest begins
sec_url_full = f"https://www.sec.gov/Archives/edgar/data/{cik}/"
print(f"Embarking on the quest for {sec_url_full}...")
base_download_dir = './EDGAR/DATA'
folder_name = sec_url_full.rstrip('/').split('/')[-1]
full_download_directory = os.path.join(base_download_dir, folder_name)
print(f"Full download directory: {full_download_directory} - Here lies our treasure vault")
# Here we call upon the ancient rites to reveal hidden paths
subdirectories = scrape_subdirectories(sec_url_full)
if not subdirectories:
print(f"No hidden chambers found at {sec_url_full}. Exiting this quest.")
return # Exit function instead of using continue in a loop
full_subdirectory_urls = [f"{sec_url_full.rstrip('/')}/{sub}" for sub in subdirectories]
sanitized_file_path = 'sanitized_subdirectories.txt'
with open(sanitized_file_path, 'w') as sanitized_file:
sanitized_file.write('\n'.join(full_subdirectory_urls))
print(f"Sanitized list created: {sanitized_file_path} - The map to hidden chambers is drawn")
output_file_path = 'completed_subdirectories.txt'
if os.path.exists(output_file_path):
with open(output_file_path, 'r') as file:
completed_subdirectories = [line.strip() for line in file]
else:
completed_subdirectories = []
os.makedirs(full_download_directory, exist_ok=True)
print(f"Download directory created: {full_download_directory} - The vault is ready to receive its riches")
total_subdirectories = len(full_subdirectory_urls)
processed_subdirectories = len(completed_subdirectories)
rows = [] # Initialize rows list here
for subdirectory in full_subdirectory_urls:
if subdirectory in completed_subdirectories:
print(f"Skipping already plundered chamber: {subdirectory}")
continue
print(f"Venturing into the chamber: {subdirectory}")
try:
# Summoning the directory's content with an ancient spell
soup = fetch_directory(subdirectory)
# Extracting the scrolls of knowledge from the chamber
txt_links = extract_txt_links(soup)
print(f"Found txt links in {subdirectory}: {txt_links} - Scrolls of lore discovered")
for txt_link in txt_links:
txt_url = "https://www.sec.gov" + txt_link
print(f"Downloading txt file: {txt_url} - Securing the scroll")
download_success = download_file(txt_url, full_download_directory)
download_location = os.path.join(full_download_directory, os.path.basename(txt_url)) if download_success else 'Failed'
rows.append([cik, txt_url, download_location, 'Success' if download_success else 'Failed'])
if download_success:
with open(output_file_path, 'a') as completed_file:
completed_file.write(subdirectory + '\n')
break
time.sleep(0.1) # A brief rest to avoid angering the digital spirits
except Exception as e:
print(f"Failed to access {subdirectory}: {e} - Beware, for this path is cursed!")
with open('error_log.txt', 'a') as error_log_file:
error_log_file.write(f"Failed to access {subdirectory}: {e}\n")
processed_subdirectories += 1
print(f"Progress: {processed_subdirectories}/{total_subdirectories} chambers explored.")
remaining_subdirectories = [sub for sub in full_subdirectory_urls if sub not in completed_subdirectories]
with open(sanitized_file_path, 'w') as sanitized_file:
sanitized_file.write('\n'.join(remaining_subdirectories))
print("Download complete for current CIK - The quest for this treasure trove ends.")
return rows # Return the rows for further processing if needed
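# download_archives: bulk downloader for the structured-data ZIP archives. It skips files whose
# size already matches the record in filelist.txt, retries each URL up to three times (with a
# fallback User-Agent on HTTP 403), and appends url,path,timestamp,size to filelist.txt on success.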
def download_archives(source_dir, filelist_path, urls):
# Ensure the directory exists
print(f"Ensuring directory {source_dir} exists...")
os.makedirs(source_dir, exist_ok=True)
print(f"Directory {source_dir} created or already exists.")
# Verbose step: Checking local files
print("Checking existing local files...")
existing_files = {}
if os.path.exists(filelist_path):
with open(filelist_path, 'r') as filelist:
for line in filelist:
parts = line.strip().split(',')
if len(parts) == 4:
existing_files[parts[1]] = {
'size': int(parts[3]),
'timestamp': datetime.strptime(parts[2], '%Y-%m-%d %H:%M:%S')
}
print(f"Checked {len(existing_files)} existing files.")
# Counters for status
total_attempts = 0
failures = 0
successes = 0
skips = 0
def download_and_record(url):
nonlocal total_attempts, failures, successes, skips
file_name = url.split('/')[-1]
output_path = os.path.join(source_dir, file_name)
# Check if the file exists and matches size in filelist.txt
if output_path in existing_files:
local_size = os.path.getsize(output_path) if os.path.exists(output_path) else -1
if local_size == existing_files[output_path]['size']:
print(f"Skipping download of {url}, local file size matches.")
skips += 1
return
total_attempts += 1
attempts = 0
max_attempts = 3 # Max retries
while attempts < max_attempts:
print(f"Attempting to download {url}, attempt {attempts + 1}")
headers = {'User-Agent': "FORTHELULZ@anonyops.com"}
try:
# Throttle requests so we stay under SEC's ~10 requests-per-second limit
time.sleep(0.8) # 0.8 s delay per worker; with 4 workers that is at most ~5 requests per second
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as response:
if response.getcode() == 200:
with open(output_path, "wb") as file:
file.write(response.read())
print(f"File from {url} downloaded on attempt {attempts + 1} and saved as {output_path}")
successes += 1
break
elif response.getcode() == 403:
print(f"Access denied for {url} on attempt {attempts + 1}, trying fallback User-Agent.")
fallback_headers = {'User-Agent': "anonymous/FORTHELULZ@anonyops.com"}
fallback_req = urllib.request.Request(url, headers=fallback_headers)
with urllib.request.urlopen(fallback_req) as fallback_response:
if fallback_response.getcode() == 200:
with open(output_path, "wb") as file:
file.write(fallback_response.read())
print(f"File from {url} downloaded with fallback on attempt {attempts + 1} and saved as {output_path}")
successes += 1
break
else:
print(f"Failed to download file from {url} on attempt {attempts + 1}. Status code: {response.getcode()}")
attempts += 1
except (urllib.error.HTTPError, urllib.error.URLError, IOError) as e:
print(f"Error occurred for {url} on attempt {attempts + 1}: {e}")
attempts += 1
if attempts == max_attempts:
failures += 1
print(f"Failed to download {url} after {max_attempts} attempts.")
if os.path.exists(output_path):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
file_size = os.path.getsize(output_path)
with open(filelist_path, 'a') as filelist:
filelist.write(f"{url},{output_path},{timestamp},{file_size}\n")
logging.info(f"Successfully downloaded and recorded: {output_path}")
# Verbose step: Beginning downloads
print("Beginning downloads...")
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(download_and_record, url) for url in urls]
for future in tqdm(futures, total=len(urls), desc="Overall Download Progress", unit="files"):
future.result() # Wait for each task to complete
print(f"\nDownload Summary:")
print(f"Total Attempts: {total_attempts}")
print(f"Successes: {successes}")
print(f"Failures: {failures}")
print(f"Skips: {skips}")
def process_zips(url, max_retries=3, timeout=10):
OUTPUT_DIR = os.path.join(ROOT_DIR, "SecNport") # Adjust based on which archives you're processing
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=timeout, stream=True)
response.raise_for_status()
file_size = int(response.headers.get('Content-Length', 0))
with tqdm(total=file_size, unit='B', unit_scale=True, desc=os.path.basename(url), leave=False) as bar:
content = b''
for chunk in response.iter_content(chunk_size=1024):
if chunk:
content += chunk
bar.update(len(chunk))
zip_filename = os.path.basename(url)
local_path = os.path.join(OUTPUT_DIR, zip_filename)
with open(local_path, 'wb') as file:
file.write(content)
print(f"Successfully downloaded: {zip_filename}")
file_size = os.path.getsize(local_path)
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(FILELIST, 'a') as filelist:
filelist.write(f"{url},{local_path},{timestamp},{file_size}\n")
return local_path
except requests.RequestException as e:
print(f"Download attempt {attempt + 1} failed for {url}: {e}")
if attempt == max_retries - 1:
print(f"Max retries reached for {url}, skipping.")
return None
time.sleep(2 ** attempt)
return None
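# process_tsv_file: for one matched holding row, open the named N-PORT TSV inside the same ZIP
# and copy the first matching record's columns into the summary, prefixed with the TSV name
# (e.g. SUBMISSION_FILING_DATE). Matching is on ACCESSION_NUMBER, except IDENTIFIERS which joins
# on HOLDING_ID.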
def process_tsv_file(tsv_name, row, zip_file, verbose=False):
"""Process a single TSV file for a given row and return enriched holding_summary with all fields."""
import zipfile
import pandas as pd
import logging
def log_safe(msg):
if verbose:
logging.info(msg)
holding_summary = {}
try:
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
with zip_ref.open(f'{tsv_name}.tsv') as tsvfile:
df = pd.read_csv(tsvfile, delimiter='\t', low_memory=False, encoding='utf-8', on_bad_lines='skip')
match_col = 'ACCESSION_NUMBER' if tsv_name != 'IDENTIFIERS' else 'HOLDING_ID'
match_value = row['ACCESSION_NUMBER'] if tsv_name != 'IDENTIFIERS' else row['HOLDING_ID']
if tsv_name in ['SUBMISSION', 'REGISTRANT', 'FUND_REPORTED_INFO', 'INTEREST_RATE_RISK', 'BORROWER',
'BORROW_AGGREGATE', 'MONTHLY_TOTAL_RETURN']:
match_row = df[df[match_col] == match_value] if match_col in df.columns else pd.DataFrame()
elif tsv_name == 'MONTHLY_RETURN_CAT_INSTRUMENT':
match_row = df[(df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']) &
(df['ASSET_CAT'] == row['ASSET_CAT'])] if 'ASSET_CAT' in df.columns else pd.DataFrame()
elif tsv_name == 'IDENTIFIERS':
match_row = df[df[match_col] == match_value] if match_col in df.columns else pd.DataFrame()
if not match_row.empty:
for col in match_row.columns:
prefixed_col = f"{tsv_name}_{col}"
holding_summary[prefixed_col] = match_row.iloc[0][col]
log_safe(f"Matched {tsv_name} for {match_col} {match_value}")
else:
log_safe(f"No match for {tsv_name} with {match_col} {match_value}")
except Exception as e:
log_safe(f"Error processing {tsv_name}.tsv in {zip_file}: {str(e)}")
return holding_summary
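# search_nport_swaps: scan FUND_REPORTED_HOLDING.tsv in 100k-row chunks, keep rows whose string
# columns contain any search term, then enrich each match with the other N-PORT TSVs in parallel
# via a process pool before deriving FILING_DATE and YYYYQQ.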
def search_nport_swaps(zip_file, search_terms, verbose=False, debug=False, log_file=None):
import os
import zipfile
import pandas as pd
import tqdm
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
summary = []
if verbose and log_file:
log_dir = os.path.dirname(log_file) or '.'
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(message)s', filemode='a')
def log_safe(msg):
if verbose:
logging.info(msg)
log_safe(f"Starting {zip_file}")
try:
base_name = os.path.basename(zip_file)
year = base_name[:4]
quarter_char = base_name[4]
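# The 5th character of the archive name encodes the quarter: digits 1-4 directly, or the
# keyboard characters q/w/e/r standing in for quarters 1 through 4.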
quarter = {'q': 1, '1': 1, 'w': 2, '2': 2, 'e': 3, '3': 3, 'r': 4, '4': 4}.get(quarter_char.lower(), None)
if quarter is not None:
quarter_start_date = datetime(int(year), quarter*3 - 2, 1)
timestamp = int(quarter_start_date.timestamp())
else:
timestamp = None
log_safe(f"Warning: Could not parse quarter from {zip_file}")
return summary
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
if 'FUND_REPORTED_HOLDING.tsv' not in zip_ref.namelist():
log_safe(f"Warning: {zip_file} does not contain FUND_REPORTED_HOLDING.tsv")
return summary
chunksize = 100000
with zip_ref.open('FUND_REPORTED_HOLDING.tsv') as tsvfile:
total_rows = sum(1 for _ in tsvfile)
tsvfile.seek(0)
with tqdm.tqdm(total=total_rows, desc=f"Processing {zip_file}", unit="row") as pbar:
for chunk in pd.read_csv(zip_ref.open('FUND_REPORTED_HOLDING.tsv'), delimiter='\t', chunksize=chunksize,
low_memory=False, encoding='utf-8', on_bad_lines='skip'):
if 'FILENAME_TIMESTAMP' not in chunk.columns:
chunk['FILENAME_TIMESTAMP'] = timestamp
string_columns = ['ISSUER_NAME', 'ISSUER_TITLE', 'ACCESSION_NUMBER', 'HOLDING_ID', 'ISSUER_LEI',
'ISSUER_CUSIP', 'UNIT', 'OTHER_UNIT_DESC', 'CURRENCY_CODE', 'PAYOFF_PROFILE',
'ASSET_CAT', 'OTHER_ASSET', 'ISSUER_TYPE', 'OTHER_ISSUER', 'INVESTMENT_COUNTRY',
'IS_RESTRICTED_SECURITY', 'FAIR_VALUE_LEVEL', 'DERIVATIVE_CAT']
chunk[string_columns] = chunk[string_columns].fillna('').astype(str)
def contains_search_term(row):
for term in search_terms:
for col in string_columns:
if pd.notna(row[col]) and term.lower() in str(row[col]).lower():
return True
return False
keyword_holdings = chunk[chunk.apply(contains_search_term, axis=1)]
log_safe(f"Processed chunk with {len(chunk)} rows, found {len(keyword_holdings)} matches for {', '.join(search_terms)} in {zip_file}")
if not keyword_holdings.empty:
tsv_files = ['SUBMISSION', 'REGISTRANT', 'FUND_REPORTED_INFO', 'INTEREST_RATE_RISK', 'BORROWER',
'BORROW_AGGREGATE', 'MONTHLY_TOTAL_RETURN', 'MONTHLY_RETURN_CAT_INSTRUMENT', 'IDENTIFIERS']
with ProcessPoolExecutor() as executor:
for index, row in keyword_holdings.iterrows():
holding_summary = {
'ACCESSION_NUMBER': row['ACCESSION_NUMBER'] if 'ACCESSION_NUMBER' in row else None,
'HOLDING_ID': row['HOLDING_ID'] if 'HOLDING_ID' in row else None,
'FILENAME_TIMESTAMP': timestamp,
'FILING_DATE': None, # To be populated from SUBMISSION_FILING_DATE
'ISSUER_NAME': row['ISSUER_NAME'] if 'ISSUER_NAME' in row else None,
'ISSUER_LEI': row['ISSUER_LEI'] if 'ISSUER_LEI' in row else None,
'ISSUER_TITLE': row['ISSUER_TITLE'] if 'ISSUER_TITLE' in row else None,
'ISSUER_CUSIP': row['ISSUER_CUSIP'] if 'ISSUER_CUSIP' in row else None,
'BALANCE': row['BALANCE'] if 'BALANCE' in row else None,
'UNIT': row['UNIT'] if 'UNIT' in row else None,
'OTHER_UNIT_DESC': row['OTHER_UNIT_DESC'] if 'OTHER_UNIT_DESC' in row else None,
'CURRENCY_CODE': row['CURRENCY_CODE'] if 'CURRENCY_CODE' in row else None,
'CURRENCY_VALUE': row['CURRENCY_VALUE'] if 'CURRENCY_VALUE' in row else None,
'EXCHANGE_RATE': row['EXCHANGE_RATE'] if 'EXCHANGE_RATE' in row else None,
'PERCENTAGE': row['PERCENTAGE'] if 'PERCENTAGE' in row else None,
'PAYOFF_PROFILE': row['PAYOFF_PROFILE'] if 'PAYOFF_PROFILE' in row else None,
'ASSET_CAT': row['ASSET_CAT'] if 'ASSET_CAT' in row else None,
'OTHER_ASSET': row['OTHER_ASSET'] if 'OTHER_ASSET' in row else None,
'ISSUER_TYPE': row['ISSUER_TYPE'] if 'ISSUER_TYPE' in row else None,
'OTHER_ISSUER': row['OTHER_ISSUER'] if 'OTHER_ISSUER' in row else None,
'INVESTMENT_COUNTRY': row['INVESTMENT_COUNTRY'] if 'INVESTMENT_COUNTRY' in row else None,
'IS_RESTRICTED_SECURITY': row['IS_RESTRICTED_SECURITY'] if 'IS_RESTRICTED_SECURITY' in row else None,
'FAIR_VALUE_LEVEL': row['FAIR_VALUE_LEVEL'] if 'FAIR_VALUE_LEVEL' in row else None,
'DERIVATIVE_CAT': row['DERIVATIVE_CAT'] if 'DERIVATIVE_CAT' in row else None,
}
# Parallel TSV processing
futures = [executor.submit(process_tsv_file, tsv_name, row, zip_file, verbose)
for tsv_name in tsv_files]
for future in as_completed(futures):
tsv_data = future.result()
holding_summary.update(tsv_data)
# Populate FILING_DATE from SUBMISSION
if 'SUBMISSION_FILING_DATE' in holding_summary:
holding_summary['FILING_DATE'] = pd.to_datetime(holding_summary['SUBMISSION_FILING_DATE'], format='%d-%b-%Y', errors='coerce')
# Derive YYYYQQ from SUBMISSION_REPORT_ENDING_PERIOD
if 'SUBMISSION_REPORT_ENDING_PERIOD' in holding_summary:
report_date = pd.to_datetime(holding_summary['SUBMISSION_REPORT_ENDING_PERIOD'], format='%d-%b-%Y', errors='coerce')
if not pd.isna(report_date):
holding_summary['YYYYQQ'] = f"{report_date.year}Q{((report_date.month-1)//3) + 1}"
else:
holding_summary['YYYYQQ'] = None
elif 'FUND_REPORTED_INFO_REPORT_DATE' in holding_summary:
report_date = pd.to_datetime(holding_summary['FUND_REPORTED_INFO_REPORT_DATE'], format='%d-%b-%Y', errors='coerce')
if not pd.isna(report_date):
holding_summary['YYYYQQ'] = f"{report_date.year}Q{((report_date.month-1)//3) + 1}"
else:
holding_summary['YYYYQQ'] = None
summary.append(holding_summary)
if verbose and index % 10 == 0:
log_safe(f"Processed {index} holdings for {zip_file}")
pbar.update(chunksize)
if debug:
result = chunk.apply(contains_search_term, axis=1)
log_safe(f"Type of result: {type(result)}")
log_safe(f"Result dtype: {result.dtype}")
log_safe(f"First few values of result:\n{result.head()}")
except Exception as e:
log_safe(f"Error processing {zip_file}: {str(e)}")
return summary
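# search_nport: driver for the N-PORT search. Builds the full CSV header set (core holding fields
# plus every TSV field prefixed by its source), writes the header row once, then walks the
# quarterly ZIPs in chronological order and appends matches from search_nport_swaps.
# Hypothetical usage example (keywords are illustrative, not taken from this script):
# search_nport("swap, total return", verbose=True)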
def search_nport(search_keywords, verbose=False):
import os
import pandas as pd
import tqdm
from datetime import datetime
import gc
import logging
secnport_path = os.path.join(ROOT_DIR, "SecNport")
os.makedirs(secnport_path, exist_ok=True)
# Get and sort ZIP files by date
zip_files = [os.path.join(secnport_path, f) for f in os.listdir(secnport_path) if f.endswith('.zip')]
zip_files = [os.path.normpath(path) for path in zip_files]
def get_file_date(file):
base_name = os.path.basename(file)
year = base_name[:4]
quarter_char = base_name[4]
quarter = {'q': 1, '1': 1, 'w': 2, '2': 2, 'e': 3, '3': 3, 'r': 4, '4': 4}.get(quarter_char.lower(), None)
if quarter is not None:
return datetime(int(year), quarter*3 - 2, 1)
return datetime.min # Fallback for invalid dates
zip_files.sort(key=get_file_date)
search_terms = [term.strip() for term in search_keywords.split(',')]
output_file = os.path.join(ROOT_DIR, "SecNport", f"{search_keywords.replace(',', '_')}_summary_results.csv")
log_file = os.path.join(ROOT_DIR, "SecNport", f"{search_keywords.replace(',', '_')}_process.log")
# Core headers from FUND_REPORTED_HOLDING and derived
core_headers = [
'ACCESSION_NUMBER', 'HOLDING_ID', 'FILENAME_TIMESTAMP', 'FILING_DATE',
'ISSUER_NAME', 'ISSUER_LEI', 'ISSUER_TITLE', 'ISSUER_CUSIP', 'BALANCE',
'UNIT', 'OTHER_UNIT_DESC', 'CURRENCY_CODE', 'CURRENCY_VALUE', 'EXCHANGE_RATE',
'PERCENTAGE', 'PAYOFF_PROFILE', 'ASSET_CAT', 'OTHER_ASSET', 'ISSUER_TYPE',
'OTHER_ISSUER', 'INVESTMENT_COUNTRY', 'IS_RESTRICTED_SECURITY', 'FAIR_VALUE_LEVEL',
'DERIVATIVE_CAT', 'YYYYQQ'
]
# All fields from provided schema, prefixed by TSV source
all_tsv_fields = {
'SUBMISSION': [
'FILE_NUM', 'SUB_TYPE', 'IS_LAST_FILING', 'REPORT_ENDING_PERIOD', 'REPORT_DATE', 'FILING_DATE'
],
'REGISTRANT': [
'CIK', 'REGISTRANT_NAME', 'FILE_NUM', 'LEI', 'ADDRESS1', 'ADDRESS2', 'CITY', 'STATE',
'COUNTRY', 'ZIP', 'PHONE'
],
'FUND_REPORTED_INFO': [
'SERIES_NAME', 'SERIES_ID', 'SERIES_LEI', 'TOTAL_ASSETS', 'TOTAL_LIABILITIES', 'NET_ASSETS',
'ASSETS_ATTRBT_TO_MISC_SECURITY', 'ASSETS_INVESTED', 'BORROWING_PAY_WITHIN_1YR',
'CTRLD_COMPANIES_PAY_WITHIN_1YR', 'OTHER_AFFILIA_PAY_WITHIN_1YR', 'OTHER_PAY_WITHIN_1YR',
'BORROWING_PAY_AFTER_1YR', 'CTRLD_COMPANIES_PAY_AFTER_1YR', 'OTHER_AFFILIA_PAY_AFTER_1YR',
'OTHER_PAY_AFTER_1YR', 'DELAYED_DELIVERY', 'STANDBY_COMMITMENT', 'LIQUIDATION_PREFERENCE',
'CASH_NOT_RPTD_IN_C_OR_D', 'CREDIT_SPREAD_3MON_INVEST', 'CREDIT_SPREAD_1YR_INVEST',
'CREDIT_SPREAD_5YR_INVEST', 'CREDIT_SPREAD_10YR_INVEST', 'CREDIT_SPREAD_30YR_INVEST',
'CREDIT_SPREAD_3MON_NONINVEST', 'CREDIT_SPREAD_1YR_NONINVEST', 'CREDIT_SPREAD_5YR_NONINVEST',
'CREDIT_SPREAD_10YR_NONINVEST', 'CREDIT_SPREAD_30YR_NONINVEST', 'IS_NON_CASH_COLLATERAL',
'NET_REALIZE_GAIN_NONDERIV_MON1', 'NET_UNREALIZE_AP_NONDERIV_MON1',
'NET_REALIZE_GAIN_NONDERIV_MON2', 'NET_UNREALIZE_AP_NONDERIV_MON2',
'NET_REALIZE_GAIN_NONDERIV_MON3', 'NET_UNREALIZE_AP_NONDERIV_MON3',
'SALES_FLOW_MON1', 'REINVESTMENT_FLOW_MON1', 'REDEMPTION_FLOW_MON1',
'SALES_FLOW_MON2', 'REINVESTMENT_FLOW_MON2', 'REDEMPTION_FLOW_MON2',
'SALES_FLOW_MON3', 'REINVESTMENT_FLOW_MON3', 'REDEMPTION_FLOW_MON3'
],
'INTEREST_RATE_RISK': [
'INTEREST_RATE_RISK_ID', 'CURRENCY_CODE', 'INTRST_RATE_CHANGE_3MON_DV01',
'INTRST_RATE_CHANGE_1YR_DV01', 'INTRST_RATE_CHANGE_5YR_DV01', 'INTRST_RATE_CHANGE_10YR_DV01',
'INTRST_RATE_CHANGE_30YR_DV01', 'INTRST_RATE_CHANGE_3MON_DV100', 'INTRST_RATE_CHANGE_1YR_DV100',
'INTRST_RATE_CHANGE_5YR_DV100', 'INTRST_RATE_CHANGE_10YR_DV100', 'INTRST_RATE_CHANGE_30YR_DV100'
],
'BORROWER': [
'BORROWER_ID', 'NAME', 'LEI', 'AGGREGATE_VALUE'
],
'BORROW_AGGREGATE': [
'BORROW_AGGREGATE_ID', 'AMOUNT', 'COLLATERAL', 'INVESTMENT_CAT', 'OTHER_DESC'
],
'MONTHLY_TOTAL_RETURN': [
'MONTHLY_TOTAL_RETURN_ID', 'CLASS_ID', 'MONTHLY_TOTAL_RETURN1', 'MONTHLY_TOTAL_RETURN2',
'MONTHLY_TOTAL_RETURN3'
],
'MONTHLY_RETURN_CAT_INSTRUMENT': [
'ASSET_CAT', 'INSTRUMENT_KIND', 'NET_REALIZED_GAIN_MON1', 'NET_UNREALIZED_AP_MON1',
'NET_REALIZED_GAIN_MON2', 'NET_UNREALIZED_AP_MON2', 'NET_REALIZED_GAIN_MON3',
'NET_UNREALIZED_AP_MON3'
],
'IDENTIFIERS': [
'IDENTIFIERS_ID', 'IDENTIFIER_ISIN', 'IDENTIFIER_TICKER', 'OTHER_IDENTIFIER', 'OTHER_IDENTIFIER_DESC'
]
}
# Build full headers: core + prefixed TSV fields
headers = core_headers.copy()
for tsv, fields in all_tsv_fields.items():
for field in fields:
prefixed = f"{tsv}_{field}"
if prefixed not in headers:
headers.append(prefixed)
# Initialize CSV with all headers and log the action
pd.DataFrame(columns=headers).to_csv(output_file, index=False, header=True, mode='w', encoding='utf-8')
if verbose and log_file:
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(message)s')
logging.info(f"Initialized CSV with {len(headers)} headers: {output_file}")
# Process ZIP files sequentially
for zip_file in tqdm.tqdm(zip_files, desc="Files Processed", unit="file"):
try:
results = search_nport_swaps(zip_file, search_terms, verbose, log_file=log_file)
if results:
df = pd.DataFrame(results)
df = df.reindex(columns=headers, fill_value=None)
df.to_csv(output_file, index=False, header=False, mode='a', encoding='utf-8')
if verbose and log_file:
logging.info(f"Wrote {len(df)} items to CSV for {zip_file} (total columns: {len(df.columns)})")
except Exception as e:
if verbose and log_file:
logging.info(f"An error occurred while processing {zip_file}: {str(e)}")
gc.collect()
return output_file
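# process_ncen_tsv_file: N-CEN counterpart of process_tsv_file. Registrant-level TSVs are joined
# on ACCESSION_NUMBER and fund-level TSVs on FUND_ID; matched columns are prefixed with the TSV name.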
def process_ncen_tsv_file(tsv_name, row, zip_file, verbose=False):
"""Process a single N-CEN TSV file for a given row and return enriched fund_summary with all fields."""
import zipfile
import pandas as pd
import logging
def log_safe(msg):
if verbose:
logging.info(msg)
fund_summary = {}
try:
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
with zip_ref.open(f'{tsv_name}.tsv') as tsvfile:
df = pd.read_csv(tsvfile, delimiter='\t', low_memory=False)
# Handle TSVs with ACCESSION_NUMBER (e.g., SUBMISSION, REGISTRANT)
if tsv_name in ['SUBMISSION', 'REGISTRANT', 'REGISTRANT_WEBSITE', 'LOCATION_BOOKS_RECORD',
'TERMINATED_ORGANIZATION', 'DIRECTOR', 'DIRECTOR_FILE_NUMBER', 'CHIEF_COMPLIANCE_OFFICER',
'CCO_EMPLOYER', 'REGISTRANT_REPORTING_SERIES', 'RELEASE_NUMBER', 'PRINCIPAL_UNDERWRITER',
'PUBLIC_ACCOUNTANT', 'VALUATION_METHOD_CHANGE', 'VALUATION_METHOD_CHANGE_SERIES',
'UIT', 'SERIES_CIK', 'SPONSOR', 'TRUSTEE', 'CONTRACT_SECURITY', 'DIVESTMENT',
'REGISTRANT_HELDS_SECURITY', 'DEPOSITOR', 'UIT_ADMIN']:
match_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not match_row.empty:
for col in match_row.columns:
prefixed_col = f"{tsv_name}_{col}"
fund_summary[prefixed_col] = match_row.iloc[0][col]
# Handle TSVs with FUND_ID (e.g., FUND_REPORTED_INFO, SHARES_OUTSTANDING)
elif tsv_name in ['FUND_REPORTED_INFO', 'SHARES_OUTSTANDING', 'FEEDER_FUNDS', 'MASTER_FUNDS',
'FOREIGN_INVESTMENT', 'SECURITY_LENDING', 'SEC_LENDING_INDEMNITY_PROVIDER',
'COLLATERAL_MANAGER', 'ADVISER', 'TRANSFER_AGENT', 'PRICING_SERVICE', 'CUSTODIAN',
'SHAREHOLDER_SERVICING_AGENT', 'ADMIN', 'BROKER_DEALER', 'BROKER', 'PRINCIPAL_TRANSACTION',
'LINE_OF_CREDIT_DETAIL', 'LINE_OF_CREDIT_INSTITUTION', 'CREDIT_USER',
'INTER_FUND_LENDING_DETAIL', 'INTER_FUND_BORROWING_DETAIL', 'SECURITY_RELATED_ITEM',
'RIGHTS_OFFERING_FUND', 'LONGTERM_DEBT_DEFAULT', 'DIVIDENDS_IN_ARREAR', 'SECURITY_EXCHANGE',
'AUTHORIZED_PARTICIPANT', 'ETF']:
match_row = df[df['FUND_ID'] == row['FUND_ID']]
if not match_row.empty:
for col in match_row.columns:
prefixed_col = f"{tsv_name}_{col}"
fund_summary[prefixed_col] = match_row.iloc[0][col]
except Exception as e:
log_safe(f"Could not process {tsv_name} for ACCESSION_NUMBER {row.get('ACCESSION_NUMBER', 'N/A')} or FUND_ID {row.get('FUND_ID', 'N/A')} in {zip_file}: {str(e)}")
return fund_summary
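# search_ncen_swaps: scan FUND_REPORTED_INFO.tsv in chunks for the search terms, then enrich each
# matching fund with every other N-CEN TSV in parallel and derive a YYYYQQ quarter label.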
def search_ncen_swaps(zip_file, search_terms, verbose=False, debug=False, log_file=None):
import os
import zipfile
import pandas as pd
import tqdm
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
summary = []
# Set up logging
if verbose and log_file:
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(message)s')
def log_safe(msg):
if verbose:
logging.info(msg)
log_safe(f"Starting {zip_file}")
try:
base_name = os.path.basename(zip_file)
year = base_name[:4]
quarter_char = base_name[4]
quarter = {'q': 1, '1': 1, 'w': 2, '2': 2, 'e': 3, '3': 3, 'r': 4, '4': 4}.get(quarter_char.lower(), None)
if quarter is not None:
quarter_start_date = datetime(int(year), quarter*3 - 2, 1)
timestamp = int(quarter_start_date.timestamp())
else:
timestamp = None
log_safe(f"Warning: Could not parse quarter from {zip_file}")
return summary
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
if 'FUND_REPORTED_INFO.tsv' not in zip_ref.namelist():
log_safe(f"Warning: {zip_file} does not contain FUND_REPORTED_INFO.tsv")
return summary
chunksize = 100000
with zip_ref.open('FUND_REPORTED_INFO.tsv') as tsvfile:
total_rows = sum(1 for _ in tsvfile)
with tqdm.tqdm(total=total_rows, desc=f"Processing {zip_file}", unit="row") as pbar:
for chunk in pd.read_csv(zip_ref.open('FUND_REPORTED_INFO.tsv'), delimiter='\t', chunksize=chunksize, low_memory=False):
if 'FILENAME_TIMESTAMP' not in chunk.columns:
chunk['FILENAME_TIMESTAMP'] = timestamp
string_columns = ['FUND_ID', 'FUND_NAME', 'SERIES_ID', 'LEI', 'ACCESSION_NUMBER']
chunk[string_columns] = chunk[string_columns].fillna('').astype(str)
def contains_search_term(row):
for term in search_terms:
if row.astype(str).str.contains(term, case=False, regex=True, na=False).any():
return True
return False
keyword_funds = chunk[chunk.apply(contains_search_term, axis=1)]
log_safe(f"Found {len(keyword_funds)} funds related to {', '.join(search_terms)} in chunk of {zip_file}")
if not keyword_funds.empty:
tsv_files = [
'SUBMISSION', 'REGISTRANT', 'REGISTRANT_WEBSITE', 'LOCATION_BOOKS_RECORD',
'TERMINATED_ORGANIZATION', 'DIRECTOR', 'DIRECTOR_FILE_NUMBER', 'CHIEF_COMPLIANCE_OFFICER',
'CCO_EMPLOYER', 'REGISTRANT_REPORTING_SERIES', 'RELEASE_NUMBER', 'PRINCIPAL_UNDERWRITER',
'PUBLIC_ACCOUNTANT', 'VALUATION_METHOD_CHANGE', 'VALUATION_METHOD_CHANGE_SERIES',
'FUND_REPORTED_INFO', 'SHARES_OUTSTANDING', 'FEEDER_FUNDS', 'MASTER_FUNDS',
'FOREIGN_INVESTMENT', 'SECURITY_LENDING', 'SEC_LENDING_INDEMNITY_PROVIDER',
'COLLATERAL_MANAGER', 'ADVISER', 'TRANSFER_AGENT', 'PRICING_SERVICE', 'CUSTODIAN',
'SHAREHOLDER_SERVICING_AGENT', 'ADMIN', 'BROKER_DEALER', 'BROKER', 'PRINCIPAL_TRANSACTION',
'LINE_OF_CREDIT_DETAIL', 'LINE_OF_CREDIT_INSTITUTION', 'CREDIT_USER',
'INTER_FUND_LENDING_DETAIL', 'INTER_FUND_BORROWING_DETAIL', 'SECURITY_RELATED_ITEM',
'RIGHTS_OFFERING_FUND', 'LONGTERM_DEBT_DEFAULT', 'DIVIDENDS_IN_ARREAR', 'SECURITY_EXCHANGE',
'AUTHORIZED_PARTICIPANT', 'ETF', 'DEPOSITOR', 'UIT_ADMIN', 'UIT', 'SERIES_CIK',
'SPONSOR', 'TRUSTEE', 'CONTRACT_SECURITY', 'DIVESTMENT', 'REGISTRANT_HELDS_SECURITY'
]
with ProcessPoolExecutor() as executor:
for index, row in keyword_funds.iterrows():
fund_summary = {
'FUND_ID': row['FUND_ID'],
'FUND_NAME': row['FUND_NAME'],
'SERIES_ID': row['SERIES_ID'],
'FILENAME_TIMESTAMP': timestamp,
'FILING_DATE': None, # Populated from SUBMISSION.tsv
'ACCESSION_NUMBER': row['ACCESSION_NUMBER'],
'IS_ETF': row.get('IS_ETF', ''),
'IS_ETMF': row.get('IS_ETMF', ''),
'IS_INDEX': row.get('IS_INDEX', ''),
'IS_FUND_OF_FUND': row.get('IS_FUND_OF_FUND', ''),
'IS_MASTER_FEEDER': row.get('IS_MASTER_FEEDER', ''),
'IS_MONEY_MARKET': row.get('IS_MONEY_MARKET', ''),
'IS_TARGET_DATE': row.get('IS_TARGET_DATE', ''),
'IS_UNDERLYING_FUND': row.get('IS_UNDERLYING_FUND', ''),
'LEI': row.get('LEI', '')
}
# Parallel TSV processing
futures = [executor.submit(process_ncen_tsv_file, tsv_name, row, zip_file, verbose)
for tsv_name in tsv_files]
for future in as_completed(futures):
tsv_data = future.result()
fund_summary.update(tsv_data)
# Add quarterly data
if 'FUND_INFO_REPORT_DATE' in fund_summary:
report_date = pd.to_datetime(fund_summary['FUND_INFO_REPORT_DATE'], errors='coerce')
if not pd.isna(report_date):
fund_summary['YYYYQQ'] = f"{report_date.year}Q{((report_date.month-1)//3) + 1}"
else:
fund_summary['YYYYQQ'] = None
else:
fund_summary['YYYYQQ'] = None
summary.append(fund_summary)
if verbose and index % 10 == 0:
log_safe(f"Processed {index} funds for {zip_file}")
pbar.update(chunksize)
if debug:
result = chunk.apply(contains_search_term, axis=1)
log_safe(f"Type of result: {type(result)}")
log_safe(f"Result dtype: {result.dtype}")
log_safe(f"First few values of result:\n{result.head()}")
except Exception as e:
log_safe(f"Error processing {zip_file}: {str(e)}")
return summary
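# search_ncen: driver for the N-CEN search, mirroring search_nport: build the headers from the
# N-CEN schema, initialize the CSV, then process each quarterly ZIP chronologically.
# Hypothetical usage example (keyword is illustrative, not taken from this script):
# search_ncen("securities lending", verbose=True)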
def search_ncen(search_keywords, verbose=False):
import os
import pandas as pd
import tqdm
from datetime import datetime
import gc
import logging
secncen_path = os.path.join(ROOT_DIR, "SecNcen")
os.makedirs(secncen_path, exist_ok=True)
# Get and sort ZIP files by date
zip_files = [os.path.join(secncen_path, f) for f in os.listdir(secncen_path) if f.endswith('.zip')]
zip_files = [os.path.normpath(path) for path in zip_files]
def get_file_date(file):
base_name = os.path.basename(file)
year = base_name[:4]
quarter_char = base_name[4]
quarter = {'q': 1, '1': 1, 'w': 2, '2': 2, 'e': 3, '3': 3, 'r': 4, '4': 4}.get(quarter_char.lower(), None)
if quarter is not None:
return datetime(int(year), quarter*3 - 2, 1)
return datetime.min # Fallback for invalid dates
zip_files.sort(key=get_file_date) # Sort files chronologically
search_terms = [term.strip() for term in search_keywords.split(',')]
output_file = os.path.join(ROOT_DIR, "SecNcen", f"{search_keywords.replace(',', '_')}_summary_results.csv")
log_file = os.path.join(ROOT_DIR, "SecNcen", f"{search_keywords.replace(',', '_')}_process.log")
# Core headers from FUND_REPORTED_INFO and derived
core_headers = [
'FUND_ID', 'FUND_NAME', 'SERIES_ID', 'FILENAME_TIMESTAMP', 'FILING_DATE', 'ACCESSION_NUMBER',
'IS_ETF', 'IS_ETMF', 'IS_INDEX', 'IS_FUND_OF_FUND', 'IS_MASTER_FEEDER', 'IS_MONEY_MARKET',
'IS_TARGET_DATE', 'IS_UNDERLYING_FUND', 'LEI', 'YYYYQQ'
]
# All fields from N-CEN README (https://www.sec.gov/files/ncen_readme.pdf), prefixed by TSV source
all_tsv_fields = {
'SUBMISSION': [
'ACCESSION_NUMBER', 'SUBMISSION_TYPE', 'CIK', 'FILING_DATE', 'REPORT_ENDING_PERIOD',
'IS_REPORT_PERIOD_LT_12MONTH', 'FILE_NUM', 'REGISTRANT_SIGNED_NAME', 'DATE_SIGNED',
'SIGNATURE', 'TITLE', 'IS_LEGAL_PROCEEDINGS', 'IS_PROVISION_FINANCIAL_SUPPORT',
'IS_IPA_REPORT_INTERNAL_CONTROL', 'IS_CHANGE_ACC_PRINCIPLES', 'IS_INFO_REQUIRED_EO',
'IS_OTHER_INFO_REQUIRED', 'IS_MATERIAL_AMENDMENTS', 'IS_INST_DEFINING_RIGHTS',
'IS_NEW_OR_AMENDED_INV_ADV_CONT', 'IS_INFO_ITEM405', 'IS_CODE_OF_ETHICS'
],
'REGISTRANT': [
'ACCESSION_NUMBER', 'REGISTRANT_NAME', 'FILE_NUM', 'CIK', 'LEI', 'ADDRESS1', 'ADDRESS2',
'CITY', 'STATE', 'COUNTRY', 'ZIP', 'PHONE', 'IS_FIRST_FILING', 'IS_LAST_FILING',
'IS_FAMILY_INVESTMENT_COMPANY', 'FAMILY_INVESTMENT_COMPANY_NAME', 'INVESTMENT_COMPANY_TYPE',
'TOTAL_SERIES', 'IS_REGISTERED_UNDER_ACT_1933', 'HAS_SECURITY_HOLDER_VOTE',
'HAS_LEGAL_PROCEEDING', 'IS_PROCEEDING_TERMINATED', 'IS_FIDELITY_BOND_CLAIMED',
'FIDELITY_BOND_CLAIMED_AMOUNT', 'HAS_DIRECTOR_INSURANCE_POLICY', 'HAS_DIRECTOR_FILED_CLAIM',
'FINANCIAL_SUPPORT_2REGISTRANT', 'IS_EXEMPTIVE_ORDER', 'IS_UNDERWRITER_HIRED_OR_FIRED',
'IS_PUB_ACCOUNTANT_CHANGED', 'IS_MATERIAL_WEAKNESS_NOTED', 'IS_ACCT_OPINION_QUALIFIED',
'IS_VALUE_METHOD_CHANGED', 'IS_ACCT_PRINCIPLE_CHANGED', 'IS_NAV_ERROR_CORRECTED',
'ANY_DIVIDEND_PAYMENT'
],
'REGISTRANT_WEBSITE': ['ACCESSION_NUMBER', 'WEBPAGE'],
'LOCATION_BOOKS_RECORD': [
'ACCESSION_NUMBER', 'OFFICE_NAME', 'ADDRESS1', 'ADDRESS2', 'CITY', 'STATE', 'COUNTRY',
'ZIP', 'PHONE', 'BOOKS_RECORDS_DESC'
],
'TERMINATED_ORGANIZATION': ['ACCESSION_NUMBER', 'SERIES_NAME', 'SERIES_ID', 'TERMINATION_DATE'],
'DIRECTOR': ['ACCESSION_NUMBER', 'DIRECTOR_SEQNUM', 'DIRECTOR_NAME', 'CRD_NUMBER', 'IS_INTERESTED_PERSON'],
'DIRECTOR_FILE_NUMBER': ['ACCESSION_NUMBER', 'DIRECTOR_SEQNUM', 'FILE_NUMBER'],
'CHIEF_COMPLIANCE_OFFICER': [
'ACCESSION_NUMBER', 'CCO_SEQNUM', 'CCO_NAME', 'CRD_NUMBER', 'CCO_ADDRESS1', 'CCO_ADDRESS2',
'CCO_CITY', 'STATE', 'COUNTRY', 'CCO_ZIP', 'IS_CHANGED_SINCE_LAST_FILING'
],
'CCO_EMPLOYER': ['ACCESSION_NUMBER', 'CCO_SEQNUM', 'CCO_EMPLOYER_NAME', 'CCO_EMPLOYER_ID'],
'REGISTRANT_REPORTING_SERIES': ['ACCESSION_NUMBER', 'SOURCE', 'SERIES_NAME', 'SERIES_ID'],
'RELEASE_NUMBER': ['ACCESSION_NUMBER', 'RELEASE_NUMBER'],
'PRINCIPAL_UNDERWRITER': [
'ACCESSION_NUMBER', 'UNDERWRITER_NAME', 'FILE_NUM', 'CRD_NUM', 'UNDERWRITER_LEI',
'STATE', 'COUNTRY', 'IS_AFFILIATED'
],
'PUBLIC_ACCOUNTANT': [
'ACCESSION_NUMBER', 'PUB_ACCOUNTANT_NAME', 'PCAOB_NUM', 'PUB_ACCOUNTANT_LEI',
'STATE', 'COUNTRY'
],
'VALUATION_METHOD_CHANGE': [
'ACCESSION_NUMBER', 'VALUATION_METHOD_CHANGE_SEQNUM', 'DATE_OF_CHANGE', 'CHANGE_EXPLANATION',
'ASSET_TYPE', 'ASSET_TYPE_OTHER_DESC', 'INVESTMENT_TYPE', 'STATUTORY_REGULATORY_BASIS'
],
'VALUATION_METHOD_CHANGE_SERIES': [
'ACCESSION_NUMBER', 'VALUATION_METHOD_CHANGE_SEQNUM', 'SERIES_NAME', 'SERIES_ID'
],
'FUND_REPORTED_INFO': [
'FUND_ID', 'ACCESSION_NUMBER', 'FUND_NAME', 'SERIES_ID', 'LEI', 'IS_FIRST_FILING',
'AUTHORIZED_SHARES_CNT', 'ADDED_NEW_SHARES_CNT', 'TERMINATED_SHARES_CNT', 'IS_ETF',
'IS_ETMF', 'IS_INDEX', 'IS_MULTI_INVERSE_INDEX', 'IS_INTERVAL', 'IS_FUND_OF_FUND',
'IS_MASTER_FEEDER', 'IS_MONEY_MARKET', 'IS_TARGET_DATE', 'IS_UNDERLYING_FUND',
'IS_FUND_TYPE_NA', 'IS_INDEX_AFFILIATED', 'IS_INDEX_EXCLUSIVE', 'RETURN_B4_FEES_AND_EXPENSES',
'RETURN_AFTR_FEES_AND_EXPENSES', 'STDV_B4_FEES_AND_EXPENSES', 'STDV_AFTR_FEES_AND_EXPENSES',
'IS_NON_DIVERSIFIED', 'IS_FOREIGN_SUBSIDIARY', 'IS_SEC_LENDING_AUTHORIZED', 'DID_LEND_SECURITIES',
'IS_COLLATERAL_LIQUIDATED', 'IS_IMPACTED_ADVERSELY', 'IS_PYMNT_REV_SHARING_SPLIT',
'IS_PYMNT_NON_REV_SHARING_SPLIT', 'IS_PYMNT_ADMIN_FEE', 'IS_PYMNT_CASH_COLLATERAL_FEE',
'IS_PYMNT_INDEMNI_FEE', 'IS_PYMNT_OTHER', 'IS_PYMNT_NA', 'OTHER_FEE_DESC',
'AVG_VALUE_SEC_LOAN', 'NET_INCOME_SEC_LENDING', 'IS_RELYON_RULE_10F_3', 'IS_RELYON_RULE_12D1_1',
'IS_RELYON_RULE_15A_4', 'IS_RELYON_RULE_17A_6', 'IS_RELYON_RULE_17A_7', 'IS_RELYON_RULE_17A_8',
'IS_RELYON_RULE_17E_1', 'IS_RELYON_RULE_22D_1', 'IS_RELYON_RULE_23C_1', 'IS_RELYON_RULE_32A_4',
'IS_RELYON_RULE_6C_11', 'IS_RELYON_RULE_12D1_4', 'IS_RELYON_RULE_12D1G', 'IS_RELYON_RULE_18F_4',
'IS_RELYON_RULE_18F_4C4', 'IS_RELYON_RULE_18F_4C2', 'IS_RELYON_RULE_18F_4DI', 'IS_RELYON_RULE_18F_4DII',
'IS_RELYON_RULE_18F_4E', 'IS_RELYON_RULE_18F_4F', 'IS_RELYON_RULE_NA', 'HAS_EXP_LIMIT',
'HAS_EXP_REDUCED_WAIVED', 'HAS_EXP_SUBJ_RECOUP', 'HAS_EXP_RECOUPED', 'HAS_XAGENT_HIRED_FIRED_MI',
'HAS_PRICING_SRVC_HIRED_FIRED', 'HAS_CUSTODIAN_HIRED_FIRED_MI', 'HAS_SH_SRVC_HIRED_FIRED',
'HAS_ADMIN_HIRED_FIRED', 'AGG_COMMISSION', 'AGG_PRINCIPAL', 'DID_PAY_BROKER_RESEARCH',
'MONTHLY_AVG_NET_ASSETS', 'DAILY_AVG_NET_ASSETS', 'HAS_LINE_OF_CREDIT', 'HAS_INTERFUND_LENDING',
'HAS_INTERFUND_BORROWING', 'HAS_SWING_PRICING', 'SWING_FACTOR_UPPER_LIMIT',
'DID_MAKE_RIGHTS_OFFERING', 'DID_MAKE_SECOND_OFFERING', 'IS_SECONDARY_COMMON',
'IS_SECONDARY_PREFERRED', 'IS_SECONDARY_WARRANTS', 'IS_SECONDARY_CONVERTIBLES',
'IS_SECONDARY_BONDS', 'IS_SECONDARY_OTHER', 'OTHER_SECONDARY_DESC', 'DID_REPURCHASE_SECURITY',
'IS_REPUR_COMMON', 'IS_REPUR_PREFERRED', 'IS_REPUR_WARRANTS', 'IS_REPUR_CONVERTIBLES',
'IS_REPUR_BONDS', 'IS_REPUR_OTHER', 'OTHER_REPUR_DESC', 'IS_LONG_TERM_DEBT_DEFAULT',
'IS_ACCUM_DIVIDEND_IN_ARREARS', 'IS_SECURITY_MAT_MODIFIED', 'MANAGEMENT_FEE',
'NET_OPERATING_EXPENSES', 'MARKET_PRICE_PER_SHARE', 'NAV_PER_SHARE',
'HAS_XAGENT_HIRED_FIRED_CE', 'HAS_CUSTODIAN_HIRED_FIRED_CE'
],
'SHARES_OUTSTANDING': ['FUND_ID', 'CLASS_NAME', 'CLASS_ID', 'TICKER'],
'FEEDER_FUNDS': [
'FUND_ID', 'FUND_NAME', 'REGISTERED_FILE_NUM', 'REGISTERED_SERIES_ID',
'REGISTERED_FUND_LEI', 'UNREGISTERED_FILE_NUM', 'UNREGISTERED_FUND_LEI'
],
'MASTER_FUNDS': ['FUND_ID', 'FUND_NAME', 'FILE_NUM', 'SEC_FILE_NUM', 'FUND_LEI'],
'FOREIGN_INVESTMENT': ['FUND_ID', 'FOREIGN_SUBSIDIARY_NAME', 'FOREIGN_SUBSIDIARY_LEI'],
'SECURITY_LENDING': [
'FUND_ID', 'SECURITY_LENDING_SEQNUM', 'SECURITIES_AGENT_NAME', 'SECURITIES_AGENT_LEI',
'IS_AFFILIATED', 'SECURITY_AGENT_IDEMNITY', 'DID_INDEMNIFICATION_RIGHTS'
],
'SEC_LENDING_INDEMNITY_PROVIDER': [
'FUND_ID', 'SECURITY_LENDING_SEQNUM', 'INDEMNITY_PROVIDER_NAME', 'INDEMNITY_PROVIDER_LEI'
],
'COLLATERAL_MANAGER': [
'FUND_ID', 'COLLATERAL_MANAGER_NAME', 'COLLATERAL_MANAGER_LEI', 'IS_AFFILIATED',
'IS_AFFILIATED_WITH_FUND'
],
'ADVISER': [
'FUND_ID', 'SOURCE', 'ADVISER_TYPE', 'ADVISER_NAME', 'FILE_NUM', 'CRD_NUM',
'ADVISER_LEI', 'STATE', 'COUNTRY', 'IS_AFFILIATED', 'IS_ADVISOR_HIRED',
'ADVISOR_START_DATE', 'ADVISOR_TERMINATED_DATE'
],
'TRANSFER_AGENT': [
'FUND_ID', 'SOURCE', 'TRANSFERAGENT_NAME', 'FILE_NUM', 'TRANSFERAGENT_LEI',
'STATE', 'COUNTRY', 'IS_AFFILIATED', 'IS_SUBTRANSFER_AGENT'
],
'PRICING_SERVICE': [
'FUND_ID', 'PRICING_SERVICE_NAME', 'PRICING_SERVICE_LEI', 'OTHER_IDENTIFYING_NUM_DESC',
'STATE', 'COUNTRY', 'IS_AFFILIATED'
],
'CUSTODIAN': [
'FUND_ID', 'SOURCE', 'CUSTODIAN_NAME', 'CUSTODIAN_LEI', 'STATE', 'COUNTRY',
'IS_AFFILIATED', 'IS_SUB_CUSTODIAN', 'CUSTODY_TYPE', 'OTHER_CUSTODIAN_DESC'
],
'SHAREHOLDER_SERVICING_AGENT': [
'FUND_ID', 'AGENT_NAME', 'AGENT_LEI', 'OTHER_IDENTIFYING_NUM_DESC', 'STATE',
'COUNTRY', 'IS_AFFILIATED', 'IS_SUBSHARE'
],
'ADMIN': [
'FUND_ID', 'ADMIN_NAME', 'ADMIN_LEI', 'OTHER_IDENTIFYING_NUM', 'STATE',
'COUNTRY', 'IS_AFFILIATED', 'IS_SUB_ADMIN'
],
'BROKER_DEALER': [
'FUND_ID', 'BROKER_DEALER_NAME', 'FILE_NUM', 'CRD_NUM', 'BROKER_DEALER_LEI',
'STATE', 'COUNTRY', 'COMMISSION'
],
'BROKER': [
'FUND_ID', 'BROKER_NAME', 'FILE_NUM', 'CRD_NUM', 'BROKER_LEI', 'STATE',
'COUNTRY', 'GROSS_COMMISSION'
],
'PRINCIPAL_TRANSACTION': [
'FUND_ID', 'PRINCIPAL_NAME', 'FILE_NUM', 'CRD_NUM', 'PRINCIPAL_LEI', 'STATE',
'COUNTRY', 'PRINCIPAL_TOTAL_PURCHASE_SALE'
],
'LINE_OF_CREDIT_DETAIL': [
'FUND_ID', 'LINE_OF_CREDIT_SEQNUM', 'IS_CREDIT_LINE_COMMITTED', 'LINE_OF_CREDIT_SIZE',
'CREDIT_TYPE', 'IS_CREDIT_LINE_USED', 'AVERAGE_CREDIT_LINE_USED', 'DAYS_CREDIT_USED'
],
'LINE_OF_CREDIT_INSTITUTION': ['FUND_ID', 'LINE_OF_CREDIT_SEQNUM', 'CREDIT_INSTITUTION_NAME'],
'CREDIT_USER': ['FUND_ID', 'LINE_OF_CREDIT_SEQNUM', 'FUND_NAME', 'SEC_FILE_NUM'],
'INTER_FUND_LENDING_DETAIL': ['FUND_ID', 'LENDING_LOAN_AVERAGE', 'LENDING_DAYS_OUTSTANDING'],
'INTER_FUND_BORROWING_DETAIL': ['FUND_ID', 'BORROWING_LOAN_AVERAGE', 'BORROWING_DAYS_OUTSTANDING'],
'SECURITY_RELATED_ITEM': [
'FUND_ID', 'SECURITY_RELATED_ITEM_SEQNUM', 'DESCRIPTION', 'SECURITY_CLASS_TITLE',
'OTHER_SECURITY_DESCRIPTION', 'EXCHANGE', 'TICKER_SYMBOL'
],
'RIGHTS_OFFERING_FUND': [
'FUND_ID', 'IS_RIGHTS_OFFER_COMMON', 'IS_RIGHTS_OFFER_PREFERRED', 'IS_RIGHTS_OFFER_WARRANTS',
'IS_RIGHTS_OFFER_CONVERTIBLES', 'IS_RIGHTS_OFFER_BONDS', 'IS_RIGHTS_OFFER_OTHER',
'RIGHTS_OFFER_DESC', 'PCT_PARTCI_PRIMARY_OFFERING'
],
'LONGTERM_DEBT_DEFAULT': [
'FUND_ID', 'DEFAULT_NATURE', 'DEFAULT_DATE', 'DEFAULT_AMNT_PER_1000', 'TOTAL_DEFAULT_AMNT'
],
'DIVIDENDS_IN_ARREAR': ['FUND_ID', 'ISSUE_TITLE', 'AMOUNT_PER_SHARE_IN_ARREAR'],
'SECURITY_EXCHANGE': ['FUND_ID', 'FUND_EXCHANGE', 'FUND_TICKER_SYMBOL'],
'AUTHORIZED_PARTICIPANT': [
'FUND_ID', 'PARTICIPANT_NAME', 'FILE_NUM', 'CRD_NUM', 'PARTICIPANT_LEI',
'PURCHASE_VALUE', 'REDEEM_VALUE'
],
'ETF': [
'FUND_ID', 'FUND_NAME', 'SERIES_ID', 'IS_COLLATERAL_REQUIRED', 'NUM_SHARES_PER_CREATION_UNIT',
'PURCHASED_AVG_PCT_CASH', 'PURCHASED_STDV_PCT_CASH', 'PURCHASED_AVG_PCT_NON_CASH',
'PURCHASED_STDV_PCT_NON_CASH', 'REDEEMED_AVG_PCT_CASH', 'REDEEMED_STDV_PCT_CASH',
'REDEEMED_AVG_PCT_NON_CASH', 'REDEEMED_STDV_PCT_NON_CASH', 'PURCH_AVG_FEE_PER_UNIT',
'PURCH_AVG_FEE_SAME_DAY', 'PURCH_AVG_FEE_PERCENTAGE', 'PURCH_AVG_FEE_CASH_PER_UNIT',
'PURCH_AVG_FEE_CASH_SAME_DAY', 'PURCH_AVG_FEE_CASH_PERCENTAGE', 'REDEEM_AVG_FEE_PER_UNIT',
'REDEEM_AVG_FEE_SAME_DAY', 'REDEEM_AVG_FEE_PERCENTAGE', 'REDEEM_AVG_FEE_CASH_PER_UNIT',
'REDEEM_AVG_FEE_CASH_SAME_DAY', 'REDEEM_AVG_FEE_CASH_PERCENTAGE',
'IS_PERF_TRACKED_AFFILIA_PERSON', 'IS_PERF_TRACKED_EXCLUSIVELY',
'ANNUAL_DIFF_B4_FEE_EXPENSE', 'ANNUAL_DIFF_AFTER_FEE_EXPENSE',
'ANNUAL_STDV_B4_FEE_EXPENSE', 'ANNUAL_STDV_AFTER_FEE_EXPENSE', 'IS_FUND_IN_KIND_ETF'
],
'DEPOSITOR': [
'ACCESSION_NUMBER', 'DEPOSITOR_NAME', 'CRD_NUM', 'DEPOSITOR_LEI', 'STATE',
'COUNTRY', 'ULTIMATE_PARENT_NAME'
],
'UIT_ADMIN': [
'ACCESSION_NUMBER', 'UIT_ADMIN_NAME', 'UIT_ADMIN_LEI', 'OTHER_IDENTIFYING_NUM',
'STATE', 'COUNTRY', 'IS_AFFILIATED', 'IS_SUB_ADMIN'
],
'UIT': [
'ACCESSION_NUMBER', 'IS_ADMIN_HIRED_FIRED', 'IS_SEPERATE_ACCT', 'EXISTING_SERIES_CNT',
'NEW_SERIES_CNT', 'NEW_SERIES_AGG_VALUE', 'SERIES_CURRENT_PROSPECTUS',
'SERIES_CNT_ADDITIONAL_UNITS', 'TOTAL_VALUE_ADDITIONAL_UNIT', 'VALUE_UNIT_PLACED_SUBSEQUENT',
'TOTAL_ASSET_FOR_ALL_SERIES', 'SERIES_ID', 'NUM_CONTRACTS', 'IS_RELYON_RULE_6C_7',
'IS_RELYON_RULE_11A_2', 'IS_RELYON_RULE_12D1_4', 'IS_RELYON_RULE_12D1G'
],
'SERIES_CIK': ['ACCESSION_NUMBER', 'SERIES_CIK'],
'SPONSOR': ['ACCESSION_NUMBER', 'SPONSOR_NAME', 'CRD_NUM', 'SPONSOR_LEI', 'STATE', 'COUNTRY'],
'TRUSTEE': ['ACCESSION_NUMBER', 'TRUSTEE_NAME', 'STATE', 'COUNTRY'],
'CONTRACT_SECURITY': [
'ACCESSION_NUMBER', 'SECURITY_NAME', 'CONTRACT_ID', 'TOTAL_ASSET', 'NUM_CONTRACT_SOLD',
'GROSS_PREMIUM_RECEIVED', 'GROSS_PREMIUM_RECEIVED_SEC1035', 'NUM_CONTRACT_AFFECTED_PAID',
'CONTRACT_VALUE_REDEEMED', 'CONTRAC_VALUE_REDEEMED_SEC1035', 'NUM_CONTRACT_AFFECTED_REDEEMED'
],
'DIVESTMENT': [
'ACCESSION_NUMBER', 'ISSUER_NAME', 'TICKER', 'CUSIP', 'DIVESTED_NUM_SHARES',
'DIVESTED_DATE', 'STATUTE_NAME'
],
'REGISTRANT_HELDS_SECURITY': ['ACCESSION_NUMBER', 'TICKER', 'CUSIP', 'TOTAL_NUM_SHARES']
}
# Build full headers: core + prefixed TSV fields
headers = core_headers.copy()
for tsv, fields in all_tsv_fields.items():
for field in fields:
prefixed = f"{tsv}_{field}"
if prefixed not in headers: # Avoid duplicates
headers.append(prefixed)
# Initialize CSV with all headers and log the action
pd.DataFrame(columns=headers).to_csv(output_file, index=False, header=True, mode='w', encoding='utf-8')
if verbose and log_file:
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(message)s')
logging.info(f"Initialized CSV with {len(headers)} headers: {output_file}")
# Process ZIP files sequentially
for zip_file in tqdm.tqdm(zip_files, desc="Files Processed", unit="file"):
try:
results = search_ncen_swaps(zip_file, search_terms, verbose, log_file=log_file)
if results:
df = pd.DataFrame(results)
# Ensure DataFrame columns match all headers order
df = df.reindex(columns=headers, fill_value=None)
df.to_csv(output_file, index=False, header=False, mode='a', encoding='utf-8')
if verbose and log_file:
logging.info(f"Wrote {len(df)} items to CSV for {zip_file} (total columns: {len(df.columns)})")
except Exception as e:
if verbose and log_file:
logging.info(f"An error occurred while processing {zip_file}: {str(e)}")
gc.collect()
return output_file
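# process_nmfp_tsv_file: N-MFP counterpart of the enrichment helpers. TSVs inside these archives
# are prefixed "NMFP_"; submission/series tables join on ACCESSION_NUMBER and security-level
# tables on SECURITY_ID.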
def process_nmfp_tsv_file(tsv_name, row, zip_file, verbose=False):
import zipfile
import pandas as pd
import logging
def log_safe(msg):
if verbose:
logging.info(msg)
holding_summary = {}
try:
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
tsv_filename = f"NMFP_{tsv_name}.tsv"
if tsv_filename not in zip_ref.namelist():
log_safe(f"Warning: {tsv_filename} not found in {zip_file}")
return holding_summary
with zip_ref.open(tsv_filename) as tsvfile:
df = pd.read_csv(tsvfile, delimiter='\t', low_memory=False)
# Handle TSVs with ACCESSION_NUMBER
if tsv_name in ['SUBMISSION', 'FUND', 'SERIESLEVELINFO', 'MASTERFEEDERFUND', 'ADVISER', 'ADMINISTRATOR', 'TRANSFERAGENT',
'SERIESSHADOWPRICE_L', 'CLASSLEVELINFO', 'NETASSETVALUEPERSHARE_L', 'LIQUIDASSETSDETAILS',
'SEVENDAYGROSSYIELD', 'DLYNETASSETVALUEPERSHARS']:
match_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not match_row.empty:
for col in match_row.columns:
prefixed_col = f"{tsv_name}_{col}"
holding_summary[prefixed_col] = match_row.iloc[0][col]
log_safe(f"Matched {tsv_name} for ACCESSION_NUMBER {row['ACCESSION_NUMBER']}")
else:
log_safe(f"No match for {tsv_name} with ACCESSION_NUMBER {row['ACCESSION_NUMBER']}")
# Handle TSVs with SECURITY_ID
elif tsv_name in ['SCHPORTFOLIOSECURITIES', 'COLLATERALISSUERS', 'NRSRO', 'DEMANDFEATURE', 'GUARANTOR', 'ENHANCEMENTPROVIDER']:
match_row = df[df['SECURITY_ID'] == row['SECURITY_ID']]
if not match_row.empty:
for col in match_row.columns:
prefixed_col = f"{tsv_name}_{col}"
holding_summary[prefixed_col] = match_row.iloc[0][col]
log_safe(f"Matched {tsv_name} for SECURITY_ID {row['SECURITY_ID']}")
else:
log_safe(f"No match for {tsv_name} with SECURITY_ID {row['SECURITY_ID']}")
except Exception as e:
log_safe(f"Error processing {tsv_filename} in {zip_file}: {str(e)}")
return holding_summary
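# search_nmfp_swaps: scan NMFP_SCHPORTFOLIOSECURITIES.tsv in chunks for the search terms, enrich
# matches with the remaining N-MFP TSVs in parallel, and derive YYYYQQ from the reporting period.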
def search_nmfp_swaps(zip_file, search_terms, verbose=False, debug=False, log_file="SecNmfp/repo_process.log"):
import os
import zipfile
import pandas as pd
import tqdm
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
# Set up logging with default file
if verbose and log_file:
log_dir = os.path.dirname(log_file) or '.'
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(message)s', filemode='a')
summary = []
def log_safe(msg):
if verbose:
logging.info(msg)
log_safe(f"Starting {zip_file}")
try:
base_name = os.path.basename(zip_file)
year = base_name[:4]
quarter_char = base_name[4]
quarter = {'q': 1, '1': 1, 'w': 2, '2': 2, 'e': 3, '3': 3, 'r': 4, '4': 4}.get(quarter_char.lower(), None)
if quarter is not None:
quarter_start_date = datetime(int(year), quarter*3 - 2, 1)
timestamp = int(quarter_start_date.timestamp())
else:
timestamp = None
log_safe(f"Warning: Could not parse quarter from {zip_file}")
return summary
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
if 'NMFP_SCHPORTFOLIOSECURITIES.tsv' not in zip_ref.namelist():
log_safe(f"Warning: {zip_file} does not contain NMFP_SCHPORTFOLIOSECURITIES.tsv")
return summary
chunksize = 100000
with zip_ref.open('NMFP_SCHPORTFOLIOSECURITIES.tsv') as tsvfile:
total_rows = sum(1 for _ in tsvfile)
tsvfile.seek(0) # Reset file pointer after counting
with tqdm.tqdm(total=total_rows, desc=f"Processing {zip_file}", unit="row") as pbar:
for chunk in pd.read_csv(zip_ref.open('NMFP_SCHPORTFOLIOSECURITIES.tsv'), delimiter='\t', chunksize=chunksize,
low_memory=False, encoding='utf-8', on_bad_lines='skip'):
if 'FILENAME_TIMESTAMP' not in chunk.columns:
chunk['FILENAME_TIMESTAMP'] = timestamp
string_columns = ['NAMEOFISSUER', 'TITLEOFISSUER', 'BRIEFDESCRIPTION', 'ACCESSION_NUMBER', 'SECURITY_ID', 'CUSIP_NUMBER']
log_safe(f"Chunk columns: {chunk.columns.tolist()}")
log_safe(f"Sample NAMEOFISSUER: {chunk['NAMEOFISSUER'].head().tolist() if 'NAMEOFISSUER' in chunk.columns else 'Column missing'}")
chunk[string_columns] = chunk[string_columns].fillna('').astype(str)
def contains_search_term(row):
for term in search_terms:
for col in string_columns:
if pd.notna(row[col]) and term.lower() in str(row[col]).lower():
return True
return False
keyword_securities = chunk[chunk.apply(contains_search_term, axis=1)]
log_safe(f"Processed chunk with {len(chunk)} rows, found {len(keyword_securities)} matches for {', '.join(search_terms)} in {zip_file}")
if not keyword_securities.empty:
tsv_files = [
'SUBMISSION', 'FUND', 'SERIESLEVELINFO', 'MASTERFEEDERFUND', 'ADVISER', 'ADMINISTRATOR', 'TRANSFERAGENT',
'SERIESSHADOWPRICE_L', 'CLASSLEVELINFO', 'NETASSETVALUEPERSHARE_L', 'LIQUIDASSETSDETAILS',
'SEVENDAYGROSSYIELD', 'DLYNETASSETVALUEPERSHARS', 'SCHPORTFOLIOSECURITIES', 'COLLATERALISSUERS',
'NRSRO', 'DEMANDFEATURE', 'GUARANTOR', 'ENHANCEMENTPROVIDER'
]
with ProcessPoolExecutor() as executor:
for index, row in keyword_securities.iterrows():
holding_summary = {
'ACCESSION_NUMBER': row['ACCESSION_NUMBER'],
'SECURITY_ID': row['SECURITY_ID'],
'FILENAME_TIMESTAMP': timestamp,
'FILING_DATE': None,
'NAMEOFISSUER': row['NAMEOFISSUER'] if 'NAMEOFISSUER' in row else None,
'TITLEOFISSUER': row['TITLEOFISSUER'] if 'TITLEOFISSUER' in row else None,
'CUSIP_NUMBER': row['CUSIP_NUMBER'] if 'CUSIP_NUMBER' in row else None,
'LEI': row['LEI'] if 'LEI' in row else None,
'ISIN': row['ISIN'] if 'ISIN' in row else None,
'CIK': row['CIK'] if 'CIK' in row else None,
'INVESTMENTCATEGORY': row['INVESTMENTCATEGORY'] if 'INVESTMENTCATEGORY' in row else None,
'BRIEFDESCRIPTION': row['BRIEFDESCRIPTION'] if 'BRIEFDESCRIPTION' in row else None,
'REPURCHASEAGREEMENTOPENFLAG': row['REPURCHASEAGREEMENTOPENFLAG'] if 'REPURCHASEAGREEMENTOPENFLAG' in row else None,
'INVESTMENTMATURITYDATEWAM': row['INVESTMENTMATURITYDATEWAM'] if 'INVESTMENTMATURITYDATEWAM' in row else None,
'YIELDOFTHESECURITYASOFREPORTIN': row['YIELDOFTHESECURITYASOFREPORTIN'] if 'YIELDOFTHESECURITYASOFREPORTIN' in row else None,
'INCLUDINGVALUEOFANYSPONSORSUPP': row['INCLUDINGVALUEOFANYSPONSORSUPP'] if 'INCLUDINGVALUEOFANYSPONSORSUPP' in row else None,
'EXCLUDINGVALUEOFANYSPONSORSUPP': row['EXCLUDINGVALUEOFANYSPONSORSUPP'] if 'EXCLUDINGVALUEOFANYSPONSORSUPP' in row else None,
'PERCENTAGEOFMONEYMARKETFUNDNET': row['PERCENTAGEOFMONEYMARKETFUNDNET'] if 'PERCENTAGEOFMONEYMARKETFUNDNET' in row else None,
'DAILYLIQUIDASSETSECURITYFLAG': row['DAILYLIQUIDASSETSECURITYFLAG'] if 'DAILYLIQUIDASSETSECURITYFLAG' in row else None,
'WEEKLYLIQUIDASSETSECURITYFLAG': row['WEEKLYLIQUIDASSETSECURITYFLAG'] if 'WEEKLYLIQUIDASSETSECURITYFLAG' in row else None,
'YYYYQQ': None
}
futures = [executor.submit(process_nmfp_tsv_file, tsv_name, row, zip_file, verbose)
for tsv_name in tsv_files]
for future in as_completed(futures):
tsv_data = future.result()
holding_summary.update(tsv_data)
if 'SUBMISSION_REPORT_ENDING_PERIOD' in holding_summary:
report_date = pd.to_datetime(holding_summary['SUBMISSION_REPORT_ENDING_PERIOD'], errors='coerce')
if not pd.isna(report_date):
holding_summary['YYYYQQ'] = f"{report_date.year}Q{((report_date.month-1)//3) + 1}"
else:
holding_summary['YYYYQQ'] = None
else:
holding_summary['YYYYQQ'] = None
summary.append(holding_summary)
if verbose and index % 10 == 0:
log_safe(f"Processed {index} securities for {zip_file}")
pbar.update(chunksize)
except Exception as e:
log_safe(f"Error processing {zip_file}: {str(e)}")
log_safe(f"Returning {len(summary)} results for {zip_file}")
return summary
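# search_nmfp: driver for the N-MFP (money market fund) search; same pattern as search_nport and
# search_ncen: build the header set, initialize the CSV, then process the quarterly ZIPs in order.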
def search_nmfp(search_keywords, verbose=False):
import os
import pandas as pd
import tqdm
from datetime import datetime
import gc
import logging
secnmfp_path = os.path.join(ROOT_DIR, "SecNmfp")
os.makedirs(secnmfp_path, exist_ok=True)
# Get and sort ZIP files by date
zip_files = [os.path.join(secnmfp_path, f) for f in os.listdir(secnmfp_path) if f.endswith('.zip')]
zip_files = [os.path.normpath(path) for path in zip_files]
def log_safe(msg):
if verbose:
logging.info(msg)
def get_file_date(file):
base_name = os.path.basename(file)
year = base_name[:4]
quarter_char = base_name[4]
quarter = {'q': 1, '1': 1, 'w': 2, '2': 2, 'e': 3, '3': 3, 'r': 4, '4': 4}.get(quarter_char.lower(), None)
if quarter is not None:
return datetime(int(year), quarter*3 - 2, 1)
return datetime.min # Fallback for invalid dates
zip_files.sort(key=get_file_date) # Sort files chronologically
search_terms = [term.strip() for term in search_keywords.split(',')]
output_file = os.path.join(ROOT_DIR, "SecNmfp", f"{search_keywords.replace(',', '_')}_summary_results.csv")
log_file = os.path.join(ROOT_DIR, "SecNmfp", f"{search_keywords.replace(',', '_')}_process.log")
# Core headers from SCHPORTFOLIOSECURITIES and derived
core_headers = [
'ACCESSION_NUMBER', 'SECURITY_ID', 'FILENAME_TIMESTAMP', 'FILING_DATE', 'ISSUER_NAME',
'SECURITY_DESCRIPTION', 'CUSIP', 'TICKER', 'VALUE', 'AMOUNT', 'CURRENCY', 'MATURITY_DATE',
'IS_DAILY_LIQUID', 'IS_WEEKLY_LIQUID', 'IS_REDEEMABLE', 'IS_REPURCHASE_AGREEMENT',
'RATE', 'PORTFOLIO_PERCENTAGE', 'FAIR_VALUE', 'CATEGORY', 'COUNTRY', 'INDUSTRY',
'RATING', 'IS_DERIVATIVE', 'DERIVATIVE_TYPE', 'YYYYQQ'
]
# All fields from NMFP README
all_tsv_fields = {
'SUBMISSION': [
'ACCESSION_NUMBER', 'FILING_DATE', 'REPORT_ENDING_PERIOD', 'IS_LAST_FILING', 'SUB_TYPE'
],
'FUND': [
'ACCESSION_NUMBER', 'FILENUMBER', 'SERIESID', 'FUND_TYPE', 'SUCCESSOR_FUND_NAME',
'SUCCESSOR_FUND_LEI', 'ACQUIRED_FUND_NAME', 'ACQUIRED_FUND_LEI'
],
'SERIESLEVELINFO': [
'ACCESSION_NUMBER', 'SERIESID', 'FUND_NAME', 'FUND_LEI', 'FUND_ADDRESS1', 'FUND_ADDRESS2',
'FUND_CITY', 'FUND_STATE', 'FUND_COUNTRY', 'FUND_ZIP', 'FUND_PHONE', 'IS_MONEY_MARKET',
'IS_GOVERNMENT', 'IS_PRIME', 'IS_TAX_EXEMPT', 'IS_RETAIL', 'IS_INSTITUTIONAL',
'IS_STABLE_NAV', 'IS_STABLE_VALUE', 'IS_MASTER_FEEDER', 'IS_FEEDER', 'IS_ETF',
'TOTAL_ASSETS', 'TOTAL_LIABILITIES', 'NET_ASSETS', 'WAM', 'WAL', 'DAILY_LIQUID_ASSETS',
'WEEKLY_LIQUID_ASSETS', 'SHARES_OUTSTANDING', 'NAV_PER_SHARE', 'SEVEN_DAY_LIQUIDITY',
'SEVEN_DAY_YIELD', 'WEEKLY_GROSS_YIELD', 'REPORT_DATE'
],
'MASTERFEEDERFUND': [
'ACCESSION_NUMBER', 'SERIESID', 'FUND_TYPE', 'MASTER_FUND_NAME', 'MASTER_FUND_LEI',
'MASTER_FUND_FILE_NUMBER', 'MASTER_FUND_SERIES_ID'
],
'ADVISER': [
'ACCESSION_NUMBER', 'ADVISORFILENUMBER', 'ADVISER_TYPE', 'ADVISER_NAME', 'ADVISER_LEI',
'ADVISER_ADDRESS1', 'ADVISER_ADDRESS2', 'ADVISER_CITY', 'ADVISER_STATE', 'ADVISER_COUNTRY',
'ADVISER_ZIP', 'ADVISER_PHONE'
],
'ADMINISTRATOR': [
'ACCESSION_NUMBER', 'ADMINISTRATORNAME', 'ADMINISTRATOR_LEI', 'ADMINISTRATOR_ADDRESS1',
'ADMINISTRATOR_ADDRESS2', 'ADMINISTRATOR_CITY', 'ADMINISTRATOR_STATE', 'ADMINISTRATOR_COUNTRY',
'ADMINISTRATOR_ZIP', 'ADMINISTRATOR_PHONE'
],
'TRANSFERAGENT': [
'ACCESSION_NUMBER', 'FILENUMBER', 'TRANSFERAGENT_NAME', 'TRANSFERAGENT_LEI',
'TRANSFERAGENT_ADDRESS1', 'TRANSFERAGENT_ADDRESS2', 'TRANSFERAGENT_CITY',
'TRANSFERAGENT_STATE', 'TRANSFERAGENT_COUNTRY', 'TRANSFERAGENT_ZIP', 'TRANSFERAGENT_PHONE'
],
'SERIESSHADOWPRICE_L': [
'ACCESSION_NUMBER', 'SHADOW_PRICE_DATE', 'SHADOW_NAV', 'MARKET_BASED_NAV'
],
'CLASSLEVELINFO': [
'ACCESSION_NUMBER', 'CLASSESID', 'CLASS_NAME', 'CLASS_LEI', 'SHARES_OUTSTANDING',
'NAV_PER_SHARE', 'IS_STABLE_NAV', 'IS_STABLE_VALUE'
],
'NETASSETVALUEPERSHARE_L': [
'ACCESSION_NUMBER', 'CLASSESID', 'TYPE', 'NAV_DATE', 'NAV_PER_SHARE'
],
'SCHPORTFOLIOSECURITIES': [
'ACCESSION_NUMBER', 'SECURITY_ID', 'ISSUER_NAME', 'SECURITY_DESCRIPTION', 'CUSIP',
'TICKER', 'VALUE', 'AMOUNT', 'CURRENCY', 'MATURITY_DATE', 'IS_DAILY_LIQUID',
'IS_WEEKLY_LIQUID', 'IS_REDEEMABLE', 'IS_REPURCHASE_AGREEMENT', 'RATE',
'PORTFOLIO_PERCENTAGE', 'FAIR_VALUE', 'CATEGORY', 'COUNTRY', 'INDUSTRY',
'RATING', 'IS_DERIVATIVE', 'DERIVATIVE_TYPE', 'AMORTIZED_COST', 'PRINCIPAL_AMOUNT',
'YIELD_TO_MATURITY', 'EFFECTIVE_MATURITY', 'MAXIMUM_MATURITY', 'IS_GUARANTEED'
],
'COLLATERALISSUERS': [
'ACCESSION_NUMBER', 'SECURITY_ID', 'NAMEOFCOLLATERALISSUER', 'COLLATERALMATURITYDATE',
'VALUEOFCOLLATERALTOTHENEARESTC'
],
'NRSRO': [
'ACCESSION_NUMBER', 'SECURITY_ID', 'IDENTITY', 'TYPE', 'NAMEOFNRSRO', 'RATING'
],
'DEMANDFEATURE': [
'ACCESSION_NUMBER', 'SECURITY_ID', 'IDENTITYOFDEMANDFEATUREISSUER', 'DEMAND_FEATURE_TYPE'
],
'GUARANTOR': [
'ACCESSION_NUMBER', 'SECURITY_ID', 'IDENTITYOFTHEGUARANTOR', 'GUARANTOR_TYPE'
],
'ENHANCEMENTPROVIDER': [
'ACCESSION_NUMBER', 'SECURITY_ID', 'IDENTITYOFENHANCEMENTPROVIDER', 'TYPEOFENHANCEMENT'
],
'LIQUIDASSETSDETAILS': [
'ACCESSION_NUMBER', 'TOTLIQUIDASSETSNEARPCTDATE', 'DAILY_LIQUID_ASSETS', 'WEEKLY_LIQUID_ASSETS',
'TOTAL_ASSETS', 'DAILY_LIQUID_PERCENTAGE', 'WEEKLY_LIQUID_PERCENTAGE'
],
'SEVENDAYGROSSYIELD': [
'ACCESSION_NUMBER', 'SEVENDAYGROSSYIELDDATE', 'SEVEN_DAY_GROSS_YIELD'
],
'DLYNETASSETVALUEPERSHARS': [
'ACCESSION_NUMBER', 'NAV_DATE', 'NAV_PER_SHARE'
]
}
# Build full headers
headers = core_headers.copy()
for tsv, fields in all_tsv_fields.items():
for field in fields:
prefixed = f"{tsv}_{field}"
if prefixed not in headers:
headers.append(prefixed)
# Initialize CSV with all headers
pd.DataFrame(columns=headers).to_csv(output_file, index=False, header=True, mode='w', encoding='utf-8')
if verbose and log_file:
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(message)s')
logging.info(f"Initialized CSV with {len(headers)} headers: {output_file}")
# Process ZIP files sequentially
for zip_file in tqdm.tqdm(zip_files, desc="Files Processed", unit="file"):
try:
results = search_nmfp_swaps(zip_file, search_terms, verbose, log_file=log_file)
if results:
log_safe(f"Appending {len(results)} rows for {zip_file}")
df = pd.DataFrame(results)
df = df.reindex(columns=headers, fill_value=None)
df.to_csv(output_file, index=False, header=False, mode='a', encoding='utf-8')
logging.info(f"Wrote {len(df)} rows to CSV for {zip_file} (total columns: {len(df.columns)})")
else:
logging.info(f"No results for {zip_file}")
except Exception as e:
logging.info(f"An error occurred while processing {zip_file}: {str(e)}")
gc.collect()
return output_file
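# main_search: simpler single-pass N-PORT search kept alongside search_nport. It filters
# FUND_REPORTED_HOLDING.tsv chunks by regex-escaped terms, optionally restricts matches to
# DERIVATIVE_CAT values containing "swap", and returns flat holding summaries without the
# cross-TSV enrichment.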
def main_search(zip_file, search_keyword, verbose=False, looking_for_swaps=False):
import tqdm, pandas as pd
if verbose:
print(f"Starting {zip_file}")
summary = []
try:
base_name = os.path.basename(zip_file)
year = base_name[:4]
quarter_char = base_name[4]
quarter = {
'q': 1, '1': 1,
'w': 2, '2': 2,
'e': 3, '3': 3,
'r': 4, '4': 4
}.get(quarter_char.lower(), None)
if quarter is not None:
quarter_start_date = datetime(int(year), quarter*3 - 2, 1)
timestamp = int(quarter_start_date.timestamp())
else:
timestamp = None
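# Worked example (illustrative file name): '2023r....zip' maps quarter_char 'r' to quarter 4,
# so quarter_start_date becomes datetime(2023, 10, 1) and timestamp is that date's Unix epoch value.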
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
if 'FUND_REPORTED_HOLDING.tsv' not in zip_ref.namelist():
if verbose:
print(f"Warning: {zip_file} does not contain FUND_REPORTED_HOLDING.tsv")
return summary
chunksize = 100000 # Adjust based on memory usage
# Count rows up front so tqdm can report meaningful progress
with zip_ref.open('FUND_REPORTED_HOLDING.tsv') as tsvfile:
total_rows = sum(1 for _ in tsvfile)
with tqdm.tqdm(total=total_rows, desc=f"Processing {zip_file}", unit="row") as pbar:
for chunk in pd.read_csv(zip_ref.open('FUND_REPORTED_HOLDING.tsv'), delimiter='\t', chunksize=chunksize, low_memory=False):
# Ensure 'FILENAME_TIMESTAMP' column exists, using timestamp from filename
if 'FILENAME_TIMESTAMP' not in chunk.columns:
chunk['FILENAME_TIMESTAMP'] = f"{year}{quarter_char}" # Add column with the year and quarter from the filename
# Ensure all columns are treated as strings for string operations
string_columns = ['ISSUER_NAME', 'ISSUER_TITLE', 'ACCESSION_NUMBER', 'HOLDING_ID', 'FILENAME_TIMESTAMP',
'ISSUER_LEI', 'ISSUER_CUSIP', 'BALANCE', 'UNIT', 'OTHER_UNIT_DESC', 'CURRENCY_CODE',
'CURRENCY_VALUE', 'EXCHANGE_RATE', 'PERCENTAGE', 'PAYOFF_PROFILE', 'ASSET_CAT',
'OTHER_ASSET', 'ISSUER_TYPE', 'OTHER_ISSUER', 'INVESTMENT_COUNTRY',
'IS_RESTRICTED_SECURITY', 'FAIR_VALUE_LEVEL', 'DERIVATIVE_CAT']
string_columns = [col for col in string_columns if col in chunk.columns] # Guard against columns missing from older files
chunk[string_columns] = chunk[string_columns].fillna('').astype(str)
pbar.update(len(chunk))
if search_keyword == 'SWAPS$':
# Special case for SWAPS$ to search across all columns
return search_nport_swaps(zip_file, verbose, debug=True)
else:
# Escape special regex characters in each search term
search_terms = [re.escape(term.strip()) for term in search_keyword.split(',')]
conditions = []
for term in search_terms:
condition = False
for column in string_columns:
if column in chunk.columns:
condition = condition | chunk[column].str.contains(term, case=False, na=False, regex=True)
conditions.append(condition)
if conditions:
keyword_holdings = chunk[pd.concat(conditions, axis=1).any(axis=1)]
else:
keyword_holdings = pd.DataFrame(columns=chunk.columns) # Empty DataFrame with same columns
if looking_for_swaps:
keyword_holdings = keyword_holdings[keyword_holdings['DERIVATIVE_CAT'].str.contains('swap', case=False, na=False, regex=True)]
if verbose:
print(f"Found {len(keyword_holdings)} holdings related to '{search_keyword}' in {zip_file}")
if not keyword_holdings.empty:
for index, row in keyword_holdings.iterrows():
holding_summary = {
'ACCESSION_NUMBER': row['ACCESSION_NUMBER'],
'HOLDING_ID': row['HOLDING_ID'],
'FILENAME_TIMESTAMP': timestamp,
'ISSUER_NAME': row['ISSUER_NAME'],
'ISSUER_LEI': row['ISSUER_LEI'],
'ISSUER_TITLE': row['ISSUER_TITLE'],
'ISSUER_CUSIP': row['ISSUER_CUSIP'],
'BALANCE': row['BALANCE'],
'UNIT': row['UNIT'],
'OTHER_UNIT_DESC': row['OTHER_UNIT_DESC'],
'CURRENCY_CODE': row['CURRENCY_CODE'],
'CURRENCY_VALUE': row['CURRENCY_VALUE'],
'EXCHANGE_RATE': row['EXCHANGE_RATE'],
'PERCENTAGE': row['PERCENTAGE'],
'PAYOFF_PROFILE': row['PAYOFF_PROFILE'],
'ASSET_CAT': row['ASSET_CAT'],
'OTHER_ASSET': row['OTHER_ASSET'],
'ISSUER_TYPE': row['ISSUER_TYPE'],
'OTHER_ISSUER': row['OTHER_ISSUER'],
'INVESTMENT_COUNTRY': row['INVESTMENT_COUNTRY'],
'IS_RESTRICTED_SECURITY': row['IS_RESTRICTED_SECURITY'],
'FAIR_VALUE_LEVEL': row['FAIR_VALUE_LEVEL'],
'DERIVATIVE_CAT': row['DERIVATIVE_CAT'],
}
# Process additional TSV files for each holding
for tsv_name in ['REGISTRANT', 'FUND_REPORTED_INFO', 'INTEREST_RATE_RISK', 'BORROWER', 'BORROW_AGGREGATE', 'MONTHLY_TOTAL_RETURN', 'MONTHLY_RETURN_CAT_INSTRUMENT', 'IDENTIFIERS']:
try:
with zip_ref.open(f'{tsv_name}.tsv') as tsvfile:
df = pd.read_csv(tsvfile, delimiter='\t', low_memory=False)
if tsv_name == 'REGISTRANT':
reg_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not reg_row.empty:
for col in ['CIK', 'REGISTRANT_NAME', 'FILE_NUM', 'LEI', 'ADDRESS1', 'ADDRESS2', 'CITY', 'STATE', 'COUNTRY', 'ZIP', 'PHONE']:
if col in reg_row.columns:
holding_summary[col] = reg_row.iloc[0][col]
elif tsv_name == 'FUND_REPORTED_INFO':
fund_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not fund_row.empty:
for col in fund_row.columns:
if col not in holding_summary:
holding_summary[col] = fund_row.iloc[0][col]
elif tsv_name == 'INTEREST_RATE_RISK':
intrst_rate_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not intrst_rate_row.empty:
for col in intrst_rate_row.columns:
if col not in holding_summary:
holding_summary[col] = intrst_rate_row.iloc[0][col]
elif tsv_name == 'BORROWER':
borrower_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not borrower_row.empty:
for col in ['NAME', 'LEI', 'AGGREGATE_VALUE']:
if col in borrower_row.columns:
holding_summary[f"BORROWER_{col}"] = borrower_row.iloc[0][col]
elif tsv_name == 'BORROW_AGGREGATE':
borrow_agg_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not borrow_agg_row.empty:
for col in ['AMOUNT', 'COLLATERAL', 'INVESTMENT_CAT', 'OTHER_DESC']:
if col in borrow_agg_row.columns:
holding_summary[f"BORROW_AGGREGATE_{col}"] = borrow_agg_row.iloc[0][col]
elif tsv_name == 'MONTHLY_TOTAL_RETURN':
mtr_row = df[df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']]
if not mtr_row.empty:
for i in range(1, 4): # 1, 2, 3 for the three months
holding_summary[f'MONTHLY_TOTAL_RETURN_{i}'] = mtr_row.iloc[0][f'MONTHLY_TOTAL_RETURN{i}']
elif tsv_name == 'MONTHLY_RETURN_CAT_INSTRUMENT':
mrci_row = df[(df['ACCESSION_NUMBER'] == row['ACCESSION_NUMBER']) &
(df['ASSET_CAT'] == row['ASSET_CAT'])]
if not mrci_row.empty:
for i in range(1, 4): # 1, 2, 3 for the three months
for prefix in ['NET_REALIZED_GAIN', 'NET_UNREALIZED_AP']:
holding_summary[f'{prefix}_MON{i}'] = mrci_row.iloc[0][f'{prefix}_MON{i}']
elif tsv_name == 'IDENTIFIERS':
identifiers_row = df[df['HOLDING_ID'] == row['HOLDING_ID']]
if not identifiers_row.empty:
for col in ['IDENTIFIER_ISIN', 'IDENTIFIER_TICKER', 'OTHER_IDENTIFIER', 'OTHER_IDENTIFIER_DESC']:
if col in identifiers_row.columns:
holding_summary[col] = identifiers_row.iloc[0][col]
except KeyError:
if verbose:
print(f"Could not find {tsv_name} for {row['ACCESSION_NUMBER']}")
# Add quarterly data (e.g. REPORT_DATE '2023-05-31' -> '2023Q2'); REPORT_DATE arrives as text, so parse it first
report_date = pd.to_datetime(holding_summary.get('REPORT_DATE'), errors='coerce')
if pd.notna(report_date):
holding_summary['YYYYQQ'] = f"{report_date.year}Q{((report_date.month-1)//3) + 1}"
else:
holding_summary['YYYYQQ'] = None
summary.append(holding_summary)
#if verbose and index % 10 == 0: # Print status every 10 entries
# print(f"Processed {index} holdings for {zip_file}")
if verbose:
print(f"Finished {zip_file}")
except Exception as e:
if verbose:
print(f"Error processing {zip_file}: {str(e)}")
return summary
def process_file(zip_file, search_terms, verbose=False, looking_for_swaps=False):
results = []
for term in search_terms:
if term == 'SWAPS$':
summary, _ = search_nport_swaps(zip_file, verbose, debug=True)
else:
summary = main_search(zip_file, term, verbose, looking_for_swaps)
# Convert the filename-derived timestamp to a datetime; it can be None when the quarter could not be parsed
for item in summary:
ts = item.get('FILENAME_TIMESTAMP')
date = datetime.fromtimestamp(ts) if isinstance(ts, (int, float)) else datetime(1970, 1, 1)
results.append((date, item))
return results
def list_csv_files(directory):
return [f for f in os.listdir(directory) if f.endswith('_results.csv')]
def write_to_csv(queue, output_file, verbose=False):
header_written = False
with open(output_file, 'w', newline='') as csvfile:
while True:
try:
date, item = queue.get(timeout=1) # Wait up to 1 second
if date is None: # Sentinel value to signal end of queue
break
df = pd.DataFrame([item])
df.to_csv(csvfile, index=False, header=not header_written) # Write the header only for the first row
header_written = True
if verbose:
print(f"Wrote item with date {date} to CSV")
except Empty:
if verbose:
print("No more items to write, writer thread exiting.")
break
except Exception as e:
if verbose:
print(f"Error writing to CSV: {e}")
def parse_nport_xml(file_path, search_term="GameStop", cusip="36467W109"):
"""
Parse N-PORT XML for GameStop-related swaps and holdings.
Returns DataFrame with CIK, Accession, Issuer, CUSIP, Counterparty, Notional, Type.
"""
from lxml import etree
try:
tree = etree.parse(file_path)
ns = {'ns': 'http://www.sec.gov/edgar/nport'}
holdings = []
for invstOrSec in tree.findall('.//ns:invstOrSec', ns):
name = invstOrSec.find('ns:name', ns).text if invstOrSec.find('ns:name', ns) is not None else ''
sec_cusip = invstOrSec.find('ns:cusip', ns).text if invstOrSec.find('ns:cusip', ns) is not None else ''
if search_term.lower() in name.lower() or sec_cusip == cusip:
# Check for swap details
deriv = invstOrSec.find('.//ns:derivativeInfo/ns:swapInfo', ns)
entry = {
'CIK': tree.find('.//ns:cik', ns).text if tree.find('.//ns:cik', ns) is not None else 'N/A',
'Accession': os.path.basename(file_path).replace('.xml', ''),
'Issuer': name,
'CUSIP': sec_cusip,
'Counterparty': 'N/A',
'Notional_USD': 0.0,
'Type': 'Holding'
}
if deriv is not None:
entry['Counterparty'] = deriv.find('ns:counterparty/ns:name', ns).text if deriv.find('ns:counterparty/ns:name', ns) is not None else 'N/A'
entry['Notional_USD'] = float(deriv.find('ns:notionalAmt/ns:valUSD', ns).text or 0) if deriv.find('ns:notionalAmt/ns:valUSD', ns) is not None else 0.0
entry['Type'] = 'Swap'
holdings.append(entry)
return pd.DataFrame(holdings)
except Exception as e:
print(f"Error parsing N-PORT {file_path}: {e}")
return pd.DataFrame()
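# Usage sketch (hypothetical path): scan a single N-PORT primary_doc XML for GameStop exposure.
#
#     nport_df = parse_nport_xml('./SecNport/example_primary_doc.xml', search_term="GameStop", cusip="36467W109")
#     if not nport_df.empty:
#         print(nport_df[['Issuer', 'Type', 'Counterparty', 'Notional_USD']])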
def parse_ncen_file(file_path):
"""
Parse N-CEN XML for fund metadata and exemptions.
Returns DataFrame with CIK, Accession, Fund_Name, Is_Retail, Exemptions.
"""
from lxml import etree
try:
tree = etree.parse(file_path)
ns = {'cen': 'http://www.sec.gov/EDGAR/ncen'}
data = {
'CIK': tree.find('.//cen:cik', ns).text if tree.find('.//cen:cik', ns) is not None else 'N/A',
'Accession': os.path.basename(file_path).replace('.xml', ''),
'Fund_Name': tree.find('.//cen:fundName', ns).text if tree.find('.//cen:fundName', ns) is not None else 'N/A',
'Is_Retail': tree.find('.//cen:itemA10', ns).text == 'Y' if tree.find('.//cen:itemA10', ns) is not None else False,
'Exemptions': []
}
# Parse exemptions (Item C.29, D.15)
for exempt in tree.findall('.//cen:exemptiveOrders/cen:exemptiveOrder', ns):
order_id = exempt.find('cen:orderNumber', ns).text if exempt.find('cen:orderNumber', ns) is not None else ''
rule = exempt.find('cen:ruleReliedOn', ns).text if exempt.find('cen:ruleReliedOn', ns) is not None else ''
data['Exemptions'].append(f"{rule} ({order_id})" if order_id else rule)
# Securities lending (Item C.6)
for lending in tree.findall('.//cen:secLending', ns):
borrower = lending.find('cen:borrower/cen:name', ns).text if lending.find('cen:borrower/cen:name', ns) is not None else ''
if borrower:
data['Exemptions'].append(f"SecLending ({borrower})")
data['Exemptions'] = '; '.join(data['Exemptions']) if data['Exemptions'] else 'None'
return pd.DataFrame([data])
except Exception as e:
print(f"Error parsing N-CEN {file_path}: {e}")
return pd.DataFrame()
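# Usage sketch (hypothetical path): pull fund metadata and exemption/lending flags from one N-CEN filing.
#
#     ncen_df = parse_ncen_file('./SecNcen/example_ncen.xml')
#     if not ncen_df.empty:
#         print(ncen_df[['Fund_Name', 'Is_Retail', 'Exemptions']])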
def parse_ncsr_file(file_path, search_term="GameStop", cusip="36467W109"):
"""
Parse N-CSR text for GameStop-related swaps/holdings.
Returns DataFrame with CIK, Accession, Issuer, CUSIP, Counterparty, Notional_USD, Type.
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
holdings = []
# Regex for swaps/holdings (adjust for N-CSR table formats)
swap_pattern = re.compile(
rf'(?i)(?:swap|derivative)\s+.*?(?:{re.escape(search_term)}|{re.escape(cusip)})\s+.*?counterparty.*?(\w+.*?)\s+([\d,]+\.?\d*)',
re.MULTILINE | re.DOTALL
)
matches = swap_pattern.findall(text)
for counterparty, notional in matches:
holdings.append({
'CIK': os.path.basename(os.path.dirname(file_path)), # Assumes filings are stored under a per-CIK folder
'Accession': os.path.basename(file_path).replace('.txt', ''),
'Issuer': search_term,
'CUSIP': cusip,
'Counterparty': counterparty.strip(),
'Notional_USD': float(notional.replace(',', '')),
'Type': 'Swap (N-CSR)'
})
return pd.DataFrame(holdings)
except Exception as e:
print(f"Error parsing N-CSR {file_path}: {e}")
return pd.DataFrame()
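# Illustrative (made-up) snippet the regex above is meant to catch in an N-CSR holdings table:
#     "Total return swap ... GameStop Corp. ... counterparty Goldman Sachs 12,500,000"
# which would yield Counterparty='Goldman Sachs' and Notional_USD=12500000.0. Real N-CSR tables
# vary widely, so the pattern may need adjusting per filer.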
def parse_all_filings(nport_dir, ncsr_dir, ncen_dir, search_term="GameStop", cusip="36467W109"):
"""
Chain N-PORT, N-CSR, N-CEN parsing for GameStop swaps, merging into one CSV.
Returns unified DataFrame with CIK, Accession, Issuer, CUSIP, Counterparty, Notional_USD, Type, Fund_Name, Is_Retail, Exemptions, Retail_Impact.
"""
try:
nport_dfs = []
ncsr_dfs = []
ncen_dfs = []
# Parse N-PORT
for file in os.listdir(nport_dir):
if file.endswith('.xml'):
df = parse_nport_xml(os.path.join(nport_dir, file), search_term, cusip)
if not df.empty:
nport_dfs.append(df)
# Parse N-CSR
for file in os.listdir(ncsr_dir):
if file.endswith('.txt'):
df = parse_ncsr_file(os.path.join(ncsr_dir, file), search_term, cusip)
if not df.empty:
ncsr_dfs.append(df)
# Parse N-CEN
for file in os.listdir(ncen_dir):
if file.endswith('.xml'):
df = parse_ncen_file(os.path.join(ncen_dir, file))
if not df.empty:
ncen_dfs.append(df)
# Merge DataFrames
nport_df = pd.concat(nport_dfs, ignore_index=True) if nport_dfs else pd.DataFrame()
ncsr_df = pd.concat(ncsr_dfs, ignore_index=True) if ncsr_dfs else pd.DataFrame()
ncen_df = pd.concat(ncen_dfs, ignore_index=True) if ncen_dfs else pd.DataFrame()
# Combine N-PORT and N-CSR, then merge with N-CEN
combined_df = pd.concat([nport_df, ncsr_df], ignore_index=True)
if not combined_df.empty and not ncen_df.empty:
combined_df = combined_df.merge(
ncen_df[['CIK', 'Accession', 'Fund_Name', 'Is_Retail', 'Exemptions']],
on=['CIK', 'Accession'],
how='left'
)
else:
combined_df['Fund_Name'] = 'N/A'
combined_df['Is_Retail'] = False
combined_df['Exemptions'] = 'None'
# Add Retail_Impact
combined_df['Retail_Impact'] = combined_df.apply(
lambda row: 'High' if (row['Is_Retail'] and (row['Notional_USD'] > 1e7 or '12d1-4' in str(row['Exemptions']))) else 'Low',
axis=1
)
# Save to CSV
output_file = f'{search_term}_swaps_unified.csv'
combined_df.to_csv(output_file, index=False)
print(f"Unified results saved to {output_file}")
return combined_df
except Exception as e:
print(f"Error in parse_all_filings: {e}")
return pd.DataFrame()
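# Usage sketch (directory names follow the 'chain' menu option further below):
#
#     unified = parse_all_filings('./edgar/NPORT', './edgar/NCSR', './edgar/NCEN',
#                                 search_term="GameStop", cusip="36467W109")
#     high_impact = unified[unified['Retail_Impact'] == 'High'] if not unified.empty else unified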
def edgartotal():
from ratelimit import limits, sleep_and_retry
import backoff, multiprocessing
import requests
from tqdm import tqdm # Imported as a callable because this function uses tqdm(...) directly
# EDGAR_SOURCE_DIR is defined with the other module-level constants at the top of the script
file_queue = Queue()
idx_file = os.path.join(EDGAR_SOURCE_DIR, "master.idx")
log_file = os.path.join(EDGAR_SOURCE_DIR, "sec_download_log.txt")
# Configure logging
logging.basicConfig(
level=logging.ERROR,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='error_log.txt',
filemode='w'
)
logging.error("This is an error message")
# Track total file size across all URLs
total_size_all = 0
log_buffer = [] # Buffer for log messages
def flush_log_buffer():
if log_buffer:
with open(log_file, 'a') as log:
log.writelines(log_buffer)
log_buffer.clear()
def log_progress(message):
nonlocal log_buffer
timestamp = datetime.now()
entry = f"{timestamp}: {message}\n"
log_buffer.append(entry)
if len(log_buffer) >= 1000: # Flush buffer every 1000 lines
flush_log_buffer()
if "Progress" in message or "Finished" in message or "Total" in message or "Error" in message or "429" in message:
print(message)
def get_dynamic_workers(failed_429_count=0):
"""Calculate the number of ThreadPoolExecutor workers based on CPU threads and 429 errors."""
cpu_count = multiprocessing.cpu_count()
base_workers = min(cpu_count * 2, 16)
# Reduce workers if 429 errors are frequent
if failed_429_count > 10: # Adjust threshold based on testing
max_workers = max(base_workers // 2, 1) # Halve workers, minimum 1
log_progress(f"High 429 errors ({failed_429_count}). Reducing workers to {max_workers}")
else:
max_workers = base_workers
log_progress(f"Detected {cpu_count} CPU threads. Setting ThreadPoolExecutor workers to {max_workers}")
return max_workers
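# Worked example: on an 8-thread machine base_workers = min(8 * 2, 16) = 16; once more than
# ten requests have failed with HTTP 429, the pool is halved to 8 workers.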
@sleep_and_retry
@limits(calls=10, period=60) # Limit to 10 requests per minute
@backoff.on_exception(backoff.expo, requests.exceptions.HTTPError, max_tries=3)
def check_file_size(url):
nonlocal total_size_all
try:
headers = {'User-Agent': "FORTHELULZ@anonops.com"}
response = requests.head(url, headers=headers, timeout=10)
response.raise_for_status()
size = int(response.headers.get('Content-Length', 0))
log_progress(f"Size retrieved for {url}: {size} bytes")
total_size_all += size
return size
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
retry_after = e.response.headers.get('Retry-After')
log_progress(f"429 Error for {url}: Too Many Requests. Retry-After: {retry_after if retry_after else 'Not provided'}")
else:
log_progress(f"Failed to get size for {url}: {e}")
return None
except requests.RequestException as e:
log_progress(f"Failed to get size for {url}: {e}")
return None
def process_line(line):
# master.idx rows are pipe-delimited: CIK|Company Name|Form Type|Date Filed|Filename
parts = line.split('|')
if len(parts) >= 5:
filename = parts[4].strip()
if filename == "Filename": # Skip the column-header row
return None
url = f"https://www.sec.gov/Archives/{filename}"
return url
return None
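# Example (hypothetical master.idx row):
#     '320193|Apple Inc.|10-K|2023-11-03|edgar/data/320193/0000320193-23-000106.txt'
# becomes 'https://www.sec.gov/Archives/edgar/data/320193/0000320193-23-000106.txt'.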
def extract_idx_from_zip(zip_path):
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for file_name in zip_ref.namelist():
if file_name.endswith('.idx'):
idx_content = zip_ref.read(file_name).decode('utf-8', errors='ignore')
return '\n'.join(idx_content.split('\n')[12:])
log_progress(f"Warning: No IDX file found in {zip_path}, skipping.")
return None
except Exception as e:
log_progress(f"Error reading {zip_path}: {e}")
return None
def get_user_selection(zip_files):
print("\nEnter a 4-digit year, 'qtr' for specific quarter, 'all' for all archives, or '0' to return to main menu:")
while True:
choice = input("Your choice: ").strip().lower()
if choice == '0':
return None
elif choice == 'all':
return zip_files
elif choice == 'qtr':
print("\nAvailable ZIP files:")
for i, file in enumerate(zip_files, 1):
print(f"{i}. {file}")
while True:
try:
choice = int(input("Enter the number of the ZIP file to process (or 0 to exit): "))
if choice == 0:
break
if 1 <= choice <= len(zip_files):
return [zip_files[choice - 1]]
print("Invalid choice. Please enter a number between 1 and", len(zip_files))
except ValueError:
print("Please enter a valid number.")
elif choice.isdigit() and len(choice) == 4:
year = choice
print(f"Processing files for year {year}. Enter a quarter (1-4) or press Enter for all quarters:")
quarter = input("Quarter (or press Enter for all): ").strip()
if quarter and quarter.isdigit() and 1 <= int(quarter) <= 4:
year_files = [f for f in zip_files if f.startswith(year) and f.endswith(f"_QTR{quarter}.zip")]
else:
year_files = [f for f in zip_files if f.startswith(year)]
if year_files:
print(f"Processing files for year {year}, quarter {quarter if quarter else 'all'}:")
return year_files
print(f"No files found for year {year}, quarter {quarter if quarter else 'all'}.")
else:
print("Only 4-digit year, 'qtr', 'all', or '0' accepted. For example: 1999, qtr, all")
def process_zip(zip_path, failed_429_count=0):
log_progress(f"Processing {zip_path}")
idx_content = extract_idx_from_zip(zip_path)
if not idx_content:
return 0, 0
urls = [url for url in (process_line(line) for line in idx_content.split('\n')) if url]
checked = 0
failed = 0
failed_429 = 0
total_files = len(urls)
zip_total_size = 0
failed_urls = []
with ThreadPoolExecutor(max_workers=get_dynamic_workers(failed_429_count)) as executor:
futures = {executor.submit(check_file_size, url): url for url in urls}
for future in tqdm(concurrent.futures.as_completed(futures), total=total_files, desc=f"Processing {os.path.basename(zip_path)}"):
url = futures[future]
size = future.result()
if size is not None:
checked += 1
zip_total_size += size
else:
failed += 1
if "429" in str(future.exception()):
failed_429 += 1
failed_urls.append(url)
log_progress(f"Progress: Checked {checked}/{total_files}, Failed {failed}, 429 Errors {failed_429}")
log_progress(f"Finished processing {zip_path}. Checked {checked}/{total_files}, Failed {failed}, 429 Errors {failed_429}, Total Size: {zip_total_size} bytes")
if failed_urls:
log_progress(f"Failed URLs due to 429: {len(failed_urls)}. Consider retrying these URLs after a delay.")
flush_log_buffer()
return zip_total_size, failed_429
def remove_top_lines(file_path, lines_to_remove=11):
with open(file_path, 'r') as file:
lines = file.readlines()
with open(file_path, 'w') as file:
file.writelines(lines[lines_to_remove:])
def compile_urls(zip_directory, idx_file):
log_progress(f"Starting URL compilation from {zip_directory} into {idx_file}")
total_zips = len([f for f in os.listdir(zip_directory) if f.endswith('.zip')])
with tqdm(total=total_zips, desc="Compiling URLs") as pbar:
for file in os.listdir(zip_directory):
if file.endswith('.zip'):
zip_path = os.path.join(zip_directory, file)
idx_content = extract_idx_from_zip(zip_path)
if idx_content:
try:
with open(idx_file, 'a', encoding='utf-8') as master_file:
for line in idx_content.split('\n'):
if line.strip():
master_file.write(line + '\n')
log_progress(f"Processed ZIP file: {file}")
except Exception as e:
log_progress(f"Error writing to {idx_file} from {file}: {e}")
pbar.update(1)
log_progress(f"URL compilation completed. Processed {total_zips} ZIP files")
flush_log_buffer()
def scrape_sec(idx_file, failed_429_count=0):
log_progress(f"Starting SEC size checking from {idx_file}")
with open(idx_file, 'r', encoding='utf-8', errors='ignore') as file:
lines = file.readlines()
urls = [url for url in (process_line(line) for line in lines) if url is not None]
total_urls = len(urls)
log_progress(f"Found {total_urls} URLs to check")
failed_urls = []
failed_429 = 0
sec_total_size = 0
checked = 0
with ThreadPoolExecutor(max_workers=get_dynamic_workers(failed_429_count)) as executor:
with tqdm(total=total_urls, desc="Checking SEC sizes") as pbar:
future_to_url = {executor.submit(check_file_size, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
size = future.result()
if size is not None:
sec_total_size += size
checked += 1
else:
failed_urls.append(url)
if "429" in str(future.exception()):
failed_429 += 1
log_progress(f"Progress: Checking {url} {'successfully' if size is not None else 'with errors'}")
pbar.update(1)
log_progress(f"Checked {checked} file sizes successfully, Failed {len(failed_urls)}, 429 Errors {failed_429}, Total Size: {sec_total_size} bytes")
if failed_urls:
log_progress(f"Failed URLs due to 429: {len(failed_urls)}. Consider retrying these URLs after a delay.")
flush_log_buffer()
return sec_total_size, failed_429
try:
with open(idx_file, 'w') as master_file:
master_file.write("")
zip_files = [f for f in os.listdir(EDGAR_SOURCE_DIR) if f.endswith('.zip')]
total_files_all = 0
zip_total_sizes = 0
total_failed_429 = 0
while True:
selected_zips = get_user_selection(zip_files)
if not selected_zips:
break # '0' returns to the main flow, or there is nothing to process
total_files = 0
for zip_name in selected_zips:
idx_content = extract_idx_from_zip(os.path.join(EDGAR_SOURCE_DIR, zip_name))
if idx_content:
total_files += sum(1 for line in idx_content.split('\n') if process_line(line))
log_progress(f"Total files to check across {len(selected_zips)} ZIPs: {total_files}")
total_files_all += total_files
for zip_file in selected_zips:
zip_path = os.path.join(EDGAR_SOURCE_DIR, zip_file)
zip_total_size, failed_429 = process_zip(zip_path, total_failed_429)
zip_total_sizes += zip_total_size
total_failed_429 += failed_429
log_progress(f"SEC size checking pipeline completed. Total files checked across all ZIPs: {total_files_all}, Total Size: {zip_total_sizes} bytes, Total 429 Errors: {total_failed_429}")
except Exception as e:
log_progress(f"An error occurred: {e}")
try:
with open(idx_file, 'w') as master_file:
master_file.write("")
zip_files = [f for f in os.listdir(EDGAR_SOURCE_DIR) if f.endswith('.zip')]
total_failed_429 = 0
for zip_file in zip_files:
zip_path = os.path.join(EDGAR_SOURCE_DIR, zip_file)
try:
log_progress(f"Processing ZIP file: {zip_file}")
idx_file_path = extract_idx_from_zip(zip_path)
if idx_file_path:
remove_top_lines(idx_file_path)
with open(idx_file_path, 'r') as f:
content = f.read()
file_queue.put(content)
os.remove(idx_file_path)
log_progress(f"Successfully processed ZIP file: {zip_file}")
except Exception as e:
log_progress(f"Error processing {zip_file}: {e}")
def write_to_master_file():
while not file_queue.empty():
content = file_queue.get()
with open(idx_file, 'a') as master_file:
for line in content.split('\n'):
if line.strip():
master_file.write(line + '\n')
write_to_master_file()
log_progress("Compilation complete! uwu")
log_progress("Starting to compile URLs from ZIP files...")
start_time = time.time()
compile_urls(EDGAR_SOURCE_DIR, idx_file)
end_time = time.time()
log_progress(f"URL compilation completed in {end_time - start_time:.2f} seconds")
log_progress("Starting to check SEC file sizes...")
start_time = time.time()
sec_total_size, failed_429 = scrape_sec(idx_file, total_failed_429)
total_failed_429 += failed_429
end_time = time.time()
log_progress(f"SEC size checking completed in {end_time - start_time:.2f} seconds")
log_progress(f"Final Total Size for all URLs: {total_size_all} bytes, Total 429 Errors: {total_failed_429}")
flush_log_buffer()
except Exception as e:
log_progress(f"An error occurred: {e}")
def count():
from collections import defaultdict
import os
import glob
from zipfile import ZipFile
import pandas as pd
import logging
from datetime import datetime
import gc
gamecat_ascii()
# Set up logging
log_file = os.path.join(EQUITY_SOURCE_DIR, 'count_process.log')
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def parse_zips(search_term=None):
from collections import defaultdict
import os
import glob
from zipfile import ZipFile
import pandas as pd
import logging
from datetime import datetime
import gc
# Set up logging
log_file = os.path.join(EQUITY_SOURCE_DIR, 'count_process.log')
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Initialize output CSV with headers
master_csv_path = os.path.join(EQUITY_SOURCE_DIR, 'date_notional_summary_by_all_currencies.csv')
known_currencies = {'EUR', 'USD', 'JPY', 'CAD', 'AUD', 'CHF'}
all_currencies = set(known_currencies) # Will grow with unknown currencies
# Define headers based on search term
is_perpetual = search_term == '9999-12-31'
if is_perpetual:
initial_columns = ['Count', 'NEWT', 'MODI', 'Date'] + sorted(list(all_currencies))
else:
initial_columns = ['Count', 'Date'] + sorted(list(all_currencies))
# Initialize CSV
pd.DataFrame(columns=initial_columns).to_csv(master_csv_path, index=False, mode='w')
logging.info(f"Initialized CSV with {len(initial_columns)} headers: {master_csv_path}")
# Get and sort ZIP files
zip_files = sorted(glob.glob(os.path.join(EQUITY_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
files_processed = 0
# Load processed files from log to skip them if resuming
processed_files = set()
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f:
if 'Successfully processed' in line:
file_path = line.split('Successfully processed ')[-1].split(' at')[0].strip()
processed_files.add(os.path.normpath(file_path))
print(f"\nStarting to process {total_files} zip files...")
logging.info(f"Starting to process {total_files} zip files")
for index, zip_file in enumerate(zip_files, 1):
zip_file = os.path.normpath(zip_file)
if zip_file in processed_files:
print(f"Skipping already processed file {index}/{total_files}: {zip_file}")
logging.info(f"Skipped already processed file: {zip_file}")
continue
print(f"\nProcessing file {index}/{total_files}: {zip_file}")
logging.info(f"Processing file: {zip_file}")
# Initialize aggregates for this file
date_aggregates = {}
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming one CSV per zip
print(f"Reading CSV file: {csv_filename}")
logging.info(f"Reading CSV file: {csv_filename}")
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
# Check if required columns exist (C:2, L:11, M:12, T:19, V:21)
if len(df.columns) <= 21: # Need at least 22 columns
print(f"Warning: {zip_file} has fewer than 22 columns. Skipping.")
logging.warning(f"{zip_file} has fewer than 22 columns. Skipping.")
continue
column_c = df.iloc[:, 2] # Column C (TYPE: NEWT, MODI, etc.)
column_l = df.iloc[:, 11] # Column L (effective dates)
column_m = df.iloc[:, 12] # Column M (expiry dates)
column_t = df.iloc[:, 19] # Column T (notional amounts)
column_v = df.iloc[:, 21] # Column V (currency type)
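# The letters refer to spreadsheet-style column positions in the swap report CSVs: column C is
# 0-based index 2, L is 11, M is 12, T is 19 and V is 21, which is why iloc is used above.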
# Convert notional amounts to numeric
column_t_numeric = pd.to_numeric(column_t, errors='coerce')
if column_t_numeric.isna().all():
print(f"Warning: No valid numeric values in column T of {zip_file}. Skipping.")
logging.warning(f"No valid numeric values in column T of {zip_file}. Skipping.")
continue
if is_perpetual:
# Filter for perpetual swaps (Column M == '9999-12-31')
perpetual_mask = column_m == '9999-12-31'
df_perpetual = df[perpetual_mask]
if df_perpetual.empty:
print(f"No perpetual swaps found in {zip_file}. Skipping.")
logging.info(f"No perpetual swaps found in {zip_file}.")
continue
# Aggregate by expiry date (Column M)
for date, type_val, notional, currency in zip(
df_perpetual.iloc[:, 12], # Column M
df_perpetual.iloc[:, 2], # Column C
pd.to_numeric(df_perpetual.iloc[:, 19], errors='coerce'), # Column T
df_perpetual.iloc[:, 21] # Column V
):
if pd.notna(date) and isinstance(date, str) and len(date) == 10:
try:
datetime.strptime(date, '%Y-%m-%d') # Validate date
if date not in date_aggregates:
date_aggregates[date] = {
'count': 0,
'newt': 0,
'modi': 0,
'notional': defaultdict(float)
}
date_aggregates[date]['count'] += 1
if type_val == 'NEWT':
date_aggregates[date]['newt'] += 1
elif type_val == 'MODI':
date_aggregates[date]['modi'] += 1
if pd.notna(notional):
currency = str(currency).strip().upper()
date_aggregates[date]['notional'][currency] += notional
all_currencies.add(currency)
except ValueError:
print(f"Warning: Invalid date format in {zip_file} for value {date}. Skipping.")
logging.warning(f"Invalid date format in {zip_file} for value {date}. Skipping.")
else:
print(f"Warning: Skipping invalid date {date} in {zip_file}.")
logging.warning(f"Skipping invalid date {date} in {zip_file}.")
else:
# Default mode: Aggregate by expiry date (Column M)
for date, notional, currency in zip(column_m, column_t_numeric, column_v):
if pd.notna(date) and isinstance(date, str) and len(date) == 10:
try:
datetime.strptime(date, '%Y-%m-%d')
if date not in date_aggregates:
date_aggregates[date] = {
'count': 0,
'notional': defaultdict(float)
}
date_aggregates[date]['count'] += 1
if pd.notna(notional):
currency = str(currency).strip().upper()
date_aggregates[date]['notional'][currency] += notional
all_currencies.add(currency)
except ValueError:
print(f"Warning: Invalid date format in {zip_file} for value {date}. Skipping.")
logging.warning(f"Invalid date format in {zip_file} for value {date}. Skipping.")
else:
print(f"Warning: Skipping invalid date {date} in {zip_file}.")
logging.warning(f"Skipping invalid date {date} in {zip_file}.")
# Convert aggregates to DataFrame for this file
if date_aggregates:
result_data = []
current_columns = ['Count', 'NEWT', 'MODI', 'Date'] if is_perpetual else ['Count', 'Date']
current_columns += sorted(list(all_currencies))
for date, data in date_aggregates.items():
row = {'Date': date, 'Count': data['count']}
if is_perpetual:
row['NEWT'] = data['newt']
row['MODI'] = data['modi']
# Fill all known currencies with 0 if not present
for currency in all_currencies:
row[currency] = data['notional'].get(currency, 0)
result_data.append(row)
result_df = pd.DataFrame(result_data, columns=current_columns)
# Append to CSV, rewriting the file only when new currencies require extra columns
if os.path.exists(master_csv_path):
existing_columns = list(pd.read_csv(master_csv_path, nrows=0).columns)
new_columns = set(current_columns) - set(existing_columns)
if new_columns:
logging.info(f"Detected new currencies: {new_columns}. Updating CSV header.")
# Re-read the accumulated rows, align old and new data on the union of columns, and rewrite once so earlier results are not lost
existing_df = pd.read_csv(master_csv_path, on_bad_lines='skip', encoding='utf-8')
combined = pd.concat([existing_df, result_df], ignore_index=True)
value_columns = [c for c in combined.columns if c not in ('Count', 'NEWT', 'MODI', 'Date')]
combined[value_columns] = combined[value_columns].fillna(0)
combined.to_csv(master_csv_path, index=False, mode='w')
else:
# Append in the existing column order so rows stay aligned with the header
result_df.reindex(columns=existing_columns, fill_value=0).to_csv(master_csv_path, index=False, header=False, mode='a')
else:
result_df.to_csv(master_csv_path, index=False, mode='w')
print(f"Appended results for {zip_file} to {master_csv_path}")
logging.info(f"Appended results for {zip_file} to {master_csv_path}")
# Log number of unique dates before clearing
num_dates = len(date_aggregates)
# Clear memory
del result_df
del date_aggregates
gc.collect()
else:
num_dates = 0
logging.info(f"No valid data aggregated for {zip_file}.")
files_processed += 1
print(f"Processed {zip_file}. Current aggregates: {num_dates} unique dates.")
logging.info(f"Successfully processed {zip_file} at {datetime.now()}")
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
print(f"Error occurred while processing {zip_file}. Continuing to next file.")
print(f"\nProcessing complete. Total files processed: {files_processed}/{total_files}")
logging.info(f"Processing complete. Total files processed: {files_processed}/{total_files}")
# Load final CSV to compute totals
if files_processed > 0:
final_df = pd.read_csv(master_csv_path, on_bad_lines='skip', encoding='utf-8')
if not final_df.empty:
total_count = final_df['Count'].sum()
total_notional_by_currency = {c: final_df[c].sum() for c in all_currencies if c in final_df.columns}
print(f"Total count of dates: {total_count}")
if is_perpetual:
total_newt = final_df['NEWT'].sum() if 'NEWT' in final_df.columns else 0
total_modi = final_df['MODI'].sum() if 'MODI' in final_df.columns else 0
print(f"Total NEWT count: {total_newt}")
print(f"Total MODI count: {total_modi}")
for currency, total in total_notional_by_currency.items():
print(f"Total {currency} notional: {total:.2f}")
logging.info(f"Column M count and Notional T sum by currency completed. Total count: {total_count}")
else:
print("No valid data found.")
logging.info("No valid data found for aggregation.")
return final_df
else:
print("No valid data found.")
logging.info("No valid data found for aggregation.")
return pd.DataFrame(columns=initial_columns)
print("Enter a date to filter (e.g., 2023-01-01) or 9999-12-31 for perpetual swaps, or 'q' to quit:")
user_input = input().strip()
if user_input.lower() != 'q':
search_term = user_input if user_input else None
result_df = parse_zips(search_term=search_term)
if not result_df.empty:
master_csv_path = os.path.join(EQUITY_SOURCE_DIR, 'date_notional_summary_by_all_currencies.csv')
print(f"\nSaving results to: {master_csv_path}")
if search_term == '9999-12-31':
print(f"CSV columns: A (Count), B (NEWT), C (MODI), D (Date), followed by columns for each currency (e.g., E (AUD), F (CAD), ...)")
else:
print(f"CSV columns: A (Count), B (Date), followed by columns for each currency (e.g., C (AUD), D (CAD), ...)")
else:
print("No data to save.")
else:
print("Exiting script.")
def count2():
from collections import defaultdict
gamecat_ascii()
# Set up logging
log_file = os.path.join(EQUITY_SOURCE_DIR, 'count2_process.log')
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def parse_zips():
from collections import defaultdict
# Set up logging
log_file = os.path.join(EQUITY_SOURCE_DIR, 'count2_process.log')
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Initialize output CSV with headers
master_csv_path = os.path.join(EQUITY_SOURCE_DIR, 'date_notional_summary_by_all_currencies_count2.csv')
known_currencies = {'EUR', 'USD', 'JPY', 'CAD', 'AUD', 'CHF'}
all_currencies = set(known_currencies) # Will grow with unknown currencies
# Define headers
initial_columns = ['Count', 'Date'] + sorted(list(all_currencies))
# Initialize CSV if it doesn't exist
if not os.path.exists(master_csv_path):
pd.DataFrame(columns=initial_columns).to_csv(master_csv_path, index=False, mode='w')
logging.info(f"Initialized new CSV with {len(initial_columns)} headers: {master_csv_path}")
else:
logging.info(f"Using existing CSV: {master_csv_path}")
# Get and sort ZIP files
zip_files = sorted(glob.glob(os.path.join(EQUITY_SOURCE_DIR, '*.zip')), key=lambda x: os.path.basename(x))
total_files = len(zip_files)
files_processed = 0
# Load processed files from log to skip them if resuming
processed_files = set()
if os.path.exists(log_file):
with open(log_file, 'r') as f:
for line in f:
if 'Successfully processed' in line:
file_path = line.split('Successfully processed ')[-1].split(' at')[0].strip()
processed_files.add(os.path.normpath(file_path))
print(f"\nStarting to process {total_files} zip files...")
logging.info(f"Starting to process {total_files} zip files")
for index, zip_file in enumerate(zip_files, 1):
zip_file = os.path.normpath(zip_file)
if zip_file in processed_files:
print(f"Skipping already processed file {index}/{total_files}: {zip_file}")
logging.info(f"Skipped already processed file: {zip_file}")
continue
print(f"\nProcessing file {index}/{total_files}: {zip_file}")
logging.info(f"Processing file: {zip_file}")
# Initialize aggregates for this file
date_aggregates = defaultdict(lambda: {'count': 0, 'notional': defaultdict(float)})
try:
with ZipFile(zip_file, 'r') as zip_ref:
csv_filename = zip_ref.namelist()[0] # Assuming one CSV per zip
print(f"Reading CSV file: {csv_filename}")
logging.info(f"Reading CSV file: {csv_filename}")
with zip_ref.open(csv_filename) as csv_file:
df = pd.read_csv(csv_file, low_memory=False)
# Check if required columns exist (L:11, T:19, V:21)
if len(df.columns) <= 21: # Need at least 22 columns
print(f"Warning: {zip_file} has fewer than 22 columns. Skipping.")
logging.warning(f"{zip_file} has fewer than 22 columns. Skipping.")
continue
column_l = df.iloc[:, 11] # Column L (effective dates)
column_t = df.iloc[:, 19] # Column T (notional amounts)
column_v = df.iloc[:, 21] # Column V (currency type)
# Convert notional amounts to numeric
column_t_numeric = pd.to_numeric(column_t, errors='coerce')
if column_t_numeric.isna().all():
print(f"Warning: No valid numeric values in column T of {zip_file}. Skipping.")
logging.warning(f"No valid numeric values in column T of {zip_file}. Skipping.")
continue
# Aggregate by effective date (Column L) for this file
for date, notional, currency in zip(column_l, column_t_numeric, column_v):
if pd.notna(date) and isinstance(date, str) and len(date) == 10:
try:
datetime.strptime(date, '%Y-%m-%d')
date_aggregates[date]['count'] += 1
if pd.notna(notional):
currency = str(currency).strip().upper()
date_aggregates[date]['notional'][currency] += notional
all_currencies.add(currency)
except ValueError:
print(f"Warning: Invalid date format in {zip_file} for value {date}. Skipping.")
logging.warning(f"Invalid date format in {zip_file} for value {date}. Skipping.")
else:
print(f"Warning: Skipping invalid date {date} in {zip_file}.")
logging.warning(f"Skipping invalid date {date} in {zip_file}.")
# Convert aggregates to DataFrame for this file
if date_aggregates:
result_data = []
for date, data in date_aggregates.items():
row = {'Date': date, 'Count': data['count']}
for currency in all_currencies:
row[currency] = data['notional'].get(currency, 0)
result_data.append(row)
new_df = pd.DataFrame(result_data, columns=['Count', 'Date'] + sorted(all_currencies)) # Include any newly seen currencies
# Load existing data and append new data
if os.path.exists(master_csv_path):
existing_df = pd.read_csv(master_csv_path, on_bad_lines='skip', encoding='utf-8')
if not existing_df.empty:
combined_df = pd.concat([existing_df, new_df]).drop_duplicates(subset=['Date'], keep='first')
else:
combined_df = new_df
else:
combined_df = new_df
# Write combined data back to CSV
combined_df.to_csv(master_csv_path, index=False, mode='w')
print(f"Appended results for {zip_file} to: {master_csv_path}")
logging.info(f"Appended results for {zip_file} to: {master_csv_path}")
# Log number of unique dates before clearing
num_dates = len(date_aggregates)
# Clear memory
del date_aggregates
del new_df
del combined_df
gc.collect()
else:
num_dates = 0
logging.info(f"No valid data aggregated for {zip_file}.")
files_processed += 1
print(f"Processed {zip_file}. Current aggregates: {num_dates} unique dates.")
logging.info(f"Successfully processed {zip_file} at {datetime.now()}")
except Exception as e:
logging.error(f"Error processing {zip_file}: {e}")
print(f"Error occurred while processing {zip_file}. Continuing to next file.")
print(f"\nProcessing complete. Total files processed: {files_processed}/{total_files}")
logging.info(f"Processing complete. Total files processed: {files_processed}/{total_files}")
# Load final CSV to compute totals
if files_processed > 0:
final_df = pd.read_csv(master_csv_path, on_bad_lines='skip', encoding='utf-8')
if not final_df.empty:
total_count = final_df['Count'].sum()
total_notional_by_currency = {c: final_df[c].sum() for c in all_currencies if c in final_df.columns}
print(f"Total count of dates: {total_count}")
for currency, total in total_notional_by_currency.items():
print(f"Total {currency} notional: {total:.2f}")
logging.info(f"Column L count and Notional T sum by currency completed. Total count: {total_count}")
else:
print("No valid data found.")
logging.info("No valid data found for aggregation.")
return final_df
else:
print("No valid data found.")
logging.info("No valid data found for aggregation.")
return pd.DataFrame(columns=initial_columns)
# Start processing immediately
result_df = parse_zips()
if not result_df.empty:
master_csv_path = os.path.join(EQUITY_SOURCE_DIR, 'date_notional_summary_by_all_currencies_count2.csv')
print(f"\nSaving results to: {master_csv_path}")
print(f"CSV columns: A (Count), B (Date), followed by columns for each currency (e.g., C (AUD), D (CAD), ...)")
else:
print("No data to save.")
if __name__ == "__main__":
check_and_install_modules()
import_modules()
gamecock_ascii()
# Display numbered prompt for archive type selection
print("Which archives would you like to download?")
print("6: N-PORT archives")
print("9: N-CEN archives")
print("4: Form D archives")
print("2: NMFP archives")
print("0: 13F archives")
print("g: SEC Credit Swap archives")
print("g2: SEC Equity Swap archives")
print("m: CFTC Credit Swap archives")
print("m2: CFTC Equity Swap archives")
print("m3: CFTC Commodity Swap archives")
print("m4: CFTC Foreign Exchange Swap archives")
print("m5: CFTC Interest Rate Swap archives")
print("e: Edgar archives")
print("r: Exchange volume archives")
print("i: Insider trading archives")
print("c: Codex Of Instruments")
print("a: Allyourbasearebelongtous- scrape every edgar filing ever.")
print("n: create an N-CSR archive from edgar indexes")
query = input("Enter the number corresponding to your choice: ").strip()
# A bare 7-digit number is treated as a CIK and processed directly
if query.isdigit() and len(query) == 7:
process_cik(query)
if query == 'chain':
import os
from sec_api import QueryApi
api_key = os.getenv('SEC_API_KEY')
if not api_key:
raise ValueError("SEC_API_KEY environment variable not set")
query_api = QueryApi(api_key=api_key)
search_term = "GameStop"
cusip = "36467W109"
# Assume filings are downloaded to ./edgar/{NPORT,NCSR,NCEN}
nport_dir = './edgar/NPORT'
ncsr_dir = './edgar/NCSR'
ncen_dir = './edgar/NCEN'
# Ensure directories exist
for dir_path in [nport_dir, ncsr_dir, ncen_dir]:
os.makedirs(dir_path, exist_ok=True)
# Run unified parsing
result_df = parse_all_filings(nport_dir, ncsr_dir, ncen_dir, search_term, cusip)
if not result_df.empty:
print(result_df.head())
else:
print("No data found for GameStop swaps.")
if query == 'count':
count()
if query == 'count2':
count2()
if query == '6':
download_nport_archives()
search_keyword = input("Enter the keyword to search for (e.g., 'Gamestop'): ").strip() or 'gamestop'
verbose = input("Enable verbose mode? (y/n): ").lower() == 'y'
search_nport(search_keyword, verbose)
elif query == '9':
download_ncen_archives()
search_keywords = input("Enter the keyword to search for (e.g., 'Gamestop'): ").strip() or 'gamestop'
verbose = input("Enable verbose mode? (y/n): ").lower() == 'y'
search_ncen(search_keywords, verbose=verbose)
elif query == '4':
download_formd_archives()
elif query == '2':
download_nmfp_archives()
search_keywords = input("Enter the keyword to search for (e.g., 'Gamestop'): ").strip() or 'gamestop'
verbose = input("Enable verbose mode? (y/n): ").lower() == 'y'
search_nmfp(search_keywords, verbose=False)
elif query == '0':
download_13F_archives()
elif query == 'g':
download_credit_archives()
elif query == 'g2':
download_equities_archives()
elif query == 'm':
download_cftc_credit_archives()
elif query == 'm2':
download_cftc_equities_archives()
elif query == 'm3':
download_cftc_commodities_archives()
elif query == 'm4':
download_cftc_forex_archives()
elif query == 'm5':
download_cftc_rates_archives()
elif query == 'e':
# Download Edgar Archives
download_edgar_archives()
while True:
print("\nWhat would you like to do with the downloaded Edgar archives?")
print("1: Search the archives for a company name")
print("2: Scrape Edgar for filings based on a CIK")
print("3: Download filings using existing search results")
print("0: Exit to main menu")
choice = input("Enter your choice (0-3): ").strip()
if choice == '0':
break
elif choice == '1':
edgar_second()
elif choice == '2':
cik = input("Enter the CIK to scrape (e.g., '0000320193' for Apple): ").strip()
if cik.isdigit() and 1 <= len(cik) <= 10: # Validate CIK length
# Use original functions for scraping
sec_url_full = f"https://www.sec.gov/Archives/edgar/data/{cik}/"
print(f"Embarking on the quest for {sec_url_full}...")
base_download_dir = EDGAR_SOURCE_DIR
folder_name = sec_url_full.rstrip('/').split('/')[-1]
full_download_directory = os.path.join(base_download_dir, folder_name)
print(f"Full download directory: {full_download_directory} - Here lies our treasure vault")
subdirectories = scrape_subdirectories(sec_url_full)
if not subdirectories:
print(f"No hidden chambers found at {sec_url_full}. Exiting this quest.")
continue
full_subdirectory_urls = [f"{sec_url_full.rstrip('/')}/{sub}" for sub in subdirectories]
sanitized_file_path = 'sanitized_subdirectories.txt'
with open(sanitized_file_path, 'w') as sanitized_file:
sanitized_file.write('\n'.join(full_subdirectory_urls))
print(f"Sanitized list created: {sanitized_file_path} - The map to hidden chambers is drawn")
output_file_path = 'completed_subdirectories.txt'
if os.path.exists(output_file_path):
with open(output_file_path, 'r') as file:
completed_subdirectories = [line.strip() for line in file]
else:
completed_subdirectories = []
os.makedirs(full_download_directory, exist_ok=True)
print(f"Download directory created: {full_download_directory} - The vault is ready to receive its riches")
total_subdirectories = len(full_subdirectory_urls)
processed_subdirectories = len(completed_subdirectories)
rows = [] # List to store download results
for subdirectory in full_subdirectory_urls:
if subdirectory in completed_subdirectories:
print(f"Skipping already plundered chamber: {subdirectory}")
continue
print(f"Venturing into the chamber: {subdirectory}")
try:
soup = fetch_directory(subdirectory)
txt_links = extract_txt_links(soup)
print(f"Found txt links in {subdirectory}: {txt_links} - Scrolls of lore discovered")
for txt_link in txt_links:
txt_url = "https://www.sec.gov" + txt_link
print(f"Downloading txt file: {txt_url} - Securing the scroll")
download_success = download_file(txt_url, full_download_directory)
download_location = os.path.join(full_download_directory, os.path.basename(txt_url)) if download_success else 'Failed'
rows.append([cik, txt_url, download_location, 'Success' if download_success else 'Failed'])
if download_success:
with open(output_file_path, 'a') as completed_file:
completed_file.write(subdirectory + '\n')
break
time.sleep(0.1) # Avoid rate limiting
except Exception as e:
print(f"Failed to access {subdirectory}: {e} - Beware, for this path is cursed!")
with open('error_log.txt', 'a') as error_log_file:
error_log_file.write(f"Failed to access {subdirectory}: {e}\n")
processed_subdirectories += 1
print(f"Progress: {processed_subdirectories}/{total_subdirectories} chambers explored.")
remaining_subdirectories = [sub for sub in full_subdirectory_urls if sub not in completed_subdirectories]
with open(sanitized_file_path, 'w') as sanitized_file:
sanitized_file.write('\n'.join(remaining_subdirectories))
# Create CSV and HTML output
csv_file = os.path.join(EDGAR_SOURCE_DIR, f"{cik}_scraped_results.csv")
with open(csv_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['CIK', 'URL', 'Download Location', 'Status'])
writer.writerows(rows)
html_file = os.path.join(EDGAR_SOURCE_DIR, f"{cik}_scraped_index.html")
with open(html_file, 'w', encoding='utf-8') as htmlfile:
htmlfile.write('<!DOCTYPE html><html><head><title>Scraped Index</title></head><body><table border="1">')
htmlfile.write('<tr>' + ''.join(f'<th>{h}</th>' for h in ['CIK', 'URL', 'Download Location', 'Status']) + '</tr>')
for row in rows:
htmlfile.write('<tr>')
for item in row:
if item in ('Success', 'Failed'):
htmlfile.write(f'<td>{item}</td>')
elif os.path.exists(item):
# Local download locations get file:// links
htmlfile.write(f'<td><a href="file://{os.path.abspath(item)}">{item}</a></td>')
elif item.startswith('http'):
htmlfile.write(f'<td><a href="{item}">{item}</a></td>')
else:
htmlfile.write(f'<td>{item}</td>')
htmlfile.write('</tr>')
htmlfile.write('</table></body></html>')
print(f"Scraping complete. Results saved to {csv_file} and {html_file}")
else:
print("Invalid CIK. Please enter a 10- or 12-digit number.")
elif choice == '3':
csv_files = list_csv_files(EDGAR_SOURCE_DIR)
if not csv_files:
print("No CSV files found. Please search the archives first (option 1).")
continue
print("Available CSV files (without '_results.csv'):")
for i, file in enumerate(csv_files):
print(f"{i + 1}: {file[:-len('_results.csv')]}")
while True:
file_choice = input("Select a CSV file by number (or '0' to return to menu): ").strip()
if file_choice == '0':
break
try:
file_choice = int(file_choice)
if 1 <= file_choice <= len(csv_files):
csv_file = csv_files[file_choice - 1]
method = input("Use 'url' or 'crawl' method: ").strip().lower()
if method in ['url', 'crawl']:
edgar_third(csv_file, method)
repeat = input("Process another CSV? (yes/no): ").strip().lower()
if repeat != 'yes':
break
else:
print("Please enter 'url' or 'crawl'.")
else:
print("Invalid choice.")
except ValueError:
print("Please enter a valid number.")
else:
print("Invalid choice. Please enter 0-3.")
elif query == 'r':
download_exchange_archives()
elif query == 'i':
download_insider_archives()
elif query == 'c':
codex()
elif query == 'a':
allyourbasearebelongtous()
elif query == 'edgartotal':
edgartotal()
elif query == 'n':
download_ncsr_filings()
else:
print("Invalid input. Please enter one of the following: 69420gmerica.")
exit(1)