Python Azure Durable Function Fan out/fan in pattern
Executing multiple functions concurrently and aggregating the results
In the previous story, Not the usual Python Azure durable function first example, I implemented a simple Azure Durable Function in Python to show some basic concepts.
In this one we will talk about the fan-out/fan-in pattern, which means executing multiple functions concurrently and then performing some aggregation on the results.
We will build a function that counts the lines in the Wikipedia page of each programming language listed at https://en.wikipedia.org/wiki/List_of_programming_languages.
For each programming language we need to fetch its page and count the lines. We can do this serially with a simple for loop, or we can leverage the parallel capabilities of the Durable Task Framework (DTFx).
As before I will use the Python v2 programming model, which is designed to provide a more code-centric way of authoring functions through decorators.
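To make the shape of the pattern concrete before the full example, here is a minimal sketch of a fan-out/fan-in orchestrator written with the v2 decorators. The do_work activity and its inputs are placeholders for illustration only; the real orchestrator and activities follow below.
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)


# Orchestrator: fan out one activity call per work item, then fan in with task_all
@myApp.orchestration_trigger(context_name="context")
def fan_out_fan_in(context):
    # Fan out: schedule the activity calls without awaiting them one by one
    tasks = [context.call_activity("do_work", item) for item in ["a", "bb", "ccc"]]
    # Fan in: wait for all of them to complete, then aggregate the results
    results = yield context.task_all(tasks)
    return sum(results)


# Placeholder activity used only in this sketch
@myApp.activity_trigger(input_name="item")
def do_work(item: str):
    return len(item)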
The requirements.txt file:
azure-functions
azure-functions-durable
requests
beautifulsoup4
To scrape the information from the web pages we will use a dedicated module, web_scraper.py:
# Web scraper
#
# python web_scraper.py
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag


def get_programming_language_wiki_pages():
    '''
    Get programming languages and their wikipedia page
    Arguments:
        None
    Returns:
        programming_language_wiki_pages: dict of programming languages and wikipedia pages
    '''
    programming_language_wiki_pages = dict()
    response = requests.get("https://en.wikipedia.org/wiki/List_of_programming_languages")
    content = response.content
    soup = BeautifulSoup(content, 'html.parser')
    # Each alphabetical section of the list is introduced by an <h2> heading
    elements = soup.find_all('h2')
    for element in elements:
        if 'See also' in element.text:
            # The "See also" section does not list programming languages
            continue
        first_li = element.find_next('li')
        if not first_li:
            continue
        programming_language = first_li.text.strip()
        anchor = first_li.find('a')  # Locate the <a> element within the <li>
        if anchor:  # Check if an <a> element is found
            href = anchor.get('href')  # Get the href attribute
            programming_language_wiki_pages[programming_language] = "https://en.wikipedia.org" + href
        # The remaining languages of the section are siblings of the first <li>
        for li_element in first_li.next_siblings:
            if not isinstance(li_element, Tag):
                # Skip the whitespace text nodes between the <li> elements
                continue
            programming_language = li_element.text.strip()
            anchor = li_element.find('a')  # Locate the <a> element within the <li>
            if anchor:  # Check if an <a> element is found
                href = anchor.get('href')  # Get the href attribute
                programming_language_wiki_pages[programming_language] = "https://en.wikipedia.org" + href
    return programming_language_wiki_pages


def get_wiki_page_newline_count(url):
    '''
    Get wikipedia page newline count
    Arguments:
        url: target page where to count
    Returns:
        count_newlines: number of new lines in the page
    '''
    count_newlines = 0
    try:
        response = requests.get(url)
        response_text = response.text
        if 'Wikipedia does not have an article with this exact name' in response_text:
            count_newlines = 0
        else:
            count_newlines = response_text.count('\n')
    except Exception:
        count_newlines = 0
    return count_newlines


if __name__ == '__main__':
    print('Web scraper')
    programming_language_wiki_pages = get_programming_language_wiki_pages()
    # Print the first ten languages and their Wikipedia pages
    for programming_language in list(programming_language_wiki_pages.keys())[:10]:
        payload = {"programming_language": programming_language, "wiki_page": programming_language_wiki_pages[programming_language]}
        print(payload['programming_language'])
        print(payload['wiki_page'])
    # print(get_wiki_page_newline_count("https://en.wikipedia.org/wiki/Python_(programming_language)"))
    # print(get_wiki_page_newline_count("https://en.wikipedia.org/wiki/ABC_(programming_language)"))
The function_app.py file:
import azure.functions as func
import azure.durable_functions as df
import web_scraper

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)


# http://localhost:7071/api/orchestrators/orchestrator_function
# An HTTP-triggered function with a Durable Functions client binding
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name)
    response = client.create_check_status_response(req, instance_id)
    return response


# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context):
    programming_language_wiki_pages = yield context.call_activity('get_programming_languages', None)
    # Serial implementation
    # listdict_results = []
    # for programming_language in programming_language_wiki_pages.keys():
    #     payload = {"programming_language": programming_language, "wiki_page": programming_language_wiki_pages[programming_language]}
    #     result = yield context.call_activity('get_counts_num_newlines', payload)
    #     listdict_results.append(result)
    # Parallel implementation: fan out one activity call per programming language
    tasks = []
    # for programming_language in list(programming_language_wiki_pages.keys())[:2]:
    for programming_language in programming_language_wiki_pages.keys():
        payload = {"programming_language": programming_language, "wiki_page": programming_language_wiki_pages[programming_language]}
        tasks.append(context.call_activity('get_counts_num_newlines', payload))
    # Fan in: wait for all the activities to complete
    listdict_results = yield context.task_all(tasks)
    results = yield context.call_activity('get_dict_from_listdict', listdict_results)
    return results


# Activity: scrape the list of programming languages and their Wikipedia pages
@myApp.activity_trigger(input_name="input")
def get_programming_languages(input: str):
    return web_scraper.get_programming_language_wiki_pages()


# Activity: count the newlines in the Wikipedia page of a single language
@myApp.activity_trigger(input_name="payload")
def get_counts_num_newlines(payload: dict):
    programming_language = payload['programming_language']
    url = payload['wiki_page']
    count_newlines = web_scraper.get_wiki_page_newline_count(url)
    return {"programming_language": programming_language, "count_newlines": count_newlines}


# Activity: aggregate the list of results into a single dictionary
@myApp.activity_trigger(input_name="listdict")
def get_dict_from_listdict(listdict: list):
    transformed_results = [{d["programming_language"]: d["count_newlines"]} for d in listdict]
    dict_results = {key: value for d in transformed_results for key, value in d.items()}
    return dict_results
Once the function is invoked it will use the web scraper module to get a dictionary that maps each programming language to its Wikipedia page URL. Then it will loop over each entry. As you can see in the code, the differences between the serial and parallel versions are minimal.
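To visualize what flows between the steps, this is the shape of the data the activities exchange; the line counts below are made-up values for illustration only.
# Illustrative data shapes only; the counts are invented for the example.
programming_language_wiki_pages = {
    "Python": "https://en.wikipedia.org/wiki/Python_(programming_language)",
    "ABC": "https://en.wikipedia.org/wiki/ABC_(programming_language)",
}

# What task_all returns: one result dict per fanned-out activity call
listdict_results = [
    {"programming_language": "Python", "count_newlines": 1200},
    {"programming_language": "ABC", "count_newlines": 300},
]

# The final aggregated dictionary returned by get_dict_from_listdict for this input
dict_results = {d["programming_language"]: d["count_newlines"] for d in listdict_results}
print(dict_results)  # {'Python': 1200, 'ABC': 300}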
When starting the Azure Function locally we will get its HTTP endpoint:
We can use Postman to send a GET request to the function URL:
http://localhost:7071/api/orchestrators/orchestrator_function
By clicking on the statusQueryGetUri link we can check the function status:
As you can see, the output of the function is the set of pairs programming language - number of lines in its Wikipedia page.
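If you prefer scripting the whole round trip instead of using Postman, a small client like the sketch below can start the orchestration and poll statusQueryGetUri until it completes. It assumes the function host is running locally on the default port 7071.
# Minimal local test client: start the orchestration and poll its status.
# Assumes the Azure Functions host is running on http://localhost:7071.
import time
import requests

start = requests.get("http://localhost:7071/api/orchestrators/orchestrator_function")
status_url = start.json()["statusQueryGetUri"]

while True:
    status = requests.get(status_url).json()
    if status["runtimeStatus"] in ("Completed", "Failed", "Terminated"):
        break
    time.sleep(10)

print(status["runtimeStatus"])
print(status["output"])  # the dictionary returned by the orchestrator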
The execution time is roughly 19 minutes for the serial version and roughly 2 minutes for the parallel one.
As you can see the results are the same, but the execution time is drastically reduced by the parallel version.
If you are interested in more articles about Azure Functions, just let me know.
Outro
I hope the story was interesting and thank you for taking the time to read it. On my Blogspot you can find the same post in Italian. Let me know if you have any questions, and if you like the content that I create feel free to buy me a coffee.