25 June 2019
I previously wrote about my cryptocurrency exchange library called Cryptofeed. As a library it requires the end user to implement software to make use of the data (store it, feed it to an algorithm, etc). I had my own unreleased software that used cryptofeed to store data directly to Arctic, a database library built on top of MongoDB. Over a period of several months, many users of cryptofeed asked me questions about how one could utilize cryptofeed to store data to Arctic, so I decided to clean up my code and release it on Github - and so Cryptostore was born!

Cryptostore abstracts away a lot of work involved with collecting data from cryptofeed - users can configure everything in a yaml file. Currently data can be collected/aggregated via kafka or redis, and from there the data can be stored into cloud storage (Google Cloud Storage or AWS S3, in parquet format), local storage, or an Arctic store. User supplied plugins are supported as well - an example is provided that backfills missing trade data. This allows anyone to quickly and easily provide extra functionality without needing to either understand Cryptostore or make significant modifications to its core. I've been running this code on my own servers for just over a year, so this is the end result of a lot of testing and fixing! Its able to handle full book data from multiple exchanges with ease and its reasonably resilient and able to recover from the hiccups many exchanges experience periodically. If you encounter any issues or have any improvements or questions feel free to open an issue or a PR!


24 June 2019


27 January 2019
Roughly 13 months ago I created a library in python designed to interface with websocket enabled cryptocurrency exchanges and collect realtime market data. I started small with Coinbase (then called GDAX) and a handful of other exchanges and supported basic data feeds (book and trade primarily) with data delivery via user supplied callbacks. Since then the project has grown quite substantially into a nearly full featured library for collecting a wide array of data and allowing the user to store it or transmit it elsewhere. If you've ever thought you might want to collect some crypto market data, look no further than Cryptofeed!

Its quite easy to get up and running with a simple collector. There are several examples here, but I'll run through a few of the key ones.

from cryptofeed.callback import TradeCallback
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import TRADES

async def trade(feed, pair, order_id, timestamp, side, amount, price):
    print("Timestamp: {} Feed: {} Pair: {} ID: {} Side: {} Amount: {} Price: {}".format(timestamp, feed, pair, order_id, side, amount, price))

def main():
    f = FeedHandler()

    f.add_feed(Coinbase(pairs=['BTC-USD'], channels=[TRADES], callbacks={TRADES: TradeCallback(trade)}))

if __name__ == '__main__':

The FeedHandler object is the main object in Cryptofeed. Its primary purpose is to connect to different exchanges and handle the information from their data feeds. It expects a few pieces of information:
  • pairs: a list of trading pairs. Cryptofeed uses standardized names that will be converted to exchange specific trading pairs as appropriate
  • channels: a list of data channels. Again these are standardized
  • callbacks: a dictionary of callbacks to handle the data received on the configured channels. In this example we're supplying custom code to handle the data. It only prints, but it could do much more.
Once all exchanges have been registered, you can start the feed handler with run. This will create an event loop with asyncio and start the various internal tasks. This means that the program will essentially block here until it is killed, so if you need to do other things after the run you'll want to create other subprocesses.

Because Cryptofeed uses asyncio, its a single threaded application that can do other work while blocked on I/O. This means user defined callbacks must be lightweight, or make use of asyncio themselves. To reduce development time and make the library as easy to use as possible, I've provided many callbacks in a package that you can use "off the shelf" - backends. These are a set of callbacks that will write the data directly to databases, network sockets, or other transports (like ZMQ). An example of a simple one writing to redis is given below:

from cryptofeed.backends.redis import TradeRedis
from cryptofeed.callback import TradeCallback
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import TRADES

f = FeedHandler()
f.add_feed(Coinbase(channels=[TRADES], pairs=['BTC-USD'], callbacks={TRADES: TradeRedis()}))

The code is very similar to the previous example, except now no custom callback needs to be defined. TradeRedis is set up to write trade data to redis. There are some optional keyword args you can provide to control how and where the data is written:
  • host: the ip/host of the redis server. Defaults to localhost
  • port: redis server port, defaults to redis's default
  • key: the key prefix used to store the data. By default the key is the data type (in this case, trades). The complete key is key-feed-pair, so the key in this example would be trades-coinbase-btc-usd

The data is stored as a timeseries data in a sorted set in redis (the ZSET).
Other backends provide support for writing to the Arctic and InfluxDB databases, TCP, UDP and UDS sockets and ZMQ, and more are planned for other database and transport types.

Cryptofeed also has some support for REST operations on the exchanges for collecting historical data and placing trades.

Its a work in progress, but hopefully it might be useful to anyone interested in collecting and analyzing cryptocurrency data.

Watchdog Microservice in Go

29 July 2017
I've been an off and on Go user for several years but have recently started spending a lot more time using it for microservices. A common service (micro or otherwise) always seems to be a 'watchdog' type that tracks the health of a system, process, platform, etc and does something when it detects a problem. I've worked on these sorts of things (usually not in microservice format) at many places I've worked. I recently wrote one in Go that is extremely simplistic, but is a great foundation for a project that could evolve into something much more fully featured. At it's heart, the watchdog is nothing more than a very simple HTTP handler:

func handler(w http.ResponseWriter, r *http.Request) {
	service := r.URL.Query().Get("id")
	log.Printf("Got data from service %s", service)
	if service != "" {
		services[service] = time.Now()
	} else {
		http.Error(w, "invalid data", http.StatusBadRequest)

It keeps track of heartbeats from other services on the network. When it receives a heartbeat, it stores a timestamp. It will periodically check its map of timestamps and take an action if any of them are older than a threshold:

func watcher(ticker *time.Ticker) {
	for {
		for name, timestamp := range services {
			if time.Now().Sub(timestamp).Seconds() > 10 {
				delete(services, name)
				log.Printf("Service %s died - restarting", name)
				go restart(name)

The watcher is setup to be run off a ticker that ticks every 5 seconds:

ticker := time.NewTicker(5 * time.Second)
go watcher(ticker)

Every 5 seconds a message is sent on the channel on the ticker, unblocking the infinite loop in the watcher. If any of the services in the map haven't sent watchdog a heartbeat within 10 seconds, the code in restart is called. I've chosen to simply restart the dead service. Any action could be taken (message a Slack channel, email someone, restart selectively, etc), and those could be areas to improve and expand the code.

func restart(name string) {
	cmd := exec.Command(name)
	err := cmd.Start()

	if err != nil {
		log.Printf("Error starting service %s: %v", name, err)

I've set up a small program that will ping the watchdog and exit, meaning every 10-15 seconds the watchdog will restart the exited process.

The full project/code is here


25 February 2017
Nick Cave
Older Posts