franz-kafka

v0.7.0
Kafka Client http://incubator.apache.org/kafka/
kafka awesome

franz-kafka

A node client for Kafka

Example

var Kafka = require('franz-kafka')

var kafka = new Kafka({
    zookeeper: 'localhost:2181',
    compression: 'gzip',
    queueTime: 2000,
    batchSize: 200,
    logger: console
})

kafka.on('connect', function () {

    // topics are Streams
    var foo = kafka.topic('foo')
    var bar = kafka.topic('bar')

    // consume with a pipe
    foo.pipe(process.stdout)

    // or with the 'data' event
    foo.on('data', function (data) { console.log(data) })

    // produce with a pipe
    process.stdin.pipe(bar)

    // or just write to it
    bar.write('this is a message')

    // resume your consumer to get it started
    foo.resume()

    // don't forget to handle errors
    foo.on('error', function (err) { console.error("STAY CALM") })

    }
)

To test the example first get kafka running. Follow steps 1 and 2 of the quick start guide

Then you can run node example.js to see messages getting produced and consumed.


API

Kafka

new

var Kafka = require('franz-kafka')

var kafka = new Kafka({
    brokers: [{              // an array of broker connection info
        id: 0                  // the server's broker id
        host: 'localhost',
        port: 9092
    }],

    // producer defaults
    compression: 'none',     // default compression for producing
    maxMessageSize: 1000000, // limits the size of a produced message
    queueTime: 5000,         // milliseconds to buffer batches of messages before producing
    batchSize: 200,          // number of messages to bundle before producing

    // consumer defaults
    minFetchDelay: 0,        // minimum milliseconds to wait between fetches
    maxFetchDelay: 10000,    // maximum milliseconds to wait between fetches
    maxFetchSize: 300*1024,  // limits the size of a fetched message

    logger: null             // a logger that implements global.console (for debugging)
})
brokers

An array of connection info of all the brokers this client can communicate with

compression

The compression used when producing to kafka. May be, 'none', 'gzip', or 'snappy'

maxMessageSize

The largest size of a message produced to kafka. If a message exceeds this size, the Topic will emit an 'error'. Note that batchSize affects the size of messages because batches of messages are bundled as individual messages.

queueTime

The time to buffer messages for bundling before producing to kafka. This option is combined with batchSize. Whichever comes first will trigger a produce.

batchSize

The number of messages to bundle before producing to kafka. This option is combined with queueTime. Whichever comes first will trigger a produce.

minFetchDelay

The minimum time to wait between fetch requests to kafka. When a fetch returns zero messages the client will begin exponential backoff between requests up to maxFetchDelay until messages are available.

maxFetchDelay

The maximum time to wait between fetch requests to kafka after exponential backoff has begun.

maxFetchSize

The maximum size of a fetched message. If a fetched message is larger than this size the Topic will emit an 'error' event.

connect

Connects to the Kafka cluster and runs the callback once connected.

kafka.connect(function () {
    console.log('connected')
    //...
})

topic

Get a Topic for consuming or producing. The first argument is the topic name and the second are the topic options.

var foo = kafka.topic('foo', {
    // default options
    minFetchDelay: 0,      // defaults to the kafka.minFetchDelay
    maxFetchDelay: 10000,  // defaults to the kafka.maxFetchDelay
    maxFetchSize: 1000000, // defaults to the kafka.maxFetchSize
    compression: 'none',   // defaults to the kafka.compression
    batchSize: 200,        // defaults to the kafka.batchSize
    queueTime: 5000,       // defaults to the kafka.queueTime
    partitions: {
        consume: ['0-0:0']   // array of strings with the form 'brokerId-partitionId:startOffset'
        produce: ['0:1']     // array of strings with the form 'brokerId:partitionCount'
    }
})
partitions

This structure describes which brokers and partitions the client will connect to for producing and consuming.

####### consume

An array of partitions to consume and what offset to begin consuming from in the form of 'brokerId-partitionId:startOffset'. For example broker 2 partition 3 offset 5 is '2-3:5'

####### produce

An array of brokers to produce to with the count of partitions in the form of 'brokerId:partitionCount'. For example broker 3 with 8 partitions is '3:8'

events

connect

Fires when the client is connected to a broker.

Topic

A topic is a Stream that may be Readable for consuming and Writable for producing. Retrieve a topic from the kafka instance.

var topic = kafka.topic('a topic')

pause

Pause the consumer stream

resume

Resume the consumer stream

destroy

Destroy the consumer stream

setEncoding

Sets the encoding of the data emitted by the data event

write

Write a message to the topic. Returns false if the message buffer is full.

end

Same as write

pipe

Pipe the stream of messages to the next Writable Stream

events

data

Fires for each message. Data is a Buffer by default or a string if setEncoding was called

drain

Fires when the producer stream can handle more messages

error

Fires when there is a produce or consume error

License

BSD

Metadata

  • BSD
  • Whatever
  • Danny Coates
  • released 11/12/2012

Downloads

Maintainers