text2sql / core /clients /mutual_fund.py
ns-devel
Text2SQL app
38171fa
"""
"""
import logging
from django.conf import settings
import requests
from bs4 import BeautifulSoup
from core.models import MutualFund
from core.constants import MONEYCONTROL_TOPFUNDS_URL
from data_pipeline.interfaces.api_client import DataClient
logger = logging.getLogger(__name__)
settings.MORNINGSTAR_API_HEADERS = {
"X-RapidAPI-Key": settings.MORNINGSTAR_KEY,
"X-RapidAPI-Host": settings.MORNINGSTAR_HOST,
}
class MFList(DataClient):
model = MutualFund
# Monring Star List MF url
api_url = "https://lt.morningstar.com/api/rest.svc/g9vi2nsqjb/security/screener?page=1&pageSize=15000&sortOrder=name%20asc&outputType=json&version=1&languageId=en&currencyId=INR&universeIds=FOIND%24%24ALL%7CFCIND%24%24ALL&securityDataPoints=secId%2ClegalName%2CclosePrice%2CclosePriceDate%2Cyield_M12%2CongoingCharge%2CcategoryName%2CMedalist_RatingNumber%2CstarRatingM255%2CreturnD1%2CreturnW1%2CreturnM1%2CreturnM3%2CreturnM6%2CreturnM0%2CreturnM12%2CreturnM36%2CreturnM60%2CreturnM120%2CmaxFrontEndLoad%2CmanagerTenure%2CmaxDeferredLoad%2CexpenseRatio%2Cisin%2CinitialPurchase%2CfundTnav%2CequityStyleBox%2CbondStyleBox%2CaverageMarketCapital%2CaverageCreditQualityCode%2CeffectiveDuration%2CmorningstarRiskM255%2CalphaM36%2CbetaM36%2Cr2M36%2CstandardDeviationM36%2CsharpeM36%2CtrackRecordExtension&filters=&term="
def __init__(self) -> None:
self.api_response = None
self.transformed_data = None
def extract(self):
super().extract()
logger.info("Calling Morningstar API")
response = requests.get(self.api_url)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse JSON response
self.api_response = response.json()
logger.info(
f'Morningstar API response received {len(self.api_response["rows"])} funds'
)
else:
logger.info("Received status code: {response.status_code}")
logger.info(response.json())
def transform(self):
"""
Transform the data to the format required by the model
"""
super().transform()
self.transformed_data = [
{
"fund_name": fund["legalName"],
"isin_number": fund.get("isin"),
"security_id": fund["secId"],
"data": {"list_info": fund},
}
for fund in self.api_response["rows"]
]
def load(self):
"""
Load the data into the database
"""
create_count = 0
update_count = 0
for data_dict in self.transformed_data:
try:
mf = self.model.objects.get(isin_number=data_dict["isin_number"])
mf.data.update(data_dict["data"])
mf.save()
update_count += 1
except self.model.DoesNotExist:
mf = self.model(**data_dict)
mf.save()
create_count += 1
logger.info(
"Created %s records; Updated %s records", create_count, update_count
)
class MFQuote(DataClient):
model = MutualFund
# Monring Star get quote url
api_url = f"https://{settings.MORNINGSTAR_HOST}/etf/get-quote"
def __init__(self, isin) -> None:
self.api_response = None # {"quotes": None, "holdings": None}
self.transformed_data = None
self.isin = isin
self.mf = self.model.objects.get(isin_number=self.isin)
def extract(self):
logger.info(f"Calling Morningstar Quote API for quotes with isin {self.isin}")
querystring = {"securityId": self.mf.security_id}
response = requests.get(
self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring
)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse JSON response
self.api_response = response.json()
else:
logger.info(f"API response: %s", response.status_code)
response.raise_for_status()
def load(self):
self.mf.data.update({"quotes": self.transformed_data})
self.mf.save()
logger.info(f"Successfully stored data of quotes for {self.mf.fund_name}")
class MFHoldings(DataClient):
model = MutualFund
api_url = f"https://{settings.MORNINGSTAR_HOST}/etf/portfolio/get-holdings"
def __init__(self, isin) -> None:
self.api_response = None
self.transformed_data = None
self.isin = isin
self.mf = self.model.objects.get(isin_number=self.isin)
def extract(self):
querystring = {"securityId": self.mf.security_id}
response = requests.get(
self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring
)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse JSON response
self.api_response = response.json()
else:
logger.info(f"received status code {response.status_code} for {self.isin}")
logger.debug(response.content)
response.raise_for_status()
def load(self):
self.mf.data.update({"holdings": self.transformed_data})
self.mf.save()
logger.info(f"Successfully stored data of holdings for {self.mf.fund_name}")
class MFRiskMeasures(DataClient):
model = MutualFund
api_url = (
f"https://{settings.MORNINGSTAR_HOST}/etf/risk/get-risk-volatility-measures"
)
def __init__(self, isin) -> None:
self.api_response = None
self.isin = isin
self.mf = self.model.objects.get(isin_number=self.isin)
def extract(self):
querystring = {"securityId": self.mf.security_id}
response = requests.get(
self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring
)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse JSON response
self.api_response = response.json()
else:
logger.info(response.json())
response.raise_for_status()
def load(self):
self.mf.data.update({"risk_measures": self.transformed_data})
self.mf.save()
logger.info(
f"Successfully stored data of risk measures for {self.mf.fund_name}"
)
class MFRanking(DataClient):
api_url = MONEYCONTROL_TOPFUNDS_URL
model = MutualFund
def __init__(self) -> None:
self.api_response = None
self.transformed_data = None
def extract(self) -> None:
"""
Fetches the top mutual funds from MoneyControl website based on their returns and
returns a tuple containing lists of fund names, fund types, CRISIL ranks,
INF numbers, and AUM data of top mutual funds.
"""
super().extract()
logger.info("Fetching top mutual funds from MoneyControl website")
response = requests.get(self.api_url)
# Check if the request was successful (status code 200)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Find all rows containing fund information
fund_rows = soup.find_all("tr", class_=lambda x: x and "INF" in x)
logger.info("Found %s rows", len(fund_rows))
fund_details = []
# Extract fund name from each row of sponsored funds
for row in fund_rows:
columns = row.find_all("td")
fund_name = columns[0].text.strip()
fund_type = columns[2].text.strip()
crisil_rank = columns[3].text.strip()
aum = columns[4].text.strip()
isin_number = row["class"][0]
fund_details.append(
{
"fund_name": fund_name,
"fund_type": fund_type,
"crisil_rank": crisil_rank,
"isin_number": isin_number,
"aum": aum,
}
)
self.api_response = fund_details
def load(self) -> None:
"""
Load the data into the database
"""
# clear the rank field
MutualFund.objects.exclude(rank=None).update(rank=None)
for rank, fund_details in enumerate(self.transformed_data, 1):
mf = MutualFund.objects.get(isin_number=fund_details["isin_number"])
mf.crisil_rank = (
fund_details["crisil_rank"] if fund_details["crisil_rank"] != "-" else 0
)
mf.rank = rank
mf.aum = float(fund_details["aum"].replace(",", ""))
mf.save()
logger.info(
f"Updated {rank=} {mf.fund_name} | {fund_details=} {fund_details['crisil_rank']=} {fund_details['aum']=}"
)