Commit 13a4186a authored by Fabien's avatar Fabien Committed by GitHub

Merge pull request #209 from steemit/72-http-socket-support

Http socket support
parents 1883be1b ee88e814
{
"transport": "ws",
"websocket": "wss://steemd.steemit.com",
"uri": "https://steemd.steemit.com",
"dev_uri": "https://steemd.steemitdev.com",
"stage_uri": "https://steemd.steemitstage.com",
"address_prefix": "STM",
"chain_id": "0000000000000000000000000000000000000000000000000000000000000000"
}
......@@ -3,6 +3,7 @@
- [Install](#install)
- [Browser](#browser)
- [Config](#config)
- [JSON-RPC](#jsonrpc)
- [Database API](#api)
- [Subscriptions](#subscriptions)
- [Tags](#tags)
......@@ -54,6 +55,16 @@ steem.config.set('address_prefix','STM');
steem.config.get('chain_id');
```
## JSON-RPC
Here is how to activate JSON-RPC transport:
```js
steem.api.setOptions({
transport: 'http',
uri: 'https://steemd.steemitdev.com' // Optional, by default https://steemd.steemit.com is used.
});
```
# API
## Subscriptions
......@@ -517,12 +528,6 @@ steem.api.getFollowCount(account, function(err, result) {
## Broadcast API
### Broadcast Transaction
```
steem.api.broadcastTransaction(trx, function(err, result) {
console.log(err, result);
});
```
### Broadcast Transaction Synchronous
```
steem.api.broadcastTransactionSynchronous(trx, function(err, result) {
......@@ -535,12 +540,7 @@ steem.api.broadcastBlock(b, function(err, result) {
console.log(err, result);
});
```
### Broadcast Transaction With Callback
```
steem.api.broadcastTransactionWithCallback(confirmationCallback, trx, function(err, result) {
console.log(err, result);
});
```
# Broadcast
### Account Create
......
{
"name": "steem",
"version": "0.5.20",
"version": "0.6.0",
"description": "Steem.js the JavaScript API for Steem blockchain",
"main": "lib/index.js",
"scripts": {
......@@ -45,9 +45,10 @@
"debug": "^2.6.8",
"detect-node": "^2.0.3",
"ecurve": "^1.0.5",
"isomorphic-fetch": "^2.2.1",
"lodash": "^4.16.4",
"secure-random": "^1.1.1",
"ws": "^1.1.1"
"ws": "^3.0.0"
},
"devDependencies": {
"babel-cli": "^6.16.0",
......
import EventEmitter from 'events';
import Promise from 'bluebird';
import cloneDeep from 'lodash/cloneDeep';
import defaults from 'lodash/defaults';
import isNode from 'detect-node';
import newDebug from 'debug';
import config from '../config';
import methods from './methods';
import { camelCase } from '../utils';
const debugEmitters = newDebug('steem:emitters');
const debugProtocol = newDebug('steem:protocol');
const debugSetup = newDebug('steem:setup');
const debugApiIds = newDebug('steem:api_ids');
const debugWs = newDebug('steem:ws');
let WebSocket;
if (isNode) {
WebSocket = require('ws'); // eslint-disable-line global-require
} else if (typeof window !== 'undefined') {
WebSocket = window.WebSocket;
} else {
throw new Error('Couldn\'t decide on a `WebSocket` class');
}
const DEFAULTS = {
apiIds: {
database_api: 0,
login_api: 1,
follow_api: 2,
network_broadcast_api: 4,
},
id: 0,
};
const expectedResponseMs = process.env.EXPECTED_RESPONSE_MS || 2000;
import transports from './transports';
import {camelCase} from '../utils';
import {hash} from '../auth/ecc';
import {ops} from '../auth/serializer';
class Steem extends EventEmitter {
constructor(options = {}) {
super(options);
defaults(options, DEFAULTS);
this.options = cloneDeep(options);
this.id = 0;
this.inFlight = 0;
this.currentP = Promise.fulfilled();
this.apiIds = this.options.apiIds;
this.isOpen = false;
this.releases = [];
this.requests = {};
// A Map of api name to a promise to it's API ID refresh call
this.apiIdsP = {};
}
setWebSocket(url) {
console.warn("steem.api.setWebSocket(url) is now deprecated instead use steem.config.set('websocket',url)");
debugSetup('Setting WS', url);
config.set('websocket', url);
this.stop();
}
start() {
if (this.startP) {
return this.startP;
}
const startP = new Promise((resolve, reject) => {
if (startP !== this.startP) return;
const url = config.get('websocket');
this.ws = new WebSocket(url);
const releaseOpen = this.listenTo(this.ws, 'open', () => {
debugWs('Opened WS connection with', url);
this.isOpen = true;
releaseOpen();
resolve();
});
const releaseClose = this.listenTo(this.ws, 'close', () => {
debugWs('Closed WS connection with', url);
this.isOpen = false;
delete this.ws;
this.stop();
if (startP.isPending()) {
reject(new Error(
'The WS connection was closed before this operation was made'
));
}
});
const releaseMessage = this.listenTo(this.ws, 'message', (message) => {
debugWs('Received message', message.data);
const data = JSON.parse(message.data);
const id = data.id;
const request = this.requests[id];
if (!request) {
debugWs('Steem.onMessage error: unknown request ', id);
return;
}
delete this.requests[id];
this.onMessage(data, request);
});
this.releases = this.releases.concat([
releaseOpen,
releaseClose,
releaseMessage,
]);
});
this.startP = startP;
this.getApiIds();
return startP;
this._setTransport(options);
this.options = options;
}
stop() {
debugSetup('Stopping...');
if (this.ws) this.ws.close();
this.apiIdsP = {};
delete this.startP;
delete this.ws;
this.releases.forEach((release) => release());
this.releases = [];
_setTransport(options) {
if (options.transport) {
if (this.transport && this._transportType !== options.transport) {
this.transport.stop();
}
listenTo(target, eventName, callback) {
debugEmitters('Adding listener for', eventName, 'from', target.constructor.name);
if (target.addEventListener) target.addEventListener(eventName, callback);
else target.on(eventName, callback);
this._transportType = options.transport;
return () => {
debugEmitters('Removing listener for', eventName, 'from', target.constructor.name);
if (target.removeEventListener) target.removeEventListener(eventName, callback);
else target.removeListener(eventName, callback);
};
}
/**
* Refreshes API IDs, populating the `Steem::apiIdsP` map.
*
* @param {String} [requestName] If provided, only this API will be refreshed
* @param {Boolean} [force] If true the API will be forced to refresh, ignoring existing results
*/
getApiIds(requestName, force) {
if (!force && requestName && this.apiIdsP[requestName]) {
return this.apiIdsP[requestName];
if (typeof options.transport === 'string') {
if (!transports[options.transport]) {
throw new TypeError(
'Invalid `transport`, valid values are `http`, `ws` or a class',
);
}
const apiNamesToRefresh = requestName ? [requestName] : Object.keys(this.apiIds);
apiNamesToRefresh.forEach((name) => {
debugApiIds('Syncing API ID', name);
this.apiIdsP[name] = this.getApiByNameAsync(name).then((result) => {
if (result != null) {
this.apiIds[name] = result;
this.transport = new transports[options.transport](options);
} else {
debugApiIds('Dropped null API ID for', name, result);
}
});
});
// If `requestName` was provided, only wait for this API ID
if (requestName) {
return this.apiIdsP[requestName];
this.transport = new options.transport(options);
}
// Otherwise wait for all of them
return Promise.props(this.apiIdsP);
} else {
this.transport = new transports.ws(options);
}
onMessage(message, request) {
const {api, data, resolve, reject, start_time} = request;
debugWs('-- Steem.onMessage -->', message.id);
const errorCause = message.error;
if (errorCause) {
const err = new Error(
// eslint-disable-next-line prefer-template
(errorCause.message || 'Failed to complete operation') +
' (see err.payload for the full error payload)'
);
err.payload = message;
reject(err);
return;
}
if (api === 'login_api' && data.method === 'login') {
debugApiIds(
'network_broadcast_api API ID depends on the WS\' session. ' +
'Triggering a refresh...'
);
this.getApiIds('network_broadcast_api', true);
start() {
return this.transport.start();
}
debugProtocol('Resolved', api, data, '->', message);
this.emit('track-performance', data.method, Date.now() - start_time);
delete this.requests[message.id];
resolve(message.result);
stop() {
return this.transport.stop();
}
send(api, data, callback) {
debugSetup('Steem::send', api, data);
const id = data.id || this.id++;
const startP = this.start();
const apiIdsP = api === 'login_api' && data.method === 'get_api_by_name'
? Promise.fulfilled()
: this.getApiIds(api);
if (api === 'login_api' && data.method === 'get_api_by_name') {
debugApiIds('Sending setup message');
} else {
debugApiIds('Going to wait for setup messages to resolve');
return this.transport.send(api, data, callback);
}
this.currentP = Promise.join(startP, apiIdsP)
.then(() => new Promise((resolve, reject) => {
if (!this.ws) {
reject(new Error(
'The WS connection was closed while this request was pending'
));
return;
setOptions(options) {
Object.assign(this.options, options);
this._setTransport(this.options);
this.transport.setOptions(this.options);
}
const payload = JSON.stringify({
id,
method: 'call',
params: [
this.apiIds[api],
data.method,
data.params,
],
});
debugWs('Sending message', payload);
this.requests[id] = {
api,
data,
resolve,
reject,
start_time: Date.now()
};
// this.inFlight += 1;
this.ws.send(payload);
}))
.nodeify(callback);
setWebSocket(url) {
this.setOptions({websocket: url});
}
return this.currentP;
setUri(url) {
this.setOptions({uri: url});
}
streamBlockNumber(mode = 'head', callback, ts = 200) {
......@@ -263,8 +74,8 @@ class Steem extends EventEmitter {
const update = () => {
if (!running) return;
this.getDynamicGlobalPropertiesAsync()
.then((result) => {
this.getDynamicGlobalPropertiesAsync().then(
result => {
const blockId = mode === 'irreversible'
? result.last_irreversible_block_num
: result.head_block_number;
......@@ -286,9 +97,11 @@ class Steem extends EventEmitter {
Promise.delay(ts).then(() => {
update();
});
}, (err) => {
},
err => {
callback(err);
});
},
);
};
update();
......@@ -338,7 +151,7 @@ class Steem extends EventEmitter {
}
if (result && result.transactions) {
result.transactions.forEach((transaction) => {
result.transactions.forEach(transaction => {
callback(null, transaction);
});
}
......@@ -360,7 +173,7 @@ class Steem extends EventEmitter {
return;
}
transaction.operations.forEach((operation) => {
transaction.operations.forEach(operation => {
callback(null, operation);
});
});
......@@ -370,35 +183,71 @@ class Steem extends EventEmitter {
}
// Generate Methods from methods.json
methods.forEach((method) => {
methods.forEach(method => {
const methodName = method.method_name || camelCase(method.method);
const methodParams = method.params || [];
Steem.prototype[`${methodName}With`] =
function Steem$$specializedSendWith(options, callback) {
const params = methodParams.map((param) => options[param]);
return this.send(method.api, {
Steem.prototype[`${methodName}With`] = function Steem$$specializedSendWith(
options,
callback,
) {
const params = methodParams.map(param => options[param]);
return this.send(
method.api,
{
method: method.method,
params,
}, callback);
},
callback,
);
};
Steem.prototype[methodName] =
function Steem$specializedSend(...args) {
Steem.prototype[methodName] = function Steem$specializedSend(...args) {
const options = methodParams.reduce((memo, param, i) => {
memo[param] = args[i]; // eslint-disable-line no-param-reassign
return memo;
}, {});
const callback = args[methodParams.length];
return this[`${methodName}With`](options, callback);
};
});
/**
* Wrap transaction broadcast: serializes the object and adds error reporting
*/
Steem.prototype.broadcastTransactionSynchronousWith = function Steem$$specializedSendWith(
options,
callback,
) {
const trx = options.trx;
return this.send(
'network_broadcast_api',
{
method: 'broadcast_transaction_synchronous',
params: [trx],
},
(err, result) => {
if (err) {
const {signed_transaction} = ops;
// console.log('-- broadcastTransactionSynchronous -->', JSON.stringify(signed_transaction.toObject(trx), null, 2));
// toObject converts objects into serializable types
const trObject = signed_transaction.toObject(trx);
const buf = signed_transaction.toBuffer(trx);
err.digest = hash.sha256(buf).toString('hex');
err.transaction_id = buf.toString('hex');
err.transaction = JSON.stringify(trObject);
callback(err, '');
} else {
callback('', result);
}
},
);
};
Promise.promisifyAll(Steem.prototype);
// Export singleton instance
const steem = new Steem();
const steem = new Steem(config);
exports = module.exports = steem;
exports.Steem = Steem;
exports.Steem.DEFAULTS = DEFAULTS;
import Promise from 'bluebird';
import EventEmitter from 'events';
import each from 'lodash/each';
export default class Transport extends EventEmitter {
constructor(options = {}) {
super(options);
this.options = options;
this.id = 0;
}
setOptions(options) {
each(options, (value, key) => {
this.options[key] = value;
});
this.stop();
}
listenTo(target, eventName, callback) {
if (target.addEventListener) target.addEventListener(eventName, callback);
else target.on(eventName, callback);
return () => {
if (target.removeEventListener)
target.removeEventListener(eventName, callback);
else target.removeListener(eventName, callback);
};
}
send() {}
start() {}
stop() {}
}
Promise.promisifyAll(Transport.prototype);
import fetch from 'isomorphic-fetch';
import newDebug from 'debug';
import Transport from './base';
const debug = newDebug('steem:http');
export default class HttpTransport extends Transport {
send(api, data, callback) {
debug('Steem::send', api, data);
const id = data.id || this.id++;
const payload = {
id,
jsonrpc: '2.0',
method: data.method,
params: data.params,
};
fetch(this.options.uri, {
method: 'POST',
body: JSON.stringify(payload),
})
.then(res => {
debug('Steem::receive', api, data);
return res.json();
})
.then(json => {
const err = json.error || '';
const result = json.result || '';
callback(err, result);
})
.catch(err => {
callback(err, '');
});
}
}
import HttpTransport from './http';
import WsTransport from './ws';
export default {
http: HttpTransport,
ws: WsTransport,
};
import Promise from 'bluebird';
import defaults from 'lodash/defaults';
import isNode from 'detect-node';
import newDebug from 'debug';
import Transport from './base';
let WebSocket;
if (isNode) {
WebSocket = require('ws'); // eslint-disable-line global-require
} else if (typeof window !== 'undefined') {
WebSocket = window.WebSocket;
} else {
throw new Error("Couldn't decide on a `WebSocket` class");
}
const debug = newDebug('steem:ws');
const DEFAULTS = {
apiIds: {
database_api: 0,
login_api: 1,
follow_api: 2,
network_broadcast_api: 4,
},
id: 0,
};
export default class WsTransport extends Transport {
constructor(options = {}) {
defaults(options, DEFAULTS);
super(options);
this.apiIds = options.apiIds;
this.inFlight = 0;
this.currentP = Promise.fulfilled();
this.isOpen = false;
this.releases = [];
this.requests = {};