code
var AliMNS;
(function(AliMNS) {
var Account = (function() {
function Account(accountId, keyId, keySecret) {
this._accountId = accountId;
this._keyId = keyId;
this._keySecret = keySecret;
}
Account.prototype.getAccountId = function() {
return this._accountId;
};
Account.prototype.getOwnerId = function() {
return this._accountId;
};
Account.prototype.getKeyId = function() {
return this._keyId;
};
Account.prototype.hmac_sha1 = function(text, encoding) {
var hmacSHA1 = CryptoA.createHmac("sha1", this._keySecret);
return hmacSHA1.update(text).digest(encoding);
};
Account.prototype.b64md5 = function(text) {
var cryptoMD5 = CryptoA.createHash("md5");
var md5HEX = cryptoMD5.update(text).digest("hex");
var buf = new Buffer.Buffer(md5HEX, "utf8");
return buf.toString("base64");
};
return Account;
})();
AliMNS.Account = Account;
})(AliMNS || (AliMNS = {}));
var AliMNS;
(function(AliMNS) {
var MNS = (function() {
function MNS(account, region) {
this._region = "hangzhou";
this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/";
this._account = account;
if (region) this._region = region;
this._url = this.makeURL();
this._openStack = new AliMNS.OpenStack(account);
}
MNS.prototype.listP = function(prefix, pageSize, pageMarker) {
var headers = {};
if (prefix) headers["x-mns-prefix"] = prefix;
if (pageMarker) headers["x-mns-marker"] = pageMarker;
if (pageSize) headers["x-mns-ret-number"] = pageSize;
return this._openStack.sendP("GET", this._url, null, headers);
};
MNS.prototype.createP = function(name, options) {
var body = {
Queue: ""
};
if (options) body.Queue = options;
var url = Url.resolve(this._url, name);
return this._openStack.sendP("PUT", url, body);
};
MNS.prototype.deleteP = function(name) {
var url = Url.resolve(this._url, name);
return this._openStack.sendP("DELETE", url);
};
MNS.prototype.makeURL = function() {
return Util.format(this._pattern, this._account.getAccountId(), this._region);
};
return MNS;
})();
AliMNS.MNS = MNS;
AliMNS.MQS = MNS;
})(AliMNS || (AliMNS = {}));
var AliMNS;
(function(AliMNS) {
var MQ = (function() {
function MQ(name, account, region) {
this._region = "hangzhou";
this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/%s";
this._signalSTOP = true;
this._evStopped = "AliMNS_MQ_NOTIFY_STOPPED";
this._timeoutCount = 0;
this._timeoutMax = 128;
this._name = name;
this._account = account;
if (region) this._region = region;
this._urlAttr = this.makeAttrURL();
this._url = this.makeURL();
this._openStack = new AliMNS.OpenStack(account);
this._emitter = new Events.EventEmitter();
}
MQ.prototype.getAttrsP = function() {
debug("GET " + this._urlAttr);
return this._openStack.sendP("GET", this._urlAttr);
};
MQ.prototype.setAttrsP = function(options) {
var body = {
Queue: options
};
debug("PUT " + this._urlAttr, body);
return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body);
};
MQ.prototype.sendP = function(msg, priority, delaySeconds) {
var b64 = this.utf8ToBase64(msg);
var body = {
Message: {
MessageBody: b64
}
};
if (!isNaN(priority)) body.Message.Priority = priority;
if (!isNaN(delaySeconds)) body.Message.DelaySeconds = delaySeconds;
debug("PUT " + this._url, body);
return this._openStack.sendP("POST", this._url, body);
};
MQ.prototype.recvP = function(waitSeconds) {
var _this = this;
var url = this._url;
if (waitSeconds) url += "?waitseconds=" + waitSeconds;
debug("GET " + url);
return new Promise(function(resolve, reject) {
var bGotResponse = false;
var timeOutSeconds = 5;
if (waitSeconds) timeOutSeconds += waitSeconds;
setTimeout(function() {
if (!bGotResponse) reject(new Error("timeout"));
}, 1000 * timeOutSeconds);
_this._openStack.sendP("GET", url).done(function(data) {
debug(data);
bGotResponse = true;
if (data && data.Message && data.Message.MessageBody) {
data.Message.MessageBody = _this.base64ToUtf8(data.Message.MessageBody);
}
resolve(data);
}, function(ex) {
debug(ex);
bGotResponse = true;
reject(ex);
});
});
};
MQ.prototype.peekP = function() {
var _this = this;
debug("GET " + this._url);
return this._openStack.sendP("GET", this._url + "?peekonly=true").then(function(data) {
debug(data);
if (data && data.Message && data.Message.MessageBody) {
data.Message.MessageBody = _this.base64ToUtf8(data.Message.MessageBody);
}
return data;
});
};
MQ.prototype.deleteP = function(receiptHandle) {
debug("DELETE " + this._url);
return this._openStack.sendP("DELETE", this._url + "?ReceiptHandle=" + receiptHandle);
};
MQ.prototype.reserveP = function(receiptHandle, reserveSeconds) {
debug("PUT " + this._url);
return this._openStack.sendP("PUT", this._url + "?ReceiptHandle=" + receiptHandle + "&VisibilityTimeout=" + reserveSeconds);
};
MQ.prototype.notifyRecv = function(cb, waitSeconds) {
this._signalSTOP = false;
this._timeoutCount = 0;
this.notifyRecvInternal(cb, waitSeconds || 5);
};
MQ.prototype.notifyRecvInternal = function(cb, waitSeconds) {
var _this = this;
if (this._signalSTOP) {
debug("notifyStopped");
this._emitter.emit(this._evStopped);
return;
}
debug("notifyRecvInternal()");
try {
this.recvP(waitSeconds).done(function(dataRecv) {
try {
debug(dataRecv);
_this._timeoutCount = 0;
if (cb(null, dataRecv)) {
_this.deleteP(dataRecv.Message.ReceiptHandle).done(null, function(ex) {
console.log(ex);
});
}
} catch (ex) {}
_this.notifyRecvInternal(cb, waitSeconds);
}, function(ex) {
debug(ex);
if ((!ex.Error) || (ex.Error.Code !== "MessageNotExist")) {
cb(ex, null);
}
if (ex) {
if (ex.message === "timeout") {
_this._timeoutCount++;
if (_this._timeoutCount > _this._timeoutMax) {
cb(new Error("NetworkBroken"), null);
}
} else if (ex.Error && ex.Error.Code === "MessageNotExist") {
_this._timeoutCount = 0;
}
}
process.nextTick(function() {
_this.notifyRecvInternal(cb, waitSeconds);
});
});
} catch (ex) {
console.log(ex.toString());
debug("Retry after 5 seconds");
setTimeout(function() {
_this.notifyRecvInternal(cb, waitSeconds);
}, 5000);
}
};
MQ.prototype.notifyStopP = function() {
var _this = this;
if (this._signalSTOP) return Promise.resolve(this._evStopped);
this._signalSTOP = true;
return new Promise(function(resolve) {
_this._emitter.once(_this._evStopped, function() {
resolve(_this._evStopped);
});
});
};
MQ.prototype.makeAttrURL = function() {
return Util.format(this._pattern, this._account.getAccountId(), this._region, this._name);
};
MQ.prototype.makeURL = function() {
return this.makeAttrURL() + "/messages";
};
MQ.prototype.utf8ToBase64 = function(src) {
var buf = new Buffer.Buffer(src, 'utf8');
return buf.toString('base64');
};
MQ.prototype.base64ToUtf8 = function(src) {
var buf = new Buffer.Buffer(src, 'base64');
return buf.toString('utf8');
};
return MQ;
})();
AliMNS.MQ = MQ;
})(AliMNS || (AliMNS = {}));
var AliMNS;
(function(AliMNS) {
var OpenStack = (function() {
function OpenStack(account) {
this._patternMNS = "MNS %s:%s";
this._patternSign = "%s\n%s\n%s\n%s\n%s%s";
this._contentType = "text/xml;charset=utf-8";
this._version = "2015-06-06";
this._account = account;
this._xmlBuilder = new Xml2js.Builder();
}
OpenStack.prototype.sendP = function(method, url, body, headers) {
var req = {
method: method,
url: url
};
if (body) req.body = this._xmlBuilder.buildObject(body);
req.headers = this.makeHeaders(method, url, headers, req.body);
return Request.requestP(req).then(function(response) {
return Xml2js.parseStringP(response.body, {
explicitArray: false
}).then(function(bodyJSON) {
response.bodyJSON = bodyJSON;
return response;
}, function() {
response.bodyJSON = response.body;
return response;
});
}).then(function(response) {
if (response.statusCode < 400) {
if (response.bodyJSON) return response.bodyJSON;
else
return response.statusCode;
} else {
if (response.bodyJSON) return Promise.reject(response.bodyJSON);
else
return Promise.reject(response.statusCode);
}
});
};
OpenStack.prototype.makeHeaders = function(mothod, url, headers, body) {
if (!headers) headers = {};
var contentMD5 = "";
var contentType = "";
if (body) {
if (!headers["Content-Length"]) headers["Content-Length"] = body.length;
if (!headers["Content-Type"]) headers["Content-Type"] = this._contentType;
contentType = headers["Content-Type"];
contentMD5 = this._account.b64md5(body);
headers["Content-MD5"] = contentMD5;
}
if (!headers["x-mns-version"]) headers["x-mns-version"] = this._version;
var headsLower = {};
var keys = [];
for (var key in headers) {
if (headers.hasOwnProperty(key)) {
var lower = key.toLowerCase();
keys.push(lower);
headsLower[lower] = headers[key];
}
}
keys.sort();
var mnsHeaders = "";
for (var i in keys) {
var k = keys[i];
if (k.indexOf("x-mns-") === 0) {
mnsHeaders += Util.format("%s:%s\n", k, headsLower[k]);
}
}
var tm = (new Date()).toUTCString();
var mnsURL = Url.parse(url);
headers.Date = tm;
headers.Authorization = this.authorize(mothod, mnsURL.path, mnsHeaders, contentType, contentMD5, tm);
headers.Host = mnsURL.host;
return headers;
};
OpenStack.prototype.authorize = function(httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm) {
return Util.format(this._patternMNS, this._account.getKeyId(), this.signature(httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm));
};
OpenStack.prototype.signature = function(httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm) {
var text = Util.format(this._patternSign, httpVerb, contentMD5, contentType, tm, mnsHeaders, mnsURI);
return this._account.hmac_sha1(text, "base64");
};
return OpenStack;
})();
AliMNS.OpenStack = OpenStack;
})(AliMNS || (AliMNS = {}));
module.exports = AliMNS;
var Buffer = require("buffer");
var CryptoA = require("crypto");
var Events = require("events");
var Util = require("util");
var Url = require("url");
var debug = require("debug")("ali-mns");
var Promise = require("promise");
var Request = require("request");
Request.requestP = Promise.denodeify(Request);
Request.debug = false;
var Xml2js = require("xml2js");
Xml2js.parseStringP = Promise.denodeify(Xml2js.parseString);