Real-Time AI without the fuss.
Kaskada is a next-generation streaming engine that connects AI models to real-time & historical data.
Kaskada completes the Real-Time AI stack, providing…
Real-time Aggregation
Precompute model inputs from streaming data with robust data connectors, transformations & aggregations.
Event Detection
Trigger pro-active AI behaviors by identifying important activities, as they happen.
History Replay
Backtest and fine-tune from historical data using per-example time travel and point-in-time joins.
Real-time AI in minutes
Connect and compute over databases, streaming data, and data loaded dynamically using Python.. Kaskada is seamlessly integrated with Python’s ecosystem of AI/ML tooling so you can load data, process it, train and serve models all in the same place.
There’s no infrastructure to provision (and no JVM hiding under the covers), so you can jump right in - check out the Quick Start.
Built for scale and reliability
Implemented in Rust using Apache Arrow, Kaskada’s compute engine uses columnar data to efficiently execute large historic and high-throughput streaming queries. Every operation in Kaskada is implemented incrementally, allowing automatic recovery if the process is terminated or killed.
With Kaskada, most jobs are fast enough to run locally, so it’s easy to build and test your real-time queries. As your needs grow, Kaskada’s cloud-native design and support for partitioned execution gives you the volume and throughput you need to scale. Kaskada was built by core contributors to Apache Beam, Google Cloud Dataflow, and Apache Cassandra, and is under active development
Example Real-Time App: BeepGPT
BeepGPT keeps you in the loop without disturbing your focus. Its personalized, intelligent AI continuously monitors your Slack workspace, alerting you to important conversations and freeing you to concentrate on what’s most important.
The core of BeepGPT’s real-time processing requires only a few lines of code using Kaskada:
import asyncio
import kaskada as kd
kd.init_session()
# Bootstrap from historical data
= await kd.sources.PyDict.create(
messages = pyarrow.parquet.read_table("./messages.parquet")
rows
.to_pylist(),= "ts",
time_column = "channel",
key_column
)
# Send each Slack message to Kaskada
def handle_message(client, req):
"event"])
messages.add_rows(req.payload[
slack.socket_mode_request_listeners.append(handle_message)connect()
slack.
# Aggregate multiple messages into a "conversation"
= ( messages
conversations "user", "text")
.select(max=20)
.collect(
)
# Handle each conversation as it occurs
async for row in conversations.run_iter(mode='live'):
# Use a pre-trained model to identify interested users
= "\n\n".join([f'{msg["user"]} --> {msg["text"]}' for msg in row["result"]])
prompt = openai.Completion.create(
res ="davinci:ft-personal:coversation-users-full-kaskada-2023-08-05-14-25-30",
model=prompt + "\n\n###\n\n",
prompt=5,
logprobs=1,
max_tokens=" end",
stop=0.25,
temperature
)
# Notify interested users using the Slack API
for user_id in interested_users(res):
notify_user(row, user_id)
For more details, check out the BeepGPT Github project.
Get Started
Getting started with Kaskda is a pip install kaskada
away. Check out the Quick Start now!