Problem
What is Ray?
The Basics
import ray
# Start Ray for this session, requesting 4 CPUs via num_cpus.
ray.init(num_cpus=4)
Ray Tasks
# https://docs.ray.io/en/master/ray-overview/index.html
@ray.remote
def square(x):
    """Return ``x`` multiplied by itself.

    Decorated with ``ray.remote``, so it is invoked as ``square.remote(x)``
    and the value is collected with ``ray.get``.
    """
    squared = x * x
    return squared
# Submit one square task per input; each .remote() call yields a future.
futures = [square.remote(i) for i in range(4)]
# Printing the list shows the future handles, not the squared values.
print(futures)
# ray.get resolves the futures into their results.
print(ray.get(futures))
import time
def expensive_function(seconds=3):
    """Simulate a slow call by sleeping.

    Args:
        seconds: How long to block, in seconds. Defaults to 3, so existing
            zero-argument calls behave exactly as before.

    Returns:
        None.
    """
    time.sleep(seconds)
# Time three back-to-back plain calls; each one finishes before the next starts.
start = time.time()
long_call = [expensive_function() for i in range(3)]
end = time.time()
print(f"long_call took {end-start} seconds to complete")
@ray.remote
def expensive_function():
    """Simulate a slow call: sleep for three seconds and return nothing.

    Registered as a Ray remote function, so it is invoked via
    ``expensive_function.remote()``.
    """
    delay_seconds = 3
    time.sleep(delay_seconds)
    return None
# Only submits the three tasks: there is no ray.get() between the two
# timestamps, so the measurement does not wait for the results.
start = time.time()
futures = [expensive_function.remote() for i in range(3)]
end = time.time()
print(f"long_call took {end-start} seconds to complete")
# Submit three tasks and call ray.get() before stopping the clock, so the
# measurement includes waiting for all three results.
start = time.time()
futures = [expensive_function.remote() for i in range(3)]
ray.get(futures)
end = time.time()
print(f"long_call took {end-start} seconds to complete")
@ray.remote
def extend_fib(array):
    """Append the sum of the last two elements of ``array`` and return it.

    Sleeps for one second first to stand in for slow work.

    Args:
        array: A list with at least two elements.

    Returns:
        The same list object it received, now one element longer.

    Raises:
        IndexError: If ``array`` has fewer than two elements.
    """
    time.sleep(1)
    next_term = array[-1] + array[-2]
    array.append(next_term)
    return array
def fibbu(n):
    """Return the n-th Fibonacci number, extending the sequence one Ray task at a time.

    Starts from ``[1, 1]``. For every index from 2 up to ``n - 1`` it prints
    the sequence so far, submits it to ``extend_fib`` and waits for the
    extended list, so the tasks run strictly one after another.

    Args:
        n: Position in the sequence (1-based).

    Returns:
        The last element of the built sequence; 1 when ``n`` is 2 or less.
    """
    sequence = [1, 1]
    for _ in range(2, n):
        print(sequence)
        pending = extend_fib.remote(sequence)
        sequence = ray.get(pending)
    return sequence[-1]
# Time fibbu(7); its return value is discarded here.
start = time.time()
fibbu(7)
end = time.time()
print(f"long_call took {end-start} seconds to complete")
Sequential Code
import random
# Number of random points sampled for the estimate of pi.
iterations = 1_000_000
def check_point():
    """Sample one random point in the square [-1, 1] x [-1, 1].

    Returns:
        1 if the point lies inside or on the unit circle, otherwise 0.
    """
    x = random.uniform(-1, 1)
    y = random.uniform(-1, 1)
    inside_circle = x**2 + y**2 <= 1
    return int(inside_circle)
def compute_pi(iterations):
    """Estimate pi by Monte Carlo sampling.

    Args:
        iterations: Number of random points to draw via ``check_point``.

    Returns:
        Four times the fraction of sampled points that landed in the circle.

    Raises:
        ZeroDivisionError: If ``iterations`` is 0.
    """
    hits = 0
    for _ in range(iterations):
        hits += check_point()
    return 4 * hits / iterations
# Time the plain-Python (non-Ray) estimate and report it.
start = time.time()
pi = compute_pi(iterations)
end = time.time()
print(f"sequential took {end-start} seconds to complete")
print("Final Estimation of Pi=", pi)
Parallelized with Ray
import random
# Total number of random points to sample.
iterations = 1_000_000
# Number of parallel chunks the sampling is split into.
workers = 4
@ray.remote
def check_point(num_samples=None):
    """Estimate pi from one batch of random points (runs as a Ray task).

    Args:
        num_samples: Number of points to draw in this task. When omitted it
            falls back to the module-level ``iterations // workers`` so that
            existing ``check_point.remote()`` calls keep working.

    Returns:
        This batch's estimate of pi: four times the fraction of points that
        fell inside or on the unit circle. The estimate is also printed.

    Raises:
        ZeroDivisionError: If the batch size is zero.
    """
    if num_samples is None:
        num_samples = iterations // workers
    # Keep a running count instead of materializing a list of 0/1 guesses.
    hits = 0
    for _ in range(num_samples):
        rand_x = random.uniform(-1, 1)
        rand_y = random.uniform(-1, 1)
        if rand_x**2 + rand_y**2 <= 1:
            hits += 1
    pi = hits / num_samples * 4
    print(pi)
    return pi


def compute_pi(iterations):
    """Estimate pi by splitting the sampling across ``workers`` Ray tasks.

    Args:
        iterations: Total number of points to sample. Each task receives
            ``iterations // workers`` of them, so the argument is honored
            rather than the module-level value of the same name.

    Returns:
        The mean of the per-task estimates.
    """
    samples_per_task = iterations // workers
    # Launch as many tasks as the batch size was divided by, so the total
    # sample count stays consistent if ``workers`` is changed.
    pending = [check_point.remote(samples_per_task) for _ in range(workers)]
    # Resolve the futures once and reuse the result for both sum and count.
    estimates = ray.get(pending)
    return sum(estimates) / len(estimates)
# Time the Ray-based estimate and report it.
start = time.time()
pi = compute_pi(iterations)
end = time.time()
print(f"Ray took {end-start} seconds to complete")
print("Final Estimation of Pi=", pi)
Ray Actors
@ray.remote
class Counter:
    """Ray actor holding a single integer count that starts at zero."""

    def __init__(self):
        # Current value of the count.
        self.n = 0

    def increment(self):
        """Add one to the count."""
        self.n = self.n + 1

    def read(self):
        """Return the current count."""
        return self.n
# Create four independent Counter actors.
counters = [Counter.remote() for _ in range(4)]
# Send one increment to each actor; the calls are made for their side
# effect, so the returned futures are not kept.
for counter in counters:
    counter.increment.remote()
# Ask every actor for its value and print the resolved results.
futures = [counter.read.remote() for counter in counters]
print(ray.get(futures))
Real World Example
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
import pandas as pd
import datefinder
# Chrome launch flags. '--headless' runs without a visible window; the
# sandbox and /dev/shm flags are presumably for hosted/containerized
# environments — confirm against where this is run.
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
# Browser session used at module level to collect the list of links.
chromedrive = Chrome(options=chrome_options) # This parameter would be the path to your chromedriver if you are running this locally
# Implicit wait of 3 (seconds, per Selenium's implicitly_wait) for element lookups.
chromedrive.implicitly_wait(3)
@ray.remote
def scrape_wiki(url):
    """Scrape summary fields from a single Wikipedia article (runs as a Ray task).

    Args:
        url: Address of the page to load.

    Returns:
        A dict with keys ``title``, ``description``, ``num_links``,
        ``num_images`` and ``last_edited``. If an expected element is
        missing, or no date can be parsed from the footer text, every value
        is an empty string.
    """
    # Each task opens its own browser session.
    driver = Chrome(options=chrome_options)  # This parameter would be the path to your chromedriver if you are running this locally
    try:
        driver.implicitly_wait(3)
        driver.get(url)
        title = driver.find_element_by_xpath("//h1[@id='firstHeading']").text
        description = driver.find_element_by_xpath('//div[@id = "mw-content-text"]').text
        num_links = len(driver.find_elements_by_xpath('//a[@href]'))
        num_images = len(driver.find_elements_by_xpath('//img'))
        last_edited_text = driver.find_element_by_xpath('//li[@id="footer-info-lastmod"]').text
        # Use the first date found; IndexError here means no date was parsed.
        last_edited = list(datefinder.find_dates(last_edited_text))[0]
        return {
            "title": title,
            "description": description,
            "num_links": num_links,
            "num_images": num_images,
            "last_edited": last_edited,
        }
    except (NoSuchElementException, IndexError):
        # A tuple is required: `except A or B` evaluates to `except A`, which
        # would let the IndexError from the date lookup escape.
        return {"title": "", "description": "", "num_links": "", "num_images": "", "last_edited": ""}
    finally:
        # Always shut the browser down so finished tasks do not leave
        # Chrome processes behind.
        driver.quit()
chromedrive.get('https://en.wikipedia.org/wiki/Golden_Raspberry_Awards')
# Read each anchor's href exactly once: every get_attribute call is a
# separate request to the browser, so the value is reused for both filters.
hrefs = [link.get_attribute('href') for link in chromedrive.find_elements_by_xpath('//a[@href]')]
# Keep English-Wikipedia article links that do not mention "Raspberry".
wiki_links = [href for href in hrefs if "en.wikipedia.org/wiki" in href and "Raspberry" not in href]
# Scrape the first 15 links in parallel; `d` holds futures until ray.get.
d = [scrape_wiki.remote(link) for link in wiki_links[:15]]
print(wiki_links)
d = ray.get(d)
print(d)
# One row per scraped page.
df = pd.DataFrame(d)
Modin Magic
import pandas as pd # old way
import modin.pandas as pd # With ray!