Worker Examples
Complete, copy-pasteable worker examples. Connect the required blocks via edges on the canvas before deploying.
1. Crypto Price Monitor
Connected blocks: Bybit + Monitor
Tracks the BTC price, calculates the change since the previous tick, and displays a live dashboard.
python
def setup(ctx):
ctx.state.set("last_price", None)
ctx.state.set("price_history", [])
ctx.log.info("Crypto Price Monitor started")
def tick(ctx):
    """Fetch the BTCUSDT ticker, update price state and redraw the dashboard.

    Reads ``last_price`` and ``price_history`` from ``ctx.state``, computes
    the percentage change against the previously stored price, keeps at most
    the 50 most recent samples and renders price, change, high and low on
    the monitor. Returns early, after logging, when the ticker call reports
    an error or carries no usable price.
    """
    import datetime

    # Fetch current price
    ticker = ctx.bybit.get_tickers("BTCUSDT")
    if "error" in ticker:
        ctx.log.error(f"Ticker error: {ticker['error']}")
        return

    price = float(ticker.get("last_price") or 0)
    if price <= 0:
        # A missing/zero price would be recorded as a -100% move and would
        # drag the low down to 0, so skip this tick instead of storing it.
        ctx.log.error(f"Ticker error: no usable last_price in {ticker}")
        return

    last_price = ctx.state.get("last_price")
    # No previous price (first tick) means there is nothing to compare with.
    change_pct = ((price - last_price) / last_price * 100) if last_price else 0

    # Update state
    ctx.state.set("last_price", price)
    history = list(ctx.state.get("price_history") or [])
    history.append({"time": str(datetime.datetime.now()), "price": price})
    # Trim before using the list so the dashboard numbers describe exactly
    # what is stored (previously high/low/count included a 51st sample).
    history = history[-50:]  # keep last 50
    ctx.state.set("price_history", history)

    # Calculate high/low from history
    prices = [h["price"] for h in history]
    high = max(prices)
    low = min(prices)

    # Render dashboard
    color = "green" if change_pct >= 0 else "red"
    ctx.monitor.render([
        ctx.monitor.metric("BTC/USDT", f"${price:,.2f}", color=color, id="price"),
        ctx.monitor.metric("Change", f"{change_pct:+.2f}%", color=color),
        ctx.monitor.separator(),
        ctx.monitor.metric("Session High", f"${high:,.2f}", color="blue"),
        ctx.monitor.metric("Session Low", f"${low:,.2f}", color="orange"),
        ctx.monitor.separator(),
        ctx.monitor.text(f"Tracking since startup | {len(history)} data points"),
    ])
    ctx.log.info(f"BTC ${price:,.2f} ({change_pct:+.2f}%)")
def on_error(ctx, error):
ctx.log.error(f"Monitor error: {error}")

2. Trading Bot with Alerts
Connected blocks: Bybit + Telegram + LLM Agent (Claude/Grok) + Monitor
AI-powered trading assistant that monitors positions and sends alerts.
python
def setup(ctx):
ctx.state.set("alerts_sent", 0)
ctx.state.set("last_analysis", "")
ctx.log.info("Trading Bot started")
def tick(ctx):
    """Poll balance and positions, alert on large PnL and redraw the dashboard.

    Per tick this:
      * reads the USDT balance and the open positions from ``ctx.bybit``
        (returning early, after logging, if either call reports an error);
      * sums the unrealised PnL over all positions;
      * every 10th tick with open positions, asks ``ctx.llm`` for a short
        assessment and stores it under ``last_analysis``;
      * sends one Telegram alert when total PnL moves beyond +/-100, and
        counts it in ``alerts_sent``;
      * renders equity, PnL, the position table and the latest analysis.
    """
    # Get account balance
    balance = ctx.bybit.get_balance("USDT")
    if "error" in balance:
        ctx.log.error(f"Balance error: {balance['error']}")
        return
    equity = balance.get("equity", "0")

    # Get open positions (checked the same way as the balance call)
    positions = ctx.bybit.get_positions()
    if "error" in positions:
        ctx.log.error(f"Positions error: {positions['error']}")
        return
    pos_list = positions.get("list", [])

    total_pnl = 0
    pos_rows = []
    for p in pos_list:
        symbol = p.get("symbol", "?")
        side = p.get("side", "?")
        size = p.get("size", "0")
        # ``or 0`` also covers an empty-string / None PnL, which float()
        # would otherwise reject.
        pnl = float(p.get("unrealisedPnl") or 0)
        total_pnl += pnl
        pos_rows.append([symbol, side, size, f"${pnl:+,.2f}"])

    # AI analysis every 10 ticks
    tick_count = ctx.state.get("tick_count", 0) + 1
    ctx.state.set("tick_count", tick_count)
    if tick_count % 10 == 0 and pos_list:
        analysis = ctx.llm.ask(
            f"Current positions: {pos_list}. Equity: {equity} USDT. "
            f"Total unrealized PnL: ${total_pnl:+,.2f}. "
            f"Should I hold, take profit, or cut losses? Be brief (2-3 sentences).",
            system="You are a crypto trading advisor. Be concise and data-driven.",
            temperature=0.3
        )
        ctx.state.set("last_analysis", analysis)
        ctx.log.info(f"AI: {analysis}")

    # Alert on big PnL changes
    if total_pnl > 100:
        zone = "profit"
    elif total_pnl < -100:
        zone = "loss"
    else:
        zone = ""
    # Only alert when entering a zone; re-sending on every tick while the
    # PnL stays beyond the threshold would flood the chat.
    if zone and zone != ctx.state.get("pnl_alert_zone", ""):
        alerts = ctx.state.get("alerts_sent", 0)
        emoji = "🟢" if total_pnl > 0 else "🔴"
        ctx.telegram.send(
            f"{emoji} <b>PnL Alert</b>\n"
            f"Total PnL: <code>${total_pnl:+,.2f}</code>\n"
            f"Equity: <code>{equity} USDT</code>\n"
            f"Positions: {len(pos_list)}",
            parse_mode="HTML"
        )
        ctx.state.set("alerts_sent", alerts + 1)
    ctx.state.set("pnl_alert_zone", zone)

    # Render dashboard
    pnl_color = "green" if total_pnl >= 0 else "red"
    # An empty stored analysis (the initial value) must also show the
    # placeholder, so fall back on any falsy value rather than only on a
    # missing key.
    analysis_text = ctx.state.get("last_analysis") or "Waiting for analysis..."
    ctx.monitor.render([
        ctx.monitor.metric("Equity", f"{equity} USDT", color="blue"),
        ctx.monitor.metric("Unrealized PnL", f"${total_pnl:+,.2f}", color=pnl_color),
        ctx.monitor.metric("Positions", str(len(pos_list)), color="gray"),
        ctx.monitor.separator(),
        ctx.monitor.table(
            headers=["Symbol", "Side", "Size", "PnL"],
            rows=pos_rows if pos_rows else [["--", "--", "--", "--"]]
        ),
        ctx.monitor.separator(),
        ctx.monitor.text(f"AI: {analysis_text}"),
    ])
def on_error(ctx, error):
ctx.log.error(f"Bot error: {error}")
ctx.telegram.send(f"⚠️ Trading bot error: {error}")

3. News Feed Aggregator
Connected blocks: File Explorer + Monitor
Fetches external data via HTTP, stores it in files, and displays it on a dashboard.
python
def setup(ctx):
ctx.state.set("fetch_count", 0)
ctx.log.info("News Feed Aggregator started")
def tick(ctx):
    """Fetch market sentiment and prices, save a snapshot and redraw the dashboard.

    Per tick this:
      * requests the Fear & Greed index and the BTC/ETH USD prices through
        ``ctx.http``;
      * writes a JSON snapshot to ``/data/snapshot_<today>.json`` via
        ``ctx.files`` (the same path is rewritten on every tick of a day);
      * increments ``fetch_count`` in ``ctx.state``;
      * renders the index, both prices with their 24h change, and a
        progress bar on the monitor.

    Missing fields fall back to neutral defaults (index 50, price 0,
    change 0) instead of raising.
    """
    import json
    import datetime

    def parse_body(resp):
        # The body is handled both as an already-decoded object and as raw
        # JSON text, so normalise it to a dict-like object here.
        body = resp.get("body") or {}
        if isinstance(body, str):
            body = json.loads(body)
        return body

    # Fetch crypto fear & greed index
    body = parse_body(ctx.http.get("https://api.alternative.me/fng/?limit=1"))
    # ``or [{}]`` also covers an empty "data" list, which would otherwise
    # raise IndexError on [0].
    fng_data = (body.get("data") or [{}])[0]
    fng_value = int(fng_data.get("value", 50))
    fng_label = fng_data.get("value_classification", "Neutral")

    # Fetch BTC price from CoinGecko
    price_body = parse_body(ctx.http.get(
        "https://api.coingecko.com/api/v3/simple/price"
        "?ids=bitcoin,ethereum&vs_currencies=usd&include_24hr_change=true"
    ))
    btc = price_body.get("bitcoin") or {}
    eth = price_body.get("ethereum") or {}
    # ``or 0`` keeps the comparisons and format specs below valid when a
    # field is present but null.
    btc_price = btc.get("usd") or 0
    btc_change = btc.get("usd_24h_change") or 0
    eth_price = eth.get("usd") or 0
    eth_change = eth.get("usd_24h_change") or 0

    # Save to file
    record = {
        "timestamp": str(datetime.datetime.now()),
        "fng": fng_value,
        "btc": btc_price,
        "eth": eth_price,
    }
    ctx.files.write(
        f"/data/snapshot_{datetime.date.today()}.json",
        json.dumps(record, indent=2)
    )
    count = ctx.state.get("fetch_count", 0) + 1
    ctx.state.set("fetch_count", count)

    # Determine fear/greed color
    if fng_value >= 70:
        fng_color = "green"
    elif fng_value >= 40:
        fng_color = "gold"
    else:
        fng_color = "red"

    # Render dashboard
    ctx.monitor.render([
        ctx.monitor.metric("Fear & Greed", f"{fng_value} ({fng_label})",
                           color=fng_color),
        ctx.monitor.separator(),
        ctx.monitor.metric("BTC", f"${btc_price:,.0f}",
                           color="green" if btc_change >= 0 else "red",
                           subtitle=f"{btc_change:+.1f}%"),
        ctx.monitor.metric("ETH", f"${eth_price:,.0f}",
                           color="green" if eth_change >= 0 else "red",
                           subtitle=f"{eth_change:+.1f}%"),
        ctx.monitor.separator(),
        ctx.monitor.progress("Fear → Greed", fng_value / 100, color=fng_color),
        ctx.monitor.text(f"Fetches: {count} | Data saved to /data/"),
    ])
    ctx.log.info(f"F&G: {fng_value} | BTC: ${btc_price:,} | ETH: ${eth_price:,}")
def on_error(ctx, error):
    """Write the given error to the worker log with the aggregator's prefix."""
    ctx.log.error(f"Aggregator error: {error}")