# Install Prefect first
!pip install prefect &> /dev/null
!pip install prefect[viz] &> /dev/null # This is optional but required if you want to output a flow diagram here
# All imports are gathered here to keep the rest of the notebook clear
from prefect import task, Flow, Parameter, case, Client
import os # for env variables
import requests
import json
import pandas as pd
@task(name='Connect to Trello')
def trello_connect(TRELLO_ORG_ID, TRELLO_API_KEY, TRELLO_TOKEN):
    """Check the Trello credentials and return the connection details.

    Requests the organization record from the Trello REST API, prints the
    organization name, and hands back what downstream tasks need to make
    their own requests.

    Args:
        TRELLO_ORG_ID: Organization id, appended to the API URL.
        TRELLO_API_KEY: Sent as the ``key`` query parameter.
        TRELLO_TOKEN: Sent as the ``token`` query parameter.

    Returns:
        dict with keys ``'TRELLO_ORG_ID'`` and ``'querystring'``.

    Raises:
        requests.HTTPError: If Trello answers with a 4xx/5xx status.
    """
    url_org = "https://api.trello.com/1/organizations/" + TRELLO_ORG_ID
    querystring = {"key": TRELLO_API_KEY, "token": TRELLO_TOKEN}
    # A timeout keeps the task from hanging forever on a stalled connection.
    response_org = requests.get(url_org, params=querystring, timeout=30)
    # Surface bad credentials / unknown org as an HTTP error instead of a
    # confusing JSON decode error or KeyError further down.
    response_org.raise_for_status()
    data_org = response_org.json()
    org_name = data_org['name']
    print(f'Got data for org name: {org_name}')
    return {'TRELLO_ORG_ID': TRELLO_ORG_ID, 'querystring': querystring}
@task(name='Get Trello boards')
def get_boards(client):
    """Download every board of the organization into a DataFrame.

    Args:
        client: dict with keys ``'TRELLO_ORG_ID'`` and ``'querystring'``
            (the value returned by ``trello_connect``).

    Returns:
        dict with keys ``'df'`` (the boards DataFrame), ``'path'``
        (``'/work/files/'``) and ``'name'`` (``'trello_boards'``), i.e. the
        shape the save tasks read.

    Raises:
        requests.HTTPError: If Trello answers with a 4xx/5xx status.
    """
    TRELLO_ORG_ID = client.get('TRELLO_ORG_ID')
    querystring = client.get('querystring')
    # Get a list of boards based on the organization
    url_boards = "https://api.trello.com/1/organizations/" + TRELLO_ORG_ID + "/boards?lists=all"
    # A timeout keeps the task from hanging forever on a stalled connection.
    response_boards = requests.get(url_boards, params=querystring, timeout=30)
    # Fail on an HTTP error rather than trying to parse an error body as boards.
    response_boards.raise_for_status()
    # JSON to Dataframe
    data_boards = pd.json_normalize(response_boards.json())
    # Format dates. An organization without boards produces a frame with no
    # columns at all, so only convert when the column is actually present.
    if 'dateLastActivity' in data_boards.columns:
        data_boards['dateLastActivity'] = pd.to_datetime(
            data_boards['dateLastActivity'], format='%Y-%m-%dT%H:%M:%S.%fZ'
        )
    data_dict = {"df": data_boards, "path": '/work/files/', "name": 'trello_boards'}
    return data_dict
# Self-explanatory
@task(name='Save to csv')
def save_csv(data_dict):
    """Write the frame carried in ``data_dict`` to ``<path><name>.csv``.

    Args:
        data_dict: dict read for the keys ``'df'`` (object exposing
            ``to_csv``), ``'path'`` and ``'name'``; path and name are joined
            by plain string concatenation, so the path must end with a
            separator.
    """
    frame, folder, stem = (data_dict.get(key) for key in ('df', 'path', 'name'))
    # The row index is left out of the file.
    frame.to_csv(folder + stem + '.csv', index=False)
@task(name='Pickle it')
def pickle_it(data_dict):
    """Pickle the frame carried in ``data_dict`` to ``<path><name>.pkl``.

    Args:
        data_dict: dict read for the keys ``'df'`` (object exposing
            ``to_pickle``), ``'path'`` and ``'name'``; path and name are
            joined by plain string concatenation.
    """
    frame, folder, stem = (data_dict.get(key) for key in ('df', 'path', 'name'))
    frame.to_pickle(folder + stem + '.pkl')
def trello_flow():
    """Assemble and return the "trello-demo-flow" Prefect flow.

    The flow connects to Trello, fetches the boards, then writes them out
    both as CSV and as a pickle.

    Returns:
        The constructed ``Flow`` object.

    Raises:
        KeyError: If TRELLO_ORG_ID, TRELLO_API_KEY or TRELLO_TOKEN is not set
            in the environment.
    """
    with Flow("trello-demo-flow") as flow:
        # The environment is read here, while the flow is being built, so the
        # values are passed to the task as plain constants.
        # NOTE(review): that presumably means the credentials travel with the
        # flow object wherever it is stored - confirm this is acceptable.
        org_id = os.environ['TRELLO_ORG_ID']
        api_key = os.environ['TRELLO_API_KEY']
        api_token = os.environ['TRELLO_TOKEN']
        trello_client = trello_connect(org_id, api_key, api_token)
        # Both writers consume the same boards result.
        boards = get_boards(trello_client)
        save_csv(boards)
        pickle_it(boards)
    return flow
# Build the flow object; later cells reuse this `flow` variable
flow = trello_flow()
# Run the flow once, right here in the notebook process
flow.run()
[2021-08-17 10:01:31+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'trello-flow'
[2021-08-17 10:01:31+0000] INFO - prefect.TaskRunner | Task 'Connect to Trello': Starting task run...
Got data for org name: awardforce
[2021-08-17 10:01:31+0000] INFO - prefect.TaskRunner | Task 'Connect to Trello': Finished task run for task with final state: 'Success'
[2021-08-17 10:01:31+0000] INFO - prefect.TaskRunner | Task 'Get Trello boards': Starting task run...
[2021-08-17 10:01:33+0000] INFO - prefect.TaskRunner | Task 'Get Trello boards': Finished task run for task with final state: 'Success'
[2021-08-17 10:01:33+0000] INFO - prefect.TaskRunner | Task 'Save to csv': Starting task run...
[2021-08-17 10:01:33+0000] INFO - prefect.TaskRunner | Task 'Save to csv': Finished task run for task with final state: 'Success'
[2021-08-17 10:01:33+0000] INFO - prefect.TaskRunner | Task 'Pickle it': Starting task run...
[2021-08-17 10:01:33+0000] INFO - prefect.TaskRunner | Task 'Pickle it': Finished task run for task with final state: 'Success'
[2021-08-17 10:01:33+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
# Client for the Prefect API, authenticated with the key taken from the environment
client = Client(api_key=os.environ["PREFECT_API_KEY"])
## Putting these here for the example
from prefect.storage import S3
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor
# I want my flow to be stored in an S3 bucket (the bucket is named "deepnote-logs",
# but what gets uploaded on registration is the flow itself, not its logs)
flow.storage = S3(bucket="deepnote-logs", client_options={"aws_access_key_id": os.environ["AWS_ACCESS_KEY"], "aws_secret_access_key": os.environ["AWS_SECRET_KEY"]})
# I think this is optional, but I set it anyway. The executor will be "local"
flow.executor = LocalExecutor()
# I also think this is optional/default, but I set it anyway. See the documentation for more info
flow.run_config = LocalRun()
# Register the flow to your project; the returned ID is used to create flow runs below
flow_id = flow.register(project_name="Deepnote")
[2021-08-17 10:18:10+0000] INFO - prefect.S3 | Uploading trello-flow/2021-08-17t10-17-56-738742-00-00 to deepnote-logs
/root/venv/lib/python3.7/site-packages/prefect/client/client.py:1105: UserWarning: A flow with the same name is already contained in storage; if you changed your Flow since the last build, you might experience unexpected issues and should re-create your storage object.
serialized_flow = flow.serialize(build=build) # type: Any
Flow URL: https://cloud.prefect.io/creative-force/flow/c0e07d88-58f1-4302-bca4-e79e12a99436
└── ID: f9f6ece1-c3a0-43b3-bfe3-008cb00837a4
└── Project: Deepnote
└── Labels: []
# Create a run of the registered flow. The label has to match the label the
# agent is started with, otherwise the agent will not pick the run up.
flow_run_id = client.create_flow_run(flow_id, labels=['deepnote'])
# Just double check it was a success
flow_run_id
import subprocess
def run_prefect_agent(flow_run_id, label):
    """Start a local Prefect agent and stream its output until the run is deployed.

    Logs in to Prefect with PREFECT_API_KEY, starts ``prefect agent local
    start`` with the given label, echoes every line the agent prints, and
    stops the agent as soon as a line reports the completed deployment of
    ``flow_run_id`` (or the agent exits on its own).

    Args:
        flow_run_id: ID of the flow run to look for in the agent output.
        label: Label for the agent; it must match the label the flow run
            was created with.

    Raises:
        KeyError: If PREFECT_API_KEY is not set in the environment.
        subprocess.CalledProcessError: If the login command fails.
    """
    # You need to login to prefect again here. The command is run through
    # subprocess (not an IPython "!" escape) so this function is plain Python,
    # and as an argument list so the key is never parsed by a shell.
    login = subprocess.run(
        ["prefect", "auth", "login", "--key", os.environ["PREFECT_API_KEY"]],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    print(login.stdout, end="")
    login.check_returncode()

    # No shell=True: the label is passed as a single argument, and kill()
    # below reaches the agent itself rather than an intermediate shell.
    proc = subprocess.Popen(
        ["prefect", "agent", "local", "start", "--label", str(label)],
        stdout=subprocess.PIPE,
    )
    done_marker = f'Completed deployment of flow run {flow_run_id}'
    try:
        # readline() returns b'' once the agent closes its stdout (i.e. exits).
        for output in iter(proc.stdout.readline, b''):
            print(output)
            if done_marker in str(output):
                print("Flow finished running")
                break
    finally:
        # Always stop and reap the agent, even if the loop above raised or
        # the cell was interrupted.
        proc.kill()
        proc.wait()
        proc.stdout.close()
# Start the agent with the same 'deepnote' label the flow run was created with
run_prefect_agent(flow_run_id, 'deepnote')
Logged in to Prefect Cloud tenant 'Creative Force' (creative-force)
b'[2021-08-17 10:18:38,494] INFO - agent | Registering agent...\n'
b'[2021-08-17 10:18:38,585] INFO - agent | Registration successful!\n'
b'\n'
b' ____ __ _ _ _\n'
b'| _ \\ _ __ ___ / _| ___ ___| |_ / \\ __ _ ___ _ __ | |_\n'
b"| |_) | '__/ _ \\ |_ / _ \\/ __| __| / _ \\ / _` |/ _ \\ '_ \\| __|\n"
b'| __/| | | __/ _| __/ (__| |_ / ___ \\ (_| | __/ | | | |_\n'
b'|_| |_| \\___|_| \\___|\\___|\\__| /_/ \\_\\__, |\\___|_| |_|\\__|\n'
b' |___/\n'
b'\n'
b"[2021-08-17 10:18:38,654] INFO - agent | Starting LocalAgent with labels ['deepnote', 'p-6de12cd1-a606-41af-bba1-4ff4517d72c9']\n"
b'[2021-08-17 10:18:38,654] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/\n'
b'[2021-08-17 10:18:38,654] INFO - agent | Waiting for flow runs...\n'
import graphviz
def save_flow_diagram(flow, path, name):
    """Save a flow's diagram and its serialized form under ``path``.

    Args:
        flow: Flow object; only its ``visualize()`` and ``serialize()``
            methods are used.
        path: Output location, used as a plain string prefix (so it should
            end with a separator).
        name: Base name shared by the generated files.

    Writes ``<path><name>_diagram.gv``, renders that file to PNG with the
    ``dot`` engine, and writes ``<path><name>_serialized.json``.
    """
    gv_file = f'{path}{name}_diagram.gv'
    json_file = f'{path}{name}_serialized.json'

    # Diagram: save the Graphviz source first, then render that file.
    # NOTE(review): the PNG name is chosen by graphviz.render - presumably
    # the .gv path with ".png" appended; confirm if the name matters.
    diagram = flow.visualize()
    diagram.save(gv_file)
    graphviz.render('dot', 'png', gv_file)

    # Serialized flow, dumped as JSON.
    serialized = flow.serialize()
    with open(json_file, 'w') as out_file:
        json.dump(serialized, out_file)
# Write the diagram files and the serialized flow under /work/files/
save_flow_diagram(flow, '/work/files/', 'trello_boards')
# Or just do this (as the last expression of a cell, its result is shown by the notebook)
flow.visualize()