Skip to content
Snippets Groups Projects
Commit 6cd397cf authored by Harry Schmidt's avatar Harry Schmidt Committed by GitHub
Browse files

Merge branch 'master' into package-freeze

parents 8bb282ea 8acbfe1f
No related branches found
No related tags found
No related merge requests found
{
"transport": "ws",
"websocket": "wss://steemd.steemit.com",
"websocketdev": "wss://steemd.steemitdev.com",
"uri": "https://steemd.steemit.com",
"url": "",
"dev_uri": "https://steemd.steemitdev.com",
......
......@@ -20,19 +20,42 @@ class Steem extends EventEmitter {
constructor(options = {}) {
super(options);
this._setTransport(options);
this._setLogger(options)
this._setLogger(options);
this.options = options;
this.seqNo = 0; // used for rpc calls
methods.forEach(method => {
const methodName = method.method_name || camelCase(method.method);
const methodParams = method.params || [];
this[`${methodName}With`] = (options, callback) => {
return this.send(method.api, {
method: method.method,
params: methodParams.map(param => options[param])
}, callback);
};
this[methodName] = (...args) => {
const options = methodParams.reduce((memo, param, i) => {
memo[param] = args[i]; // eslint-disable-line no-param-reassign
return memo;
}, {});
const callback = args[methodParams.length];
return this[`${methodName}With`](options, callback);
};
this[`${methodName}WithAsync`] = Promise.promisify(this[`${methodName}With`]);
this[`${methodName}Async`] = Promise.promisify(this[methodName]);
});
}
_setTransport(options) {
if (options.url.match('^((http|https)?:\/\/)')) {
if (options.url && options.url.match('^((http|https)?:\/\/)')) {
options.uri = options.url;
options.transport = 'http';
this._transportType = options.transport;
this.options = options;
this.transport = new transports.http(options);
} else if (options.url.match('^((ws|wss)?:\/\/)')) {
} else if (options.url && options.url.match('^((ws|wss)?:\/\/)')) {
options.websocket = options.url;
options.transport = 'ws';
this._transportType = options.transport;
......@@ -133,9 +156,9 @@ class Steem extends EventEmitter {
setOptions(options) {
Object.assign(this.options, options);
this._setLogger(options)
this._setTransport(this.options);
this.transport.setOptions(this.options);
this._setLogger(options);
this._setTransport(options);
this.transport.setOptions(options);
}
setWebSocket(url) {
......@@ -267,45 +290,8 @@ class Steem extends EventEmitter {
return release;
}
}
// Generate Methods from methods.json
methods.forEach(method => {
const methodName = method.method_name || camelCase(method.method);
const methodParams = method.params || [];
Steem.prototype[`${methodName}With`] = function Steem$$specializedSendWith(
options,
callback,
) {
const params = methodParams.map(param => options[param]);
return this.send(
method.api, {
method: method.method,
params,
},
callback,
);
};
Steem.prototype[methodName] = function Steem$specializedSend(...args) {
const options = methodParams.reduce((memo, param, i) => {
memo[param] = args[i]; // eslint-disable-line no-param-reassign
return memo;
}, {});
const callback = args[methodParams.length];
return this[`${methodName}With`](options, callback);
};
});
/**
* Wrap transaction broadcast: serializes the object and adds error reporting
*/
Steem.prototype.broadcastTransactionSynchronousWith = function Steem$$specializedSendWith(
options,
callback,
) {
broadcastTransactionSynchronousWith(options, callback) {
const trx = options.trx;
return this.send(
'network_broadcast_api', {
......@@ -330,9 +316,9 @@ Steem.prototype.broadcastTransactionSynchronousWith = function Steem$$specialize
}
},
);
};
Promise.promisifyAll(Steem.prototype);
}
}
// Export singleton instance
const steem = new Steem(config);
......
This diff is collapsed.
import Promise from 'bluebird';
import EventEmitter from 'events';
import each from 'lodash/each';
export default class Transport extends EventEmitter {
constructor(options = {}) {
......@@ -10,9 +9,7 @@ export default class Transport extends EventEmitter {
}
setOptions(options) {
each(options, (value, key) => {
this.options[key] = value;
});
Object.assign(this.options, options);
this.stop();
}
......@@ -30,6 +27,7 @@ export default class Transport extends EventEmitter {
send() {}
start() {}
stop() {}
}
Promise.promisifyAll(Transport.prototype);
import Promise from 'bluebird';
import defaults from 'lodash/defaults';
import isNode from 'detect-node';
import newDebug from 'debug';
......@@ -16,214 +15,125 @@ if (isNode) {
const debug = newDebug('steem:ws');
const DEFAULTS = {
apiIds: {
database_api: 0,
login_api: 1,
follow_api: 2,
network_broadcast_api: 4,
},
id: 0,
};
export default class WsTransport extends Transport {
constructor(options = {}) {
defaults(options, DEFAULTS);
super(options);
this.apiIds = options.apiIds;
super(Object.assign({id: 0}, options));
this._requests = new Map();
this.inFlight = 0;
this.currentP = Promise.fulfilled();
this.isOpen = false;
this.releases = [];
this.requests = {};
this.requestsTime = {};
// A Map of api name to a promise to it's API ID refresh call
this.apiIdsP = {};
}
start() {
if (this.startP) {
return this.startP;
}
const startP = new Promise((resolve, reject) => {
if (startP !== this.startP) return;
const url = this.options.websocket;
this.ws = new WebSocket(url);
const releaseOpen = this.listenTo(this.ws, 'open', () => {
debug('Opened WS connection with', url);
if (this.startPromise) {
return this.startPromise;
}
this.startPromise = new Promise((resolve, reject) => {
this.ws = new WebSocket(this.options.websocket);
this.ws.onerror = (err) => {
this.startPromise = null;
reject(err);
};
this.ws.onopen = () => {
this.isOpen = true;
releaseOpen();
this.ws.onerror = this.onError.bind(this);
this.ws.onmessage = this.onMessage.bind(this);
this.ws.onclose = this.onClose.bind(this);
resolve();
});
const releaseClose = this.listenTo(this.ws, 'close', () => {
debug('Closed WS connection with', url);
this.isOpen = false;
delete this.ws;
this.stop();
if (startP.isPending()) {
reject(
new Error(
'The WS connection was closed before this operation was made',
),
);
}
});
const releaseMessage = this.listenTo(this.ws, 'message', message => {
debug('Received message', message.data);
const data = JSON.parse(message.data);
const id = data.id;
const request = this.requests[id];
if (!request) {
debug('Steem.onMessage: unknown request ', id);
}
delete this.requests[id];
this.onMessage(data, request);
});
this.releases = this.releases.concat([
releaseOpen,
releaseClose,
releaseMessage,
]);
});
this.startP = startP;
this.getApiIds();
return startP;
};
});
return this.startPromise;
}
stop() {
debug('Stopping...');
if (this.ws) this.ws.close();
this.apiIdsP = {};
delete this.startP;
delete this.ws;
this.releases.forEach(release => release());
this.releases = [];
}
/**
* Refreshes API IDs, populating the `Steem::apiIdsP` map.
*
* @param {String} [requestName] If provided, only this API will be refreshed
* @param {Boolean} [force] If true the API will be forced to refresh, ignoring existing results
*/
this.startPromise = null;
this.isOpen = false;
this._requests.clear();
getApiIds(requestName, force) {
if (!force && requestName && this.apiIdsP[requestName]) {
return this.apiIdsP[requestName];
if (this.ws) {
this.ws.onerror = this.ws.onmessage = this.ws.onclose = null;
this.ws.close();
this.ws = null;
}
}
const apiNamesToRefresh = requestName
? [requestName]
: Object.keys(this.apiIds);
apiNamesToRefresh.forEach(name => {
this.apiIdsP[name] = this.sendAsync('login_api', {
method: 'get_api_by_name',
params: [name],
}).then(result => {
if (result != null) {
this.apiIds[name] = result;
send(api, data, callback) {
debug('Steem::send', api, data);
return this.start().then(() => {
const deferral = {};
new Promise((resolve, reject) => {
deferral.resolve = (val) => {
resolve(val);
callback(null, val);
};
deferral.reject = (val) => {
reject(val);
callback(val);
}
});
if (this.options.useAppbaseApi) {
api = 'condenser_api';
}
const _request = {
deferral,
startedAt: Date.now(),
message: {
id: data.id || this.id++,
method: 'call',
jsonrpc: '2.0',
params: [api, data.method, data.params]
}
};
this.inFlight++;
this._requests.set(_request.message.id, _request);
this.ws.send(JSON.stringify(_request.message));
return deferral;
});
}
// If `requestName` was provided, only wait for this API ID
if (requestName) {
return this.apiIdsP[requestName];
onError(error) {
for (let _request of this._requests) {
_request.deferral.reject(error);
}
// Otherwise wait for all of them
return Promise.props(this.apiIdsP);
this.stop();
}
send(api, data, callback) {
debug('Steem::send', api, data);
const id = data.id || this.id++;
const startP = this.start();
const apiIdsP = api === 'login_api' && data.method === 'get_api_by_name'
? Promise.fulfilled()
: this.getApiIds(api);
if (api === 'login_api' && data.method === 'get_api_by_name') {
debug('Sending setup message');
} else {
debug('Going to wait for setup messages to resolve');
onClose() {
const error = new Error('Connection was closed');
for (let _request of this._requests) {
_request.deferral.reject(error);
}
this.currentP = Promise.join(startP, apiIdsP)
.then(
() =>
new Promise((resolve, reject) => {
if (!this.ws) {
reject(
new Error(
'The WS connection was closed while this request was pending',
),
);
return;
}
const payload = JSON.stringify({
id,
method: 'call',
params: [this.apiIds[api], data.method, data.params],
});
debug('Sending message', payload);
this.requests[id] = {
api,
data,
resolve,
reject,
start_time: Date.now(),
};
// this.inFlight += 1;
this.ws.send(payload);
}),
)
.nodeify(callback);
return this.currentP;
this._requests.clear();
}
onMessage(message, request) {
const {api, data, resolve, reject, start_time} = request;
onMessage(websocketMessage) {
const message = JSON.parse(websocketMessage.data);
debug('-- Steem.onMessage -->', message.id);
if (!this._requests.has(message.id)) {
throw new Error(`Panic: no request in queue for message id ${message.id}`);
}
const _request = this._requests.get(message.id);
this._requests.delete(message.id);
const errorCause = message.error;
if (errorCause) {
const err = new Error(
// eslint-disable-next-line prefer-template
(errorCause.message || 'Failed to complete operation') +
' (see err.payload for the full error payload)',
' (see err.payload for the full error payload)'
);
err.payload = message;
reject(err);
return;
}
if (api === 'login_api' && data.method === 'login') {
debug(
"network_broadcast_api API ID depends on the WS' session. " +
'Triggering a refresh...',
);
this.getApiIds('network_broadcast_api', true);
_request.deferral.reject(err);
} else {
this.emit('track-performance', _request.message.method, Date.now() - _request.startedAt);
_request.deferral.resolve(message.result);
}
debug('Resolved', api, data, '->', message);
this.emit('track-performance', data.method, Date.now() - start_time);
delete this.requests[message.id];
resolve(message.result);
}
}
......@@ -136,4 +136,16 @@ describe('steem.api:', function () {
});
});
});
describe('useApiOptions', () => {
it('works ok with the dev instances', async() => {
steem.api.setOptions({ useAppbaseApi: true, url: steem.config.get('websocketdev') });
const result = await steem.api.getContentAsync('yamadapc', 'test-1-2-3-4-5-6-7-9');
steem.api.setOptions({ useAppbaseApi: false, url: steem.config.get('websocket') });
result.should.have.properties(testPost);
});
});
});
......@@ -60,7 +60,6 @@ describe('steem.broadcast:', () => {
'test-1-2-3-4-5-6-7-9',
-1000
);
tx.should.have.properties([
'expiration',
'ref_block_num',
......@@ -152,17 +151,13 @@ describe('steem.broadcast:', () => {
});
describe('writeOperations', () => {
it('wrong', (done) => {
it('receives a properly formatted error response', () => {
const wif = steem.auth.toWif('username', 'password', 'posting');
steem.broadcast.vote(wif, 'voter', 'author', 'permlink', 0, (err) => {
if(err && /tx_missing_posting_auth/.test(err.message)) {
should.exist(err.digest);
should.exist(err.transaction);
should.exist(err.transaction_id);
done();
} else {
console.log(err);
}
return steem.broadcast.voteAsync(wif, 'voter', 'author', 'permlink', 0).
then(() => {
throw new Error('writeOperation should have failed but it didn\'t');
}, (e) => {
should.exist(e.message);
});
});
});
......
......@@ -35,8 +35,5 @@ function template(op) {
assert(op.toObject({}, {use_default: true}))
assert(op.toObject({}, {use_default: true, annotate: true}))
// sample json
let obj = op.toObject({}, {use_default: true, annotate: false})
console.log(" ", op.operation_name, "\t", JSON.stringify(obj), "\n")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment