c#连接vertica数据库

Posted 翩翩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c#连接vertica数据库相关的知识,希望对你有一定的参考价值。

 

项目前期使用mysql数据库,大约每天200w数据量,十天1500w数据量之后,读取写入都会很慢,而且经常锁表,后来采用vertica数据库,下面分享vertica数据库使用方法,以及大批量数据快速写入数据库的方法:

1,数据库IDE的配置

     windows系统下需先安装jdk,然后才能使用IDE,可以使用dbvis_windows-x64_9_1_9,一个数据库统一客户端工具,网上搜索下载。

     可能会出现需要加载jar包才能使用的情况,选择软件目录\\jdbc\\vertica.jar包即可。

2,c#里如何使用

    下载类库:https://my.vertica.com/download/vertica/client-drivers/

   

   项目中引用:

   

代码:

 1                VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]);
 2                 {
 3                     conn.Open();
 4                     VerticaTransaction txn = conn.BeginTransaction();
 5                     VerticaCommand cmd = new VerticaCommand();
 6                     cmd.Connection = conn;
 7                     cmd.Transaction = txn;
 8                     string deleteSourceData = @"Delete from " + strTableName + " where sampledate = " + time.ToString("yyyyMMdd") + " and city=\'" + city + "\'";
 9                     cmd.CommandText = deleteSourceData;
10                     cmd.CommandTimeout = 90;
11                     cmd.ExecuteNonQuery();
12                 }

3,大量数据快速批量插入vertica数据库方法

    sqlserver中常见插入方法:

    insert into table1(a,b) valuse (\'Cust1\', \'Smith Company\'),(\'Cust2\', \'Perform Company\')

    vertica里不支持这种写法,但是支持如下:

    insert into table1(a,b)  select a,b,c from table2 union select  \'Cust1\', \'Smith Company\'

   这种写法速度并不快,更快速的方法是使用copy,copy可以从txt文件,也可以是内存流:    

   1)从txt: copy 表名 from \'/indexdata/tmp/conv_ebc_esf_housedimension.txt\' DELIMITER E\'\\t\' ESCAPE AS \'\\\' ENCLOSED BY \'"\' DIRECT EXCEPTIONS \'/indexdata/tmp/housedimension.log\'; 
  
    第一个txt,是数据来源,可以把数据写入txt,列之间用tab分隔,行之间是回车,此种写法未经过代码验证。
 
   2) 从内存流 :
       思路就是:取出大量数据,写入MemoryStream内存流,加载进, copy语句使用语法:
 
       copy {0}{1} from stdin record terminator E\'{2}\' delimiter E\'{3}\' enforcelength no commit 
 
       0:表名 ,1:列名,2:列之间分隔符\'\\n\'    ,3:行之间分隔符\'\\t\'
 
       stdin 就是指输入缓冲区
 
具体代码:
 1         /// <summary>
 2         /// 批量插入数据到vertica数据库
 3         /// </summary>
 4         /// <param name="dt">数据源</param>
 5         /// <param name="strTableName">插入的目标表名</param>
 6         /// <param name="time">日期(删除数据使用)</param>
 7         /// <param name="city">城市(删除数据使用)</param>
 8         public static void BulkCopy(DataTable dt, string strTableName, DateTime time,string city)
 9         {
10             if (dt == null || dt.Rows.Count == 0)
11             {
12                 return;
13             }
14             if (dt.Columns.Count == 0)
15             {
16                 throw new Exception("The length of column cannot be zero.");
17             }
18             //从datatable中获取列名
19             List<string> lstField = new List<string>();
20             for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++)
21             {
22                 lstField.Add(dt.Columns[colIndex].ColumnName);
23             }
24             string strFiledList = string.Format("({0})", string.Join(",", lstField.ToArray()));
25             //拼写copy语句
26             const char RowSplit = \'\\n\';
27             const char ColSplit = \'\\t\';
28 
29             string strCopyStatement = string.Format("copy {0}{1} from stdin record terminator E\'{2}\' delimiter E\'{3}\' enforcelength no commit",
30                 strTableName, strFiledList, RowSplit, ColSplit);
31             //按照copy语句中的分隔符,分隔数据源
32             StringBuilder sbText = new StringBuilder();
33             foreach (DataRow dr in dt.Rows)
34             {
35                 bool bFirstField = true;
36                 for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++)
37                 {
38                     string strVal = GetDataString(dr, colIndex);
39                     if (bFirstField)
40                     {
41                         sbText.Append(strVal);
42                         bFirstField = false;
43                     }
44                     else
45                     {
46                         sbText.AppendFormat("{0}{1}", ColSplit, strVal);
47                     }
48                 }
49                 sbText.Append(RowSplit);
50             }
51             //数据源写入内存流
52             string strTemp = sbText.ToString();
53             byte[] buff = Encoding.UTF8.GetBytes(strTemp);
54             using (MemoryStream ms = new MemoryStream())
55             {
56                 ms.Write(buff, 0, buff.Length);
57                 ms.Flush();
58                 ms.Position = 0;
59                 //建立vertica数据库连接
60                 VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]);
61                 {
62                     conn.Open();
63                     VerticaTransaction txn = conn.BeginTransaction();
64                     Vertica.Data.VerticaClient.VerticaCopyStream vcs = new VerticaCopyStream(conn, strCopyStatement);
65                     //插入数据前,先删除重复数据
66                     VerticaCommand cmd = new VerticaCommand();
67                     cmd.Connection = conn;
68                     cmd.Transaction = txn;
69                     string deleteSourceData = @"Delete from " + strTableName + " where sampledate = " + time.ToString("yyyyMMdd") + " and city=\'" + city + "\'";
70                     cmd.CommandText = deleteSourceData;
71                     cmd.CommandTimeout = 90;
72                     cmd.ExecuteNonQuery();
73                     //批量插入数据
74                     vcs.Start();
75                     vcs.AddStream(ms, false);
76                     vcs.Execute();
77 
78                     long insertedCount = vcs.Finish();
79 
80                     IList<long> lstRejected = vcs.Rejects;
81                     if (lstRejected.Count > 0)
82                     {
83                         txn.Rollback();
84                         conn.Close();
85 
86                         // Maybe need more detail info to show
87                         throw new Exception("Bulk copy failure.");
88                     }
89                     else
90                     {
91                         txn.Commit();
92                         conn.Close();
93                     }
94                 }
95 
96                 ms.Close();
97             }
98         }

 

 

以上是关于c#连接vertica数据库的主要内容,如果未能解决你的问题,请参考以下文章

用于vertica sql的pyspark逻辑连接

使用 Hue sqoop 2 从 vertica 获取数据

Vertica7 Native Connection Load Balance

无法从 Spark 显示 Vertica 表

从 Datagrip 连接到 Vertica

kattle的连接SQLserver与vertica的一些方法