Moscon.org

Cryptofeed

27 January 2019
Roughly 13 months ago I created a library in python designed to interface with websocket enabled cryptocurrency exchanges and collect realtime market data. I started small with Coinbase (then called GDAX) and a handful of other exchanges and supported basic data feeds (book and trade primarily) with data delivery via user supplied callbacks. Since then the project has grown quite substantially into a nearly full featured library for collecting a wide array of data and allowing the user to store it or transmit it elsewhere. If you've ever thought you might want to collect some crypto market data, look no further than Cryptofeed!

Its quite easy to get up and running with a simple collector. There are several examples here, but I'll run through a few of the key ones.


from cryptofeed.callback import TradeCallback
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import TRADES


async def trade(feed, pair, order_id, timestamp, side, amount, price):
    print("Timestamp: {} Feed: {} Pair: {} ID: {} Side: {} Amount: {} Price: {}".format(timestamp, feed, pair, order_id, side, amount, price))


def main():
    f = FeedHandler()

    f.add_feed(Coinbase(pairs=['BTC-USD'], channels=[TRADES], callbacks={TRADES: TradeCallback(trade)}))

    f.run()


if __name__ == '__main__':
    main()



The FeedHandler object is the main object in Cryptofeed. Its primary purpose is to connect to different exchanges and handle the information from their data feeds. It expects a few pieces of information:
  • pairs: a list of trading pairs. Cryptofeed uses standardized names that will be converted to exchange specific trading pairs as appropriate
  • channels: a list of data channels. Again these are standardized
  • callbacks: a dictionary of callbacks to handle the data received on the configured channels. In this example we're supplying custom code to handle the data. It only prints, but it could do much more.
Once all exchanges have been registered, you can start the feed handler with run. This will create an event loop with asyncio and start the various internal tasks. This means that the program will essentially block here until it is killed, so if you need to do other things after the run you'll want to create other subprocesses.

Because Cryptofeed uses asyncio, its a single threaded application that can do other work while blocked on I/O. This means user defined callbacks must be lightweight, or make use of asyncio themselves. To reduce development time and make the library as easy to use as possible, I've provided many callbacks in a package that you can use "off the shelf" - backends. These are a set of callbacks that will write the data directly to databases, network sockets, or other transports (like ZMQ). An example of a simple one writing to redis is given below:


from cryptofeed.backends.redis import TradeRedis
from cryptofeed.callback import TradeCallback
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import TRADES


f = FeedHandler()
f.add_feed(Coinbase(channels=[TRADES], pairs=['BTC-USD'], callbacks={TRADES: TradeRedis()}))
f.run()


The code is very similar to the previous example, except now no custom callback needs to be defined. TradeRedis is set up to write trade data to redis. There are some optional keyword args you can provide to control how and where the data is written:
  • host: the ip/host of the redis server. Defaults to localhost
  • port: redis server port, defaults to redis's default
  • key: the key prefix used to store the data. By default the key is the data type (in this case, trades). The complete key is key-feed-pair, so the key in this example would be trades-coinbase-btc-usd

The data is stored as a timeseries data in a sorted set in redis (the ZSET).
Other backends provide support for writing to the Arctic and InfluxDB databases, TCP, UDP and UDS sockets and ZMQ, and more are planned for other database and transport types.

Cryptofeed also has some support for REST operations on the exchanges for collecting historical data and placing trades.

Its a work in progress, but hopefully it might be useful to anyone interested in collecting and analyzing cryptocurrency data.
Python
Cryptocurrency
MarketData

Watchdog Microservice in Go

29 July 2017
I've been an off and on Go user for several years but have recently started spending a lot more time using it for microservices. A common service (micro or otherwise) always seems to be a 'watchdog' type that tracks the health of a system, process, platform, etc and does something when it detects a problem. I've worked on these sorts of things (usually not in microservice format) at many places I've worked. I recently wrote one in Go that is extremely simplistic, but is a great foundation for a project that could evolve into something much more fully featured. At it's heart, the watchdog is nothing more than a very simple HTTP handler:


func handler(w http.ResponseWriter, r *http.Request) {
	service := r.URL.Query().Get("id")
	log.Printf("Got data from service %s", service)
	if service != "" {
		services[service] = time.Now()
	} else {
		http.Error(w, "invalid data", http.StatusBadRequest)
	}
}

It keeps track of heartbeats from other services on the network. When it receives a heartbeat, it stores a timestamp. It will periodically check its map of timestamps and take an action if any of them are older than a threshold:


func watcher(ticker *time.Ticker) {
	for {
		<-ticker.C
		for name, timestamp := range services {
			if time.Now().Sub(timestamp).Seconds() > 10 {
				delete(services, name)
				log.Printf("Service %s died - restarting", name)
				go restart(name)
			}
		}
	}
}

The watcher is setup to be run off a ticker that ticks every 5 seconds:


ticker := time.NewTicker(5 * time.Second)
go watcher(ticker)

Every 5 seconds a message is sent on the channel on the ticker, unblocking the infinite loop in the watcher. If any of the services in the map haven't sent watchdog a heartbeat within 10 seconds, the code in restart is called. I've chosen to simply restart the dead service. Any action could be taken (message a Slack channel, email someone, restart selectively, etc), and those could be areas to improve and expand the code.


func restart(name string) {
	cmd := exec.Command(name)
	err := cmd.Start()

	if err != nil {
		log.Printf("Error starting service %s: %v", name, err)
	}
}

I've set up a small program that will ping the watchdog and exit, meaning every 10-15 seconds the watchdog will restart the exited process.



The full project/code is here
Golang
Go
Microservices
Watchdog

Mass MOCA

25 February 2017
Mass MOCA
Photo
Mass MOCA
Nick Cave

Winter in Vermont

17 February 2017
Vermont
Photo
Vermont

Well Drilling

23 October 2016
Well Drilling
Photo
Well
Older Posts