Reddit Live Example
In this example, we’ll show how you can receive and process Reddit comments using Kaskada.
You can see the full example in the file reddit.py.
Setup Reddit credentials
Follow Reddit’s First Steps guide to create an App and obtain a client ID and secret. The “script” type application is sufficient for this example.
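With the credentials in hand, you can construct the asyncpraw client that the handlers below refer to as `reddit`. A minimal sketch, assuming the credentials are exposed via environment variables (the variable names and user agent string here are illustrative, not required by the example):

```python
import os

import asyncpraw

# Create the asyncpraw client used by the handlers below.
# The environment variable names are illustrative -- use whatever
# configuration mechanism you prefer, but avoid hard-coding secrets.
reddit = asyncpraw.Reddit(
    client_id=os.getenv("REDDIT_CLIENT_ID"),
    client_secret=os.getenv("REDDIT_CLIENT_SECRET"),
    user_agent="kaskada-example by u/your-username",
)
```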
Setup the event data source
Before we can receive events from Reddit, we need to create a data source to tell Kaskada how to handle the events. We’ll provide a schema and configure the time and entity fields.
# Setup the data source.
# This defines (most of) the schema of the events we'll receive,
# and tells Kaskada which fields to use for time and initial entity.
#
# We'll push events into this source as they arrive in real-time.
comments = kd.sources.PyDict(
    schema=pa.schema(
        [
            pa.field("author", pa.string()),
            pa.field("body", pa.string()),
            pa.field("permalink", pa.string()),
            pa.field("submission_id", pa.string()),
            pa.field("subreddit", pa.string()),
            pa.field("ts", pa.float64()),
        ]
    ),
    time_column="ts",
    key_column="submission_id",
    time_unit="s",
)
Define the incoming event handler
The asyncpraw Python library takes care of requesting and receiving events from Reddit; all you need to do is create a handler that decides what to do with each event. This handler converts Comment messages into a dict and passes the dict to Kaskada.
# Handler to receive new comments as they're created
async def receive_comments():
    # Create the subreddit handle
    sr = await reddit.subreddit(os.getenv("SUBREDDIT", "all"))
    # Consume the stream of new comments
    async for comment in sr.stream.comments():
        # Add each comment to the Kaskada data source
        await comments.add_rows(
            {
                "author": comment.author.name,
                "body": comment.body,
                "permalink": comment.permalink,
                "submission_id": comment.submission.id,
                "subreddit": comment.subreddit.display_name,
                "ts": time.time(),
            }
        )
Construct a real-time query and result handler
Now we can use Kaskada to transform the events as they arrive. First we'll use with_key to regroup events by author, then we'll apply a simple count aggregation. Finally, we create a handler for the transformed results, in this case just printing them out.
# Handler for values emitted by Kaskada.
async def receive_outputs():
    # We'll perform a very simple aggregation - key by author and count.
    comments_by_author = comments.with_key(comments.col("author"))
    # Consume outputs as they're generated and print to STDOUT.
    async for row in comments_by_author.count().run_iter(kind="row", mode="live"):
        print(f"{row['_key']} has posted {row['result']} times since startup")
Final touches
Now we just need to kick it all off by calling asyncio.gather on the two handler coroutines. This starts all the async processing concurrently.
# Kickoff the two async processes concurrently.
await asyncio.gather(receive_comments(), receive_outputs())
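Note that `await` is only valid inside a coroutine, so in practice this call runs inside an async entry point started with `asyncio.run`. Here is a self-contained sketch of the same pattern, with two stand-in coroutines in place of receive_comments and receive_outputs (the names `producer` and `consumer` are illustrative):

```python
import asyncio

results = []

async def producer():
    # Stand-in for receive_comments: append a few events,
    # yielding to the event loop after each one.
    for i in range(3):
        results.append(f"event-{i}")
        await asyncio.sleep(0)

async def consumer():
    # Stand-in for receive_outputs: do work concurrently
    # with the producer.
    for i in range(3):
        results.append(f"output-{i}")
        await asyncio.sleep(0)

async def main():
    # Run both coroutines concurrently, as reddit.py does
    # with its two handlers.
    await asyncio.gather(producer(), consumer())

asyncio.run(main())
print(len(results))  # 6: both coroutines ran to completion
```

Because each coroutine yields with `await asyncio.sleep(0)`, the event loop interleaves the two, which is exactly why the comment stream and the query output can make progress at the same time.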
Try running it yourself and playing with different transformations!
python reddit.py