连接Couchbase

Posted by cathyxiao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了连接Couchbase相关的知识,希望对你有一定的参考价值。

在 web.config 中增加以下配置:

<!-- Registers the couchbaseClients/couchbase section and the handler type that parses it. -->
<configSections>
  <sectionGroup name="couchbaseClients">
    <section name="couchbase" type="Couchbase.Configuration.Client.Providers.CouchbaseClientSection, Couchbase.NetClient" />
  </sectionGroup>
</configSections>

<!-- Cluster definition; "couchbaseClients/couchbase" is the section path CbManager.Init falls back to. -->
<couchbaseClients>
  <couchbase useSsl="false">
    <!-- Bootstrap nodes of the cluster. -->
    <servers>
      <add uri="http://192.168.10.80:8091/pools" />
      <add uri="http://192.168.10.81:8091/pools" />
    </servers>
    <buckets>
      <!--
        Bucket that CbManager.Init opens as the trade bucket.
        NOTE(review): CbManager.Init also looks up a bucket named "gtSessions";
        no entry for it is configured here, so the session bucket is never opened
        with this configuration. Confirm whether an entry should be added.
      -->
      <add name="GuotaiTrading" useSsl="false" password="">
        <connectionPool name="custom" maxSize="20" minSize="1" />
      </add>
    </buckets>
  </couchbase>
</couchbaseClients>

 

增加CbManager类:

/// <summary>
/// Static helper around one shared Couchbase cluster connection and two buckets:
/// a trade bucket (documents, views, counters) and a session bucket.
/// <see cref="Init"/> must be called before any other member is used; until then the
/// bucket references are null.
/// </summary>
public class CbManager
{
    private static ICluster cluster { get; set; }
    private static IBucket tradeBucket { get; set; }
    private static IBucket sessionBucket { get; set; }

    // Upper bound on concurrent document fetches issued by QueryParallelAsync.
    private const int MaxParallelGets = 4;

    /// <summary>
    /// Initializes Couchbase.
    /// The configuration section name is read from AppSettings["CouchBaseSection"];
    /// when that key is absent, "couchbaseClients/couchbase" is used.
    /// The trade bucket name is "GuotaiTrading" and the session bucket name is "gtSessions".
    /// A bucket is opened only when it is present in the cluster's bucket configuration;
    /// otherwise the corresponding reference stays null.
    /// </summary>
    public static void Init()
    {
        string strSection = ConfigurationManager.AppSettings["CouchBaseSection"] ?? "couchbaseClients/couchbase";
        const string tradeBucketName = "GuotaiTrading";
        const string sessionBucketName = "gtSessions";

        // A key that is present but blank leaves the manager uninitialized.
        if (string.IsNullOrWhiteSpace(strSection))
        {
            return;
        }

        cluster = new Cluster(strSection);
        tradeBucket = OpenConfiguredBucket(tradeBucketName);
        sessionBucket = OpenConfiguredBucket(sessionBucketName);
    }

    /// <summary>
    /// Closes any opened buckets, disposes the cluster and clears the static references.
    /// Does nothing when <see cref="Init"/> never created a cluster.
    /// </summary>
    public static void ShutDown()
    {
        if (cluster == null)
        {
            return;
        }

        if (tradeBucket != null)
        {
            cluster.CloseBucket(tradeBucket);
            tradeBucket = null;
        }

        if (sessionBucket != null)
        {
            cluster.CloseBucket(sessionBucket);
            sessionBucket = null;
        }

        // Closing the buckets alone does not release the cluster itself.
        cluster.Dispose();
        cluster = null;
    }

    #region Private helpers used inside this class

    /// <summary>
    /// Opens <paramref name="bucketName"/> when it is listed in the cluster configuration,
    /// passing the configured password only when it is non-blank.
    /// </summary>
    /// <param name="bucketName">Name of the bucket to open.</param>
    /// <returns>The opened bucket, or null when the bucket is not configured.</returns>
    private static IBucket OpenConfiguredBucket(string bucketName)
    {
        var bucketConfigs = cluster.Configuration.BucketConfigs;
        if (!bucketConfigs.ContainsKey(bucketName))
        {
            return null;
        }

        var strPassword = bucketConfigs[bucketName].Password;
        if (!string.IsNullOrWhiteSpace(strPassword))
        {
            return cluster.OpenBucket(bucketName, strPassword);
        }

        return cluster.OpenBucket(bucketName);
    }

    /// <summary>Asynchronously reads a document; returns null when the read did not succeed.</summary>
    private static async Task<T> BaseGetAsync<T>(IBucket bucket, string id) where T : class
    {
        var queryResult = await bucket.GetAsync<T>(id);
        if (queryResult != null && queryResult.Success)
        {
            return queryResult.Value;
        }

        return null;
    }

    /// <summary>Reads a document; returns null when the read did not succeed.</summary>
    private static T BaseGet<T>(IBucket bucket, string id) where T : class
    {
        var queryResult = bucket.Get<T>(id);
        if (queryResult != null && queryResult.Success)
        {
            return queryResult.Value;
        }

        return null;
    }

    /// <summary>
    /// Asynchronously upserts a document, choosing the overload that matches which of
    /// <paramref name="cas"/> and <paramref name="expires"/> were supplied.
    /// Any exception is logged and reported as false.
    /// </summary>
    private static async Task<bool> BaseUpsertAsync<T>(IBucket bucket, string key, T value, ulong? cas = null, TimeSpan? expires = null)
    {
        try
        {
            IOperationResult<T> result;
            if (cas != null && expires != null)
            {
                result = await bucket.UpsertAsync(key, value, cas.Value, expires.Value);
            }
            else if (cas != null)
            {
                result = await bucket.UpsertAsync(key, value, cas.Value);
            }
            else if (expires != null)
            {
                result = await bucket.UpsertAsync(key, value, expires.Value);
            }
            else
            {
                result = await bucket.UpsertAsync(key, value);
            }

            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("UpsertAsync Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>Upserts a document, optionally with an expiry. Any exception is logged and reported as false.</summary>
    private static bool BaseUpsert<T>(IBucket bucket, string key, T value, TimeSpan? expires = null)
    {
        try
        {
            var result = expires == null ? bucket.Upsert(key, value) : bucket.Upsert(key, value, expires.Value);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("Upsert Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>Asynchronously inserts a document, optionally with an expiry. Any exception is logged and reported as false.</summary>
    private static async Task<bool> BaseInsertAsync<T>(IBucket bucket, string key, T value, TimeSpan? expires = null)
    {
        try
        {
            var result = expires == null ? await bucket.InsertAsync(key, value) : await bucket.InsertAsync(key, value, expires.Value);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("InsertAsync Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>Removes a document. Any exception is logged and reported as false.</summary>
    private static bool BaseDelete(IBucket bucket, string key)
    {
        try
        {
            var result = bucket.Remove(key);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("Delete Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    /// <summary>Updates a document's expiry. Any exception is logged and reported as false.</summary>
    private static bool BaseTouch(IBucket bucket, string id, TimeSpan expires)
    {
        try
        {
            var result = bucket.Touch(id, expires);
            return result.Success;
        }
        catch (Exception ex)
        {
            Logger.LogError("BaseTouch Failure ,ex:" + ex.ToString());
            return false;
        }
    }

    #endregion

    #region tradeBucket create / read / update / delete

    #region TxId generation

    /// <summary>
    /// Gets the next transaction sequence number by incrementing the
    /// "MT4_TransactionCounter" counter in the trade bucket.
    /// </summary>
    /// <returns>
    /// The value reported by the increment operation. A failed operation is logged, and
    /// its value is still returned unchanged (presumably 0 - confirm against the SDK).
    /// </returns>
    public static async Task<ulong> GetTxId()
    {
        var result = await tradeBucket.IncrementAsync("MT4_TransactionCounter", 1, 1);
        if (!result.Success)
        {
            Logger.LogError("GetTxId Failure ,message:" + result.Message);
        }

        return result.Value;
    }

    #endregion

    /// <summary>
    /// Builds the view query shared by all Query* methods.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="page">1-based page number; values below 1 are treated as the first page.</param>
    /// <param name="pageLimit">Page size; skip/limit are applied only when this is greater than 0.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">When true, the query is issued with stale=false.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <param name="groupLevel">Group level; applied only together with <paramref name="reduceFlag"/>.</param>
    /// <returns>The configured view query.</returns>
    private static Couchbase.Views.IViewQuery UnionQuery(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false, int? groupLevel = null)
    {
        var view = tradeBucket.CreateQuery(designDoc, viewName);
        view = view.Reduce(reduceFlag);

        if (sortType == SortType.desc)
        {
            // For a descending query the two keys are swapped, presumably so callers
            // can always pass the range in the same (start, end) order.
            if (endKey != null)
            {
                view = view.StartKey(endKey);
            }

            if (startKey != null)
            {
                view = view.EndKey(startKey);
            }

            view = view.Desc();
        }
        else
        {
            if (startKey != null)
            {
                view = view.StartKey(startKey);
            }

            if (endKey != null)
            {
                view = view.EndKey(endKey);
            }
        }

        if (forceUpdate)
        {
            view = view.Stale(Couchbase.Views.StaleState.False);
        }

        if (pageLimit > 0)
        {
            // Guard against a negative skip when a caller passes page < 1.
            int pageIndex = page < 1 ? 0 : page - 1;
            view = view.Skip(pageIndex * pageLimit).Limit(pageLimit);
        }

        if (reduceFlag && groupLevel != null)
        {
            view = view.GroupLevel(groupLevel.Value);
        }

        return view;
    }

    /// <summary>
    /// Asynchronously queries a view and loads the document behind every returned row.
    /// </summary>
    /// <typeparam name="T">Type the documents are read as.</typeparam>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="page">1-based page number.</param>
    /// <param name="pageLimit">Page size; 0 or less disables paging.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">Whether to force the view index to be refreshed.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <returns>Documents whose read succeeded, in row order; empty when the view query failed.</returns>
    public static async Task<List<T>> QueryAsync<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        List<T> resultList = new List<T>();
        if (query.Success && query.Values != null)
        {
            foreach (var row in query.Rows)
            {
                var result = await tradeBucket.GetAsync<T>(row.Id);
                if (result.Success)
                {
                    resultList.Add(result.Value);
                }
            }
        }

        return resultList;
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="QueryAsync{T}"/>.
    /// </summary>
    /// <returns>Documents whose read succeeded, in row order; empty when the view query failed.</returns>
    public static List<T> Query<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = tradeBucket.Query<object>(view);
        List<T> resultList = new List<T>();
        if (query.Success && query.Values != null)
        {
            foreach (var row in query.Rows)
            {
                var result = tradeBucket.Get<T>(row.Id);
                if (result.Success)
                {
                    resultList.Add(result.Value);
                }
            }
        }

        return resultList;
    }

    /// <summary>
    /// Asynchronously queries a view and loads the documents behind the rows with at most
    /// <see cref="MaxParallelGets"/> reads in flight at a time.
    /// </summary>
    /// <returns>
    /// An unordered bag of the documents whose read succeeded; empty when the view query failed.
    /// </returns>
    public static async Task<ConcurrentBag<T>> QueryParallelAsync<T>(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false) where T : new()
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        ConcurrentBag<T> resultList = new ConcurrentBag<T>();
        if (!query.Success || query.Values == null)
        {
            return resultList;
        }

        // Throttled asynchronous reads: keeps the same degree of parallelism as before
        // without blocking a thread inside an async method.
        using (var gate = new System.Threading.SemaphoreSlim(MaxParallelGets))
        {
            var fetches = query.Rows.Select(async item =>
            {
                await gate.WaitAsync();
                try
                {
                    var result = await tradeBucket.GetAsync<T>(item.Id);
                    if (result.Success)
                    {
                        resultList.Add(result.Value);
                    }
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            // WhenAll completes only after every fetch has finished, so the gate is
            // never disposed while a fetch still holds it.
            await Task.WhenAll(fetches);
        }

        return resultList;
    }

    /// <summary>
    /// Asynchronously queries a view without paging and returns the number of rows.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="forceUpdate">Whether to force the view index to be refreshed.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <returns>The number of rows in the query result.</returns>
    public static async Task<int> QueryCountAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        return query.Rows.Count();
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="QueryCountAsync"/>.
    /// </summary>
    /// <returns>The number of rows in the query result.</returns>
    public static int QueryCount(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag);
        var query = tradeBucket.Query<object>(view);
        return query.Rows.Count();
    }

    /// <summary>
    /// Asynchronously reads a record count from a view whose (reduced) value is the count itself.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="forceUpdate">Whether to force the view index to be refreshed.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <param name="groupLevel">Optional group level.</param>
    /// <returns>The first value of the result parsed as an integer; 0 when there is no usable value.</returns>
    public static async Task<int> StatisticCountAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<object>(view);
        if (!query.Success || query.Values == null)
        {
            return 0;
        }

        // Enumerate once; a null first value (or no rows) means there is nothing to parse.
        var first = query.Values.FirstOrDefault();
        if (first == null)
        {
            return 0;
        }

        return int.Parse(first.ToString());
    }

    /// <summary>
    /// Asynchronously reads the "count" member from a view whose value is a JSON statistics
    /// object (count, sum, min, max, sumsqr).
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="forceUpdate">Whether to force the view index to be refreshed.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <param name="groupLevel">Optional group level.</param>
    /// <returns>The count; 0 when the query failed, returned nothing, or has no "count" member.</returns>
    public static async Task<int> QueryStatsAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<JObject>(view);
        if (!query.Success || query.Values == null)
        {
            return 0;
        }

        var stats = query.Values.FirstOrDefault();
        JToken t;
        if (stats == null || !stats.TryGetValue("count", out t) || t == null || t.Type == JTokenType.Null)
        {
            return 0;
        }

        return t.ToObject<int>();
    }

    /// <summary>
    /// Asynchronously reads the "sum" member from a view whose value is a JSON statistics
    /// object (count, sum, min, max, sumsqr).
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="forceUpdate">Whether to force the view index to be refreshed.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <param name="groupLevel">Optional group level.</param>
    /// <returns>The sum; 0 when the query failed, returned nothing, or has no "sum" member.</returns>
    public static async Task<decimal> QueryStatsSumAsync(string designDoc, string viewName, object startKey = null, object endKey = null, bool forceUpdate = false, bool reduceFlag = true, int? groupLevel = null)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, 1, 0, SortType.asc, forceUpdate, reduceFlag, groupLevel);
        var query = await tradeBucket.QueryAsync<JObject>(view);
        if (!query.Success || query.Values == null)
        {
            return 0;
        }

        var stats = query.Values.FirstOrDefault();
        JToken t;
        if (stats == null || !stats.TryGetValue("sum", out t) || t == null || t.Type == JTokenType.Null)
        {
            return 0;
        }

        // Convert the JSON number directly instead of formatting and re-parsing it as
        // text, which depends on the current culture and breaks on exponent notation.
        return t.ToObject<decimal>();
    }

    /// <summary>
    /// Asynchronously queries a view and returns its raw rows.
    /// </summary>
    /// <param name="designDoc">Design document name.</param>
    /// <param name="viewName">View name.</param>
    /// <param name="startKey">Optional range start key.</param>
    /// <param name="endKey">Optional range end key.</param>
    /// <param name="page">1-based page number.</param>
    /// <param name="pageLimit">Page size; 0 or less disables paging.</param>
    /// <param name="sortType">Sort direction.</param>
    /// <param name="forceUpdate">Whether to force the view index to be refreshed.</param>
    /// <param name="reduceFlag">Whether the view's reduce function is applied.</param>
    /// <returns>The view rows, or null when the query failed.</returns>
    public static async Task<IEnumerable<Couchbase.Views.ViewRow<object>>> QueryRowsAsync(string designDoc, string viewName, object startKey = null, object endKey = null, int page = 1, int pageLimit = 50, SortType sortType = SortType.desc, bool forceUpdate = false, bool reduceFlag = false)
    {
        var view = UnionQuery(designDoc, viewName, startKey, endKey, page, pageLimit, sortType, forceUpdate, reduceFlag);
        var query = await tradeBucket.QueryAsync<object>(view);
        if (query.Success && query.Values != null)
        {
            return query.Rows;
        }

        return null;
    }

    /// <summary>
    /// Looks up a document by id in the trade bucket.
    /// </summary>
    /// <typeparam name="T">Type the document is read as.</typeparam>
    /// <param name="id">Document id.</param>
    /// <returns>The document when the read succeeded; otherwise null.</returns>
    public static async Task<T> GetAsync<T>(string id) where T : class
    {
        return await BaseGetAsync<T>(tradeBucket, id);
    }

    /// <summary>Synchronous counterpart of <see cref="GetAsync{T}"/>.</summary>
    public static T Get<T>(string id) where T : class
    {
        return BaseGet<T>(tradeBucket, id);
    }

    /// <summary>
    /// Reads a document from the trade bucket together with its CAS value.
    /// </summary>
    /// <returns>
    /// A model carrying the value and CAS when the read succeeded with a non-null value;
    /// otherwise a default-constructed model.
    /// </returns>
    public static async Task<GetCasModel<T>> GetCasAsync<T>(string id) where T : class
    {
        var queryResult = await tradeBucket.GetAsync<T>(id);
        if (queryResult != null && queryResult.Success && queryResult.Value != null)
        {
            return new GetCasModel<T> { Value = queryResult.Value, Cas = queryResult.Cas };
        }

        return new GetCasModel<T>();
    }

    /// <summary>Upserts a document into the trade bucket using the given CAS value.</summary>
    public static async Task<bool> UpsertCasAsync<T>(string key, T value, ulong cas, TimeSpan? expires = null)
    {
        return await BaseUpsertAsync<T>(tradeBucket, key, value, cas, expires);
    }

    /// <summary>Asynchronously upserts a document into the trade bucket.</summary>
    public static async Task<bool> UpsertAsync<T>(string key, T value, TimeSpan? expires = null)
    {
        return await BaseUpsertAsync<T>(tradeBucket, key, value, null, expires);
    }

    /// <summary>Upserts a document into the trade bucket.</summary>
    public static bool Upsert<T>(string key, T value, TimeSpan? expires = null)
    {
        return BaseUpsert<T>(tradeBucket, key, value, expires);
    }

    /// <summary>Asynchronously inserts a document into the trade bucket.</summary>
    public static async Task<bool> InsertAsync<T>(string key, T value, TimeSpan? expires = null)
    {
        return await BaseInsertAsync<T>(tradeBucket, key, value, expires);
    }

    /// <summary>Removes a document from the trade bucket.</summary>
    public static bool Delete(string key)
    {
        return BaseDelete(tradeBucket, key);
    }

    /// <summary>Updates the expiry of a document in the trade bucket.</summary>
    public static bool Touch(string id, TimeSpan expires)
    {
        return BaseTouch(tradeBucket, id, expires);
    }

    /// <summary>Checks whether a document exists in the trade bucket.</summary>
    public static bool Exists(string id)
    {
        return tradeBucket.Exists(id);
    }

    /// <summary>Asynchronously checks whether a document exists in the trade bucket.</summary>
    public static async Task<bool> ExistsAsync(string id)
    {
        return await tradeBucket.ExistsAsync(id);
    }

    #endregion

    #region sessionBucket create / read / delete

    /// <summary>Asynchronously reads a document from the session bucket; null when the read did not succeed.</summary>
    public static async Task<T> SessionGetAsync<T>(string id) where T : class
    {
        return await BaseGetAsync<T>(sessionBucket, id);
    }

    /// <summary>Reads a document from the session bucket; null when the read did not succeed.</summary>
    public static T SessionGet<T>(string id) where T : class
    {
        return BaseGet<T>(sessionBucket, id);
    }

    /// <summary>Upserts a document into the session bucket with the given expiry.</summary>
    public static bool SessionUpsert<T>(string key, T value, TimeSpan expires)
    {
        return BaseUpsert<T>(sessionBucket, key, value, expires);
    }

    /// <summary>Asynchronously inserts a document into the session bucket with the given expiry.</summary>
    public static async Task<bool> SessionInsert<T>(string key, T value, TimeSpan expires)
    {
        return await BaseInsertAsync<T>(sessionBucket, key, value, expires);
    }

    /// <summary>Removes a document from the session bucket.</summary>
    public static bool SessionDelete(string key)
    {
        return BaseDelete(sessionBucket, key);
    }

    /// <summary>Updates the expiry of a document in the session bucket.</summary>
    public static bool SessionTouch(string id, TimeSpan expires)
    {
        return BaseTouch(sessionBucket, id, expires);
    }

    /// <summary>Checks whether a document exists in the session bucket.</summary>
    public static bool SessionExists(string id)
    {
        return sessionBucket.Exists(id);
    }

    #endregion
}

 

初始化 Couchbase(需在调用 CbManager 的其他方法之前执行):

// Creates the shared cluster and opens the configured buckets; the other CbManager
// methods use those buckets, so call this before any of them.
CbManager.Init();

以上是关于连接Couchbase的主要内容,如果未能解决你的问题,请参考以下文章:

MySQL 的内连接、左连接、右连接有啥区别?

SQL中的左连接与右连接,内连接有啥区别

sql左连接 右连接 内连接 外连接都是啥

oracle连接总结(内连接外连接自然连接,交叉连接,自连接)

SQL内连接与外连接的区别

SQL的连接(外连接内连接交叉连接和自连接)