var fs = require('fs') , Stream = require('stream').Stream , clarinet = require('../clarinet') , parse_stream = clarinet.createStream() , previous = '' , buffer = {} , stack = [] , new_thing = false ; function debug_log() { if(process.env.DEBUG) { console.log.apply(null, arguments); } } parse_stream.on('openobject', function(name) { if(new_thing) { console.log(JSON.stringify(buffer, null, 2)); buffer = {}; new_thing = false; } previous = name; stack.push(name); debug_log('=== {', name, buffer); }); parse_stream.on('closeobject', function() { stack.pop(); debug_log('=== }', null, buffer); }); parse_stream.on('key', function(name) { previous = name; stack.pop(); stack.push(name); debug_log('=== ,', name, buffer); }); parse_stream.on('value', function(value) { if(previous === 'event') { value = JSON.parse(value); } var expected = stack.length-1; stack.reduce(function (ac, x, i) { if(i === expected) { ac[x] = value; } ac[x] = ac[x] || {}; return ac[x]; }, buffer); debug_log('=== v', value, buffer); }); parse_stream.on('error', function (e) { new_thing = true; }); function fixLogglyStream() { var log_stream = new Stream(); log_stream.readable = true; log_stream.writable = true; log_stream.write = function (buf) { var as_string = buf.toString('utf-8').replace(/\\\\/g, '\\'); this.emit('data', as_string); }; log_stream.end = function (buf) { if (arguments.length) { log_stream.write(buf); } log_stream.writable = false; }; log_stream.destroy = function () { log_stream.writable = false; }; return log_stream; } fs.createReadStream(__dirname + '/loggly.txt') .pipe(fixLogglyStream()) .pipe(parse_stream) ;