连接Couchbase
Posted by cathyxiao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了连接Couchbase相关的知识,希望对你有一定的参考价值。
在 web.config 中增加以下配置:
<!-- Registers the custom "couchbaseClients/couchbase" section; it is handled by the
     CouchbaseClientSection type from the Couchbase.NetClient assembly. -->
<configSections>
<sectionGroup name="couchbaseClients">
<section name="couchbase" type="Couchbase.Configuration.Client.Providers.CouchbaseClientSection, Couchbase.NetClient" />
</sectionGroup>
</configSections>
<!-- Client settings read by CbManager.Init() through the section path
     "couchbaseClients/couchbase" (its default when AppSettings["CouchBaseSection"] is absent). -->
<couchbaseClients>
<couchbase useSsl="false">
<!-- Two server URIs are listed for the client to connect to. -->
<servers>
<add uri="http://192.168.10.80:8091/pools" />
<add uri="http://192.168.10.81:8091/pools" />
</servers>
<buckets>
<!-- "GuotaiTrading" matches the trade bucket name hard-coded in CbManager.Init().
     The password is blank, so Init() opens the bucket without a password.
     NOTE(review): no "gtSessions" entry is declared here, so Init() will not open the
     session bucket and the CbManager.Session* helpers have no bucket to work on - confirm
     whether a second <add name="gtSessions" ...> entry is intended. -->
<add name="GuotaiTrading" useSsl="false" password="">
<!-- Connection pool for this bucket, sized between 1 and 20. -->
<connectionPool name="custom" maxSize="20" minSize="1">
</connectionPool>
</add>
</buckets>
</couchbase>
</couchbaseClients>
增加CbManager类:
/// <summary>
/// Static facade over a Couchbase cluster connection. It keeps one handle for the
/// trade bucket and one for the session bucket, and exposes key/value helpers
/// (get, upsert, insert, delete, touch, exists) and view-query helpers on top of them.
/// <see cref="Init"/> must be called before any other member is used, because it is
/// the only place where the bucket handles are assigned.
/// </summary>
public class CbManager
{
    private static ICluster cluster { get; set; }
    private static IBucket tradeBucket { get; set; }
    private static IBucket sessionBucket { get; set; }

    /// <summary>
    /// Initializes the Couchbase connection.
    /// The configuration section path is read from AppSettings["CouchBaseSection"]; when that
    /// key is absent, "couchbaseClients/couchbase" is used.
    /// The trade bucket name is "GuotaiTrading" and the session bucket name is "gtSessions";
    /// each bucket is opened only when it is declared in the client configuration.
    /// </summary>
    public static void Init()
    {
        string strSection = ConfigurationManager.AppSettings["CouchBaseSection"] ?? "couchbaseClients/couchbase";
        string tradeBucketName = "GuotaiTrading";
        string sessionBucketName = "gtSessions";

        // A blank section name (e.g. an empty appSetting) means there is nothing to connect to.
        // Returning here avoids dereferencing a cluster that was never created.
        if (string.IsNullOrWhiteSpace(strSection))
        {
            return;
        }

        cluster = new Cluster(strSection);
        tradeBucket = OpenConfiguredBucket(tradeBucketName);
        sessionBucket = OpenConfiguredBucket(sessionBucketName);
    }

    /// <summary>
    /// Opens <paramref name="bucketName"/> on the current cluster if the client configuration
    /// declares it, using the configured password when one is present.
    /// </summary>
    /// <param name="bucketName">Name of the bucket to open.</param>
    /// <returns>The opened bucket, or null when the bucket is not declared in the configuration.</returns>
    private static IBucket OpenConfiguredBucket(string bucketName)
    {
        var bucketConfigs = cluster.Configuration.BucketConfigs;
        if (!bucketConfigs.ContainsKey(bucketName))
        {
            return null;
        }

        var strPassword = bucketConfigs[bucketName].Password;
        if (string.IsNullOrWhiteSpace(strPassword))
        {
            return cluster.OpenBucket(bucketName);
        }

        return cluster.OpenBucket(bucketName, strPassword);
    }

    /// <summary>
    /// Closes the buckets opened by <see cref="Init"/> and disposes the cluster.
    /// Safe to call when <see cref="Init"/> was never called, and safe to call more than once.
    /// </summary>
    public static void ShutDown()
    {
        if (cluster == null)
        {
            return;
        }

        if (tradeBucket != null)
        {
            cluster.CloseBucket(tradeBucket);
            tradeBucket = null;
        }

        if (sessionBucket != null)
        {
            cluster.CloseBucket(sessionBucket);
            sessionBucket = null;
        }

        // Dispose the cluster as well, otherwise its resources outlive the closed buckets.
        cluster.Dispose();
        cluster = null;
    }

    #region Private helpers used inside this class

    /// <summary>
    /// Asynchronously fetches the document stored under <paramref name="id"/>.
    /// </summary>
    /// <returns>The document value when the operation succeeds; otherwise null.</returns>
    private static async Task<T> BaseGetAsync<T>(IBucket bucket, string id) where T : class
    {
        var queryResult = await bucket.GetAsync<T>(id);
        if (queryResult != null && queryResult.Success)
        {
            return queryResult.Value;
        }

        return null;
    }

    /// <summary>
    /// Fetches the document stored under <paramref name="id"/>.
    /// </summary>
    /// <returns>The document value when the operation succeeds; otherwise null.</returns>
    private static T BaseGet<T>(IBucket bucket, string id) where T : class
    {
        var queryResult = bucket.Get<T>(id);
        if (queryResult != null && queryResult.Success)
        {
            return queryResult.Value;
        }

        return null;
    }

    /// <summary>
    /// Asynchronously upserts <paramref name="value"/> under <paramref name="key"/>, passing the
    /// CAS value and/or expiration to the bucket only when they are supplied.
    /// </summary>
    /// <returns>True when the operation reports success; false on failure or when an exception was logged.</returns>
    private static async Task<bool> BaseUpsertAsync<T>(IBucket bucket, string key, T value, ulong? cas = null, TimeSpan? expires = null)
    {
        try
        {
            IOperationResult<T> result;
            if (cas != null && expires != null)
            {
                result = await bucket.UpsertAsync(key, value, cas.Value, expires.Value);
            }
            else if (cas != null)
            {
                result = await bucket.UpsertAsync(key, value, cas.Value);
            }
            else if (expires != null)
            {
                result = await bucket.UpsertAsync(key, value, expires.Value);
            }
            else
            {
                result = await bucket.UpsertAsync(key, value);
            }

            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("UpsertAsync Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Upserts <paramref name="value"/> under <paramref name="key"/>, with an optional expiration.
    /// </summary>
    /// <returns>True when the operation reports success; false on failure or when an exception was logged.</returns>
    private static bool BaseUpsert<T>(IBucket bucket, string key, T value, TimeSpan? expires = null)
    {
        try
        {
            var result = expires == null ? bucket.Upsert(key, value) : bucket.Upsert(key, value, expires.Value);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("Upsert Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Asynchronously inserts <paramref name="value"/> under <paramref name="key"/>, with an optional expiration.
    /// </summary>
    /// <returns>True when the operation reports success; false on failure or when an exception was logged.</returns>
    private static async Task<bool> BaseInsertAsync<T>(IBucket bucket, string key, T value, TimeSpan? expires = null)
    {
        try
        {
            var result = expires == null ? await bucket.InsertAsync(key, value) : await bucket.InsertAsync(key, value, expires.Value);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("InsertAsync Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Removes the document stored under <paramref name="key"/>.
    /// </summary>
    /// <returns>True when the operation reports success; false on failure or when an exception was logged.</returns>
    private static bool BaseDelete(IBucket bucket, string key)
    {
        try
        {
            var result = bucket.Remove(key);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("Delete Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Calls Touch on the document stored under <paramref name="id"/> with <paramref name="expires"/>.
    /// </summary>
    /// <returns>True when the operation reports success; false on failure or when an exception was logged.</returns>
    private static bool BaseTouch(IBucket bucket, string id, TimeSpan expires)
    {
        try
        {
            var result = bucket.Touch(id, expires);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("BaseTouch Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    #endregion

    #region tradeBucket create / read / update / delete

    #region TxId generation

    /// <summary>
    /// Gets a unique transaction sequence number by incrementing the
    /// "MT4_TransactionCounter" counter document in the trade bucket.
    /// </summary>
    /// <returns>The value reported by the increment operation.</returns>
    public static async Task<ulong> GetTxId()
    {
        // NOTE(review): result.Success is not checked, so a failed increment still returns
        // whatever Value the result carries - confirm callers can tolerate that.
        var result = await tradeBucket.IncrementAsync("MT4_TransactionCounter", 1, 1);
        return result.Value;
    }

    #endregion

    /// <summary>
    /// Builds the view query shared by all query helpers of this class.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range; not applied when null.</param>
    /// <param name="endKey">Upper bound of the key range; not applied when null.</param>
    /// <param name="page">1-based page number; values below 1 are treated as 1.</param>
    /// <param name="pageLimit">Page size; paging is skipped entirely when it is not positive.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">When true, the query is issued with StaleState.False.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <param name="groupLevel">Group level; applied only when <paramref name="reduceFlag"/> is true.</param>
    /// <returns>The configured view query.</returns>
    private static Couchbase.Views.IViewQuery UnionQuery(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false, int? groupLevel = null)
    {
        var view = tradeBucket.CreateQuery(designDoc, viewName);
        view = view.Reduce(reduceFlag);

        if (sortType == SortType.desc)
        {
            // For a descending query the bounds are swapped: the caller's endKey becomes the
            // query's start key and the caller's startKey becomes its end key.
            if (endKey != null)
            {
                view = view.StartKey(endKey);
            }

            if (startKey != null)
            {
                view = view.EndKey(startKey);
            }

            view = view.Desc();
        }
        else
        {
            if (startKey != null)
            {
                view = view.StartKey(startKey);
            }

            if (endKey != null)
            {
                view = view.EndKey(endKey);
            }
        }

        if (forceUpdate)
        {
            view = view.Stale(Couchbase.Views.StaleState.False);
        }

        if (pageLimit > 0)
        {
            // Clamp the page so that page <= 0 cannot produce a negative skip.
            int safePage = Math.Max(page, 1);
            view = view.Skip((safePage - 1) * pageLimit).Limit(pageLimit);
        }

        if (reduceFlag && groupLevel != null)
        {
            view = view.GroupLevel(groupLevel.Value);
        }

        return view;
    }

    /// <summary>
    /// Asynchronously queries a view and loads the document behind every returned row.
    /// </summary>
    /// <typeparam name="T">Type the documents are deserialized to.</typeparam>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range.</param>
    /// <param name="endKey">Upper bound of the key range.</param>
    /// <param name="page">1-based page number.</param>
    /// <param name="pageLimit">Page size.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">Whether to force the view index to refresh.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <returns>The documents that were fetched successfully; empty when the query fails.</returns>
    public static async Task<List<T>> QueryAsync<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        List<T> resultList = new List<T>();

        // Guard on Rows, which is the collection that is actually iterated below.
        if (!query.Success || query.Rows == null)
        {
            return resultList;
        }

        foreach (var row in query.Rows)
        {
            // Rows without a document id cannot be resolved to a document.
            if (string.IsNullOrEmpty(row.Id))
            {
                continue;
            }

            var result = await tradeBucket.GetAsync<T>(row.Id);
            if (result.Success)
            {
                resultList.Add(result.Value);
            }
        }

        return resultList;
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="QueryAsync{T}"/>: queries a view and loads the
    /// document behind every returned row.
    /// </summary>
    /// <returns>The documents that were fetched successfully; empty when the query fails.</returns>
    public static List<T> Query<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = tradeBucket.Query<object>(view);
        List<T> resultList = new List<T>();

        if (!query.Success || query.Rows == null)
        {
            return resultList;
        }

        foreach (var row in query.Rows)
        {
            // Rows without a document id cannot be resolved to a document.
            if (string.IsNullOrEmpty(row.Id))
            {
                continue;
            }

            var result = tradeBucket.Get<T>(row.Id);
            if (result.Success)
            {
                resultList.Add(result.Value);
            }
        }

        return resultList;
    }

    /// <summary>
    /// Queries a view asynchronously, then loads the documents behind the rows with up to
    /// four fetches running in parallel. The order of the returned items is not defined.
    /// </summary>
    /// <returns>The documents that were fetched successfully; empty when the query fails.</returns>
    public static async Task<ConcurrentBag<T>> QueryParallelAsync<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        ConcurrentBag<T> resultList = new ConcurrentBag<T>();

        if (!query.Success || query.Rows == null)
        {
            return resultList;
        }

        ParallelOptions options = new ParallelOptions();
        options.MaxDegreeOfParallelism = 4;

        // NOTE(review): Parallel.ForEach with the synchronous Get blocks the calling thread
        // until every fetch has finished, even though this method is async.
        Parallel.ForEach(query.Rows, options, (item) =>
        {
            // Rows without a document id cannot be resolved to a document.
            if (string.IsNullOrEmpty(item.Id))
            {
                return;
            }

            var result = tradeBucket.Get<T>(item.Id);
            if (result.Success)
            {
                resultList.Add(result.Value);
            }
        });

        return resultList;
    }

    /// <summary>
    /// Asynchronously queries a view (ascending, without paging) and returns the number of rows.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range.</param>
    /// <param name="endKey">Upper bound of the key range.</param>
    /// <param name="forceUpdate">Whether to force the view index to refresh.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <returns>The number of rows returned; 0 when the query fails.</returns>
    public static async Task<int> QueryCountAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        if (!query.Success || query.Rows == null)
        {
            return 0;
        }

        return query.Rows.Count();
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="QueryCountAsync"/>.
    /// </summary>
    /// <returns>The number of rows returned; 0 when the query fails.</returns>
    public static int QueryCount(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag);
        var query = tradeBucket.Query<object>(view);
        if (!query.Success || query.Rows == null)
        {
            return 0;
        }

        return query.Rows.Count();
    }

    /// <summary>
    /// Asynchronously reads a record count from a view whose value is the count itself.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range.</param>
    /// <param name="endKey">Upper bound of the key range.</param>
    /// <param name="forceUpdate">Whether to force the view index to refresh.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <param name="groupLevel">Group level; applied only when <paramref name="reduceFlag"/> is true.</param>
    /// <returns>The first value of the result parsed as an integer; 0 when the query fails or yields no value.</returns>
    public static async Task<int> StatisticCountAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<object>(view);
        if (!query.Success || query.Rows == null)
        {
            return 0;
        }

        // Enumerate the values once; a missing or null first value counts as zero.
        object first = query.Values.FirstOrDefault();
        if (first == null)
        {
            return 0;
        }

        // Format and parse with the invariant culture so the server culture cannot affect the result.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        return int.Parse(Convert.ToString(first, invariant), System.Globalization.NumberStyles.Integer, invariant);
    }

    /// <summary>
    /// Runs a statistics view query and returns one field of the first result object.
    /// </summary>
    /// <param name="field">Name of the field to read from the first result object.</param>
    /// <returns>The field's token, or null when the query fails, has no value, or the field is missing or JSON null.</returns>
    private static async Task<JToken> QueryStatsFieldAsync(string designDoc, string viewName, object startKey, object endKey, bool forceUpdate, bool reduceFlag, int? groupLevel, string field)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<JObject>(view);
        if (!query.Success || query.Rows == null)
        {
            return null;
        }

        JObject stats = query.Values.FirstOrDefault();
        JToken token;
        if (stats != null && stats.TryGetValue(field, out token) && token != null && token.Type != JTokenType.Null)
        {
            return token;
        }

        return null;
    }

    /// <summary>
    /// Asynchronously reads the "count" field of a statistics view
    /// (the view returns a JSON object containing count, sum, min, max and sumsqr).
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range.</param>
    /// <param name="endKey">Upper bound of the key range.</param>
    /// <param name="forceUpdate">Whether to force the view index to refresh.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <param name="groupLevel">Group level; applied only when <paramref name="reduceFlag"/> is true.</param>
    /// <returns>The count; 0 when the query fails or the field is absent.</returns>
    public static async Task<int> QueryStatsAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        JToken t = await QueryStatsFieldAsync(designDoc, viewName, startKey, endKey, forceUpdate, reduceFlag, groupLevel, "count");

        // The JToken conversion operator is culture-invariant, unlike ToString() followed by Parse().
        return t == null ? 0 : (int)t;
    }

    /// <summary>
    /// Asynchronously reads the "sum" field of a statistics view
    /// (the view returns a JSON object containing count, sum, min, max and sumsqr).
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range.</param>
    /// <param name="endKey">Upper bound of the key range.</param>
    /// <param name="forceUpdate">Whether to force the view index to refresh.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <param name="groupLevel">Group level; applied only when <paramref name="reduceFlag"/> is true.</param>
    /// <returns>The sum; 0 when the query fails or the field is absent.</returns>
    public static async Task<decimal> QueryStatsSumAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        JToken t = await QueryStatsFieldAsync(designDoc, viewName, startKey, endKey, forceUpdate, reduceFlag, groupLevel, "sum");

        // The JToken conversion operator is culture-invariant and accepts exponent notation.
        return t == null ? 0 : (decimal)t;
    }

    /// <summary>
    /// Asynchronously queries a view and returns its raw rows.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Lower bound of the key range.</param>
    /// <param name="endKey">Upper bound of the key range.</param>
    /// <param name="page">1-based page number.</param>
    /// <param name="pageLimit">Page size.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">Whether to force the view index to refresh.</param>
    /// <param name="reduceFlag">Value passed to the view's Reduce option.</param>
    /// <returns>The view rows, or null when the query fails (callers must handle null).</returns>
    public static async Task<IEnumerable<Couchbase.Views.ViewRow<object>>> QueryRowsAsync(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        if (query.Success && query.Rows != null)
        {
            return query.Rows;
        }

        return null;
    }

    /// <summary>
    /// Looks up an object by id in the trade bucket.
    /// </summary>
    /// <typeparam name="T">Type the document is deserialized to.</typeparam>
    /// <param name="id">Document id.</param>
    /// <returns>The document when the lookup succeeds; otherwise null.</returns>
    public static async Task<T> GetAsync<T>(string id) where T : class
    {
        return await BaseGetAsync<T>(tradeBucket, id);
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="GetAsync{T}"/>.
    /// </summary>
    /// <returns>The document when the lookup succeeds; otherwise null.</returns>
    public static T Get<T>(string id) where T : class
    {
        return BaseGet<T>(tradeBucket, id);
    }

    /// <summary>
    /// Looks up an object by id in the trade bucket and returns it together with its CAS value.
    /// </summary>
    /// <returns>
    /// A model carrying the value and CAS when the lookup succeeds with a non-null value;
    /// otherwise a default-constructed model.
    /// </returns>
    public static async Task<GetCasModel<T>> GetCasAsync<T>(string id) where T : class
    {
        var queryResult = await tradeBucket.GetAsync<T>(id);
        if (queryResult != null && queryResult.Success && queryResult.Value != null)
        {
            return new GetCasModel<T> { Value = queryResult.Value, Cas = queryResult.Cas };
        }

        return new GetCasModel<T>();
    }

    /// <summary>
    /// Asynchronously upserts a trade-bucket document, passing <paramref name="cas"/> to the bucket.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static async Task<bool> UpsertCasAsync<T>(string key, T value, ulong cas, TimeSpan? expires = null)
    {
        return await BaseUpsertAsync<T>(tradeBucket, key, value, cas, expires);
    }

    /// <summary>
    /// Asynchronously upserts a trade-bucket document, with an optional expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static async Task<bool> UpsertAsync<T>(string key, T value, TimeSpan? expires = null)
    {
        return await BaseUpsertAsync<T>(tradeBucket, key, value, null, expires);
    }

    /// <summary>
    /// Upserts a trade-bucket document, with an optional expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static bool Upsert<T>(string key, T value, TimeSpan? expires = null)
    {
        return BaseUpsert<T>(tradeBucket, key, value, expires);
    }

    /// <summary>
    /// Asynchronously inserts a trade-bucket document, with an optional expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static async Task<bool> InsertAsync<T>(string key, T value, TimeSpan? expires = null)
    {
        return await BaseInsertAsync<T>(tradeBucket, key, value, expires);
    }

    /// <summary>
    /// Removes a trade-bucket document.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static bool Delete(string key)
    {
        return BaseDelete(tradeBucket, key);
    }

    /// <summary>
    /// Calls Touch on a trade-bucket document with the given expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static bool Touch(string id, TimeSpan expires)
    {
        return BaseTouch(tradeBucket, id, expires);
    }

    /// <summary>
    /// Checks whether a document with the given id exists in the trade bucket.
    /// </summary>
    public static bool Exists(string id)
    {
        return tradeBucket.Exists(id);
    }

    /// <summary>
    /// Asynchronously checks whether a document with the given id exists in the trade bucket.
    /// </summary>
    public static async Task<bool> ExistsAsync(string id)
    {
        return await tradeBucket.ExistsAsync(id);
    }

    #endregion

    #region sessionBucket create / read / delete

    /// <summary>
    /// Asynchronously looks up an object by id in the session bucket.
    /// </summary>
    /// <returns>The document when the lookup succeeds; otherwise null.</returns>
    public static async Task<T> SessionGetAsync<T>(string id) where T : class
    {
        return await BaseGetAsync<T>(sessionBucket, id);
    }

    /// <summary>
    /// Looks up an object by id in the session bucket.
    /// </summary>
    /// <returns>The document when the lookup succeeds; otherwise null.</returns>
    public static T SessionGet<T>(string id) where T : class
    {
        return BaseGet<T>(sessionBucket, id);
    }

    /// <summary>
    /// Upserts a session-bucket document with the given expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static bool SessionUpsert<T>(string key, T value, TimeSpan expires)
    {
        return BaseUpsert<T>(sessionBucket, key, value, expires);
    }

    /// <summary>
    /// Asynchronously inserts a session-bucket document with the given expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static async Task<bool> SessionInsert<T>(string key, T value, TimeSpan expires)
    {
        return await BaseInsertAsync<T>(sessionBucket, key, value, expires);
    }

    /// <summary>
    /// Removes a session-bucket document.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static bool SessionDelete(string key)
    {
        return BaseDelete(sessionBucket, key);
    }

    /// <summary>
    /// Calls Touch on a session-bucket document with the given expiration.
    /// </summary>
    /// <returns>True when the operation reports success; otherwise false.</returns>
    public static bool SessionTouch(string id, TimeSpan expires)
    {
        return BaseTouch(sessionBucket, id, expires);
    }

    /// <summary>
    /// Checks whether a document with the given id exists in the session bucket.
    /// </summary>
    public static bool SessionExists(string id)
    {
        return sessionBucket.Exists(id);
    }

    #endregion
}
初始化Couchbase:
// Call this before using any other CbManager member: Init() is what creates the cluster and opens the buckets.
CbManager.Init();
以上是关于连接Couchbase的主要内容,如果未能解决你的问题,请参考以下文章: