|
""" |
|
|
|
""" |
|
import logging |
|
|
|
from django.conf import settings |
|
|
|
import requests |
|
from bs4 import BeautifulSoup |
|
from core.models import MutualFund |
|
from core.constants import MONEYCONTROL_TOPFUNDS_URL |
|
from data_pipeline.interfaces.api_client import DataClient |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
settings.MORNINGSTAR_API_HEADERS = { |
|
"X-RapidAPI-Key": settings.MORNINGSTAR_KEY, |
|
"X-RapidAPI-Host": settings.MORNINGSTAR_HOST, |
|
} |
|
|
|
|
|
class MFList(DataClient): |
|
model = MutualFund |
|
|
|
|
|
api_url = "https://lt.morningstar.com/api/rest.svc/g9vi2nsqjb/security/screener?page=1&pageSize=15000&sortOrder=name%20asc&outputType=json&version=1&languageId=en¤cyId=INR&universeIds=FOIND%24%24ALL%7CFCIND%24%24ALL&securityDataPoints=secId%2ClegalName%2CclosePrice%2CclosePriceDate%2Cyield_M12%2CongoingCharge%2CcategoryName%2CMedalist_RatingNumber%2CstarRatingM255%2CreturnD1%2CreturnW1%2CreturnM1%2CreturnM3%2CreturnM6%2CreturnM0%2CreturnM12%2CreturnM36%2CreturnM60%2CreturnM120%2CmaxFrontEndLoad%2CmanagerTenure%2CmaxDeferredLoad%2CexpenseRatio%2Cisin%2CinitialPurchase%2CfundTnav%2CequityStyleBox%2CbondStyleBox%2CaverageMarketCapital%2CaverageCreditQualityCode%2CeffectiveDuration%2CmorningstarRiskM255%2CalphaM36%2CbetaM36%2Cr2M36%2CstandardDeviationM36%2CsharpeM36%2CtrackRecordExtension&filters=&term=" |
|
|
|
def __init__(self) -> None: |
|
self.api_response = None |
|
self.transformed_data = None |
|
|
|
def extract(self): |
|
|
|
super().extract() |
|
|
|
logger.info("Calling Morningstar API") |
|
response = requests.get(self.api_url) |
|
|
|
|
|
if response.status_code == 200: |
|
|
|
|
|
self.api_response = response.json() |
|
logger.info( |
|
f'Morningstar API response received {len(self.api_response["rows"])} funds' |
|
) |
|
|
|
else: |
|
logger.info("Received status code: {response.status_code}") |
|
logger.info(response.json()) |
|
|
|
def transform(self): |
|
""" |
|
Transform the data to the format required by the model |
|
""" |
|
|
|
super().transform() |
|
|
|
self.transformed_data = [ |
|
{ |
|
"fund_name": fund["legalName"], |
|
"isin_number": fund.get("isin"), |
|
"security_id": fund["secId"], |
|
"data": {"list_info": fund}, |
|
} |
|
for fund in self.api_response["rows"] |
|
] |
|
|
|
def load(self): |
|
""" |
|
Load the data into the database |
|
""" |
|
|
|
create_count = 0 |
|
update_count = 0 |
|
for data_dict in self.transformed_data: |
|
try: |
|
mf = self.model.objects.get(isin_number=data_dict["isin_number"]) |
|
mf.data.update(data_dict["data"]) |
|
mf.save() |
|
update_count += 1 |
|
except self.model.DoesNotExist: |
|
mf = self.model(**data_dict) |
|
mf.save() |
|
create_count += 1 |
|
|
|
logger.info( |
|
"Created %s records; Updated %s records", create_count, update_count |
|
) |
|
|
|
|
|
class MFQuote(DataClient): |
|
model = MutualFund |
|
|
|
|
|
api_url = f"https://{settings.MORNINGSTAR_HOST}/etf/get-quote" |
|
|
|
def __init__(self, isin) -> None: |
|
self.api_response = None |
|
self.transformed_data = None |
|
self.isin = isin |
|
self.mf = self.model.objects.get(isin_number=self.isin) |
|
|
|
def extract(self): |
|
|
|
logger.info(f"Calling Morningstar Quote API for quotes with isin {self.isin}") |
|
querystring = {"securityId": self.mf.security_id} |
|
|
|
response = requests.get( |
|
self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring |
|
) |
|
|
|
|
|
if response.status_code == 200: |
|
|
|
self.api_response = response.json() |
|
else: |
|
logger.info(f"API response: %s", response.status_code) |
|
response.raise_for_status() |
|
|
|
def load(self): |
|
self.mf.data.update({"quotes": self.transformed_data}) |
|
self.mf.save() |
|
logger.info(f"Successfully stored data of quotes for {self.mf.fund_name}") |
|
|
|
|
|
class MFHoldings(DataClient): |
|
model = MutualFund |
|
api_url = f"https://{settings.MORNINGSTAR_HOST}/etf/portfolio/get-holdings" |
|
|
|
def __init__(self, isin) -> None: |
|
self.api_response = None |
|
self.transformed_data = None |
|
self.isin = isin |
|
self.mf = self.model.objects.get(isin_number=self.isin) |
|
|
|
def extract(self): |
|
|
|
querystring = {"securityId": self.mf.security_id} |
|
|
|
response = requests.get( |
|
self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring |
|
) |
|
|
|
|
|
if response.status_code == 200: |
|
|
|
self.api_response = response.json() |
|
else: |
|
logger.info(f"received status code {response.status_code} for {self.isin}") |
|
logger.debug(response.content) |
|
response.raise_for_status() |
|
|
|
def load(self): |
|
self.mf.data.update({"holdings": self.transformed_data}) |
|
self.mf.save() |
|
logger.info(f"Successfully stored data of holdings for {self.mf.fund_name}") |
|
|
|
|
|
class MFRiskMeasures(DataClient): |
|
model = MutualFund |
|
api_url = ( |
|
f"https://{settings.MORNINGSTAR_HOST}/etf/risk/get-risk-volatility-measures" |
|
) |
|
|
|
def __init__(self, isin) -> None: |
|
self.api_response = None |
|
self.isin = isin |
|
self.mf = self.model.objects.get(isin_number=self.isin) |
|
|
|
def extract(self): |
|
|
|
querystring = {"securityId": self.mf.security_id} |
|
|
|
response = requests.get( |
|
self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring |
|
) |
|
|
|
|
|
if response.status_code == 200: |
|
|
|
self.api_response = response.json() |
|
else: |
|
logger.info(response.json()) |
|
response.raise_for_status() |
|
|
|
def load(self): |
|
self.mf.data.update({"risk_measures": self.transformed_data}) |
|
self.mf.save() |
|
logger.info( |
|
f"Successfully stored data of risk measures for {self.mf.fund_name}" |
|
) |
|
|
|
|
|
class MFRanking(DataClient): |
|
|
|
api_url = MONEYCONTROL_TOPFUNDS_URL |
|
model = MutualFund |
|
|
|
def __init__(self) -> None: |
|
self.api_response = None |
|
self.transformed_data = None |
|
|
|
def extract(self) -> None: |
|
""" |
|
Fetches the top mutual funds from MoneyControl website based on their returns and |
|
returns a tuple containing lists of fund names, fund types, CRISIL ranks, |
|
INF numbers, and AUM data of top mutual funds. |
|
""" |
|
super().extract() |
|
|
|
logger.info("Fetching top mutual funds from MoneyControl website") |
|
response = requests.get(self.api_url) |
|
|
|
|
|
response.raise_for_status() |
|
|
|
soup = BeautifulSoup(response.text, "html.parser") |
|
|
|
|
|
fund_rows = soup.find_all("tr", class_=lambda x: x and "INF" in x) |
|
logger.info("Found %s rows", len(fund_rows)) |
|
|
|
fund_details = [] |
|
|
|
|
|
for row in fund_rows: |
|
columns = row.find_all("td") |
|
fund_name = columns[0].text.strip() |
|
fund_type = columns[2].text.strip() |
|
crisil_rank = columns[3].text.strip() |
|
aum = columns[4].text.strip() |
|
isin_number = row["class"][0] |
|
|
|
fund_details.append( |
|
{ |
|
"fund_name": fund_name, |
|
"fund_type": fund_type, |
|
"crisil_rank": crisil_rank, |
|
"isin_number": isin_number, |
|
"aum": aum, |
|
} |
|
) |
|
|
|
self.api_response = fund_details |
|
|
|
def load(self) -> None: |
|
""" |
|
Load the data into the database |
|
""" |
|
|
|
|
|
MutualFund.objects.exclude(rank=None).update(rank=None) |
|
|
|
for rank, fund_details in enumerate(self.transformed_data, 1): |
|
mf = MutualFund.objects.get(isin_number=fund_details["isin_number"]) |
|
mf.crisil_rank = ( |
|
fund_details["crisil_rank"] if fund_details["crisil_rank"] != "-" else 0 |
|
) |
|
mf.rank = rank |
|
mf.aum = float(fund_details["aum"].replace(",", "")) |
|
mf.save() |
|
logger.info( |
|
f"Updated {rank=} {mf.fund_name} | {fund_details=} {fund_details['crisil_rank']=} {fund_details['aum']=}" |
|
) |
|
|