using Dapper;
using MySqlConnector;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using WIDESEA_Core.Const;
using WIDESEA_Core.DBManager;
using WIDESEA_Core.Enums;
using WIDESEA_Core.Extensions;
using WIDESEA_Core.Utilities;

namespace WIDESEA_Core.Dapper
{
    /// <summary>
    /// Dapper-based data access helper. Wraps query/execute calls, optional
    /// per-call transactions, an ambient transaction scope
    /// (<see cref="BeginTransaction"/>) and provider-specific bulk inserts
    /// (SQL Server, MySQL, PostgreSQL).
    /// </summary>
    public class SqlDapper : ISqlDapper
    {
        // Characters that force a CSV field to be quoted.
        private static readonly char[] CsvSpecialChars = { ',', '"', '\r', '\n' };

        private string _connectionString;

        private IDbConnection _connection { get; set; }

        // Transaction shared by Execute and BeginTransaction; null when no transaction is active.
        IDbTransaction dbTransaction = null;

        // True while the callback passed to BeginTransaction is running.
        private bool _transaction { get; set; }

        /// <summary>
        /// Current connection. A new connection is obtained from
        /// <c>DBServerProvider</c> whenever there is none yet or the previous
        /// one is closed.
        /// </summary>
        public IDbConnection Connection
        {
            get
            {
                if (_connection == null || _connection.State == ConnectionState.Closed)
                {
                    _connection = DBServerProvider.GetDbConnection(_connectionString);
                }
                return _connection;
            }
        }

        /// <summary>
        /// Creates a helper that uses the default connection string.
        /// </summary>
        public SqlDapper()
        {
            _connectionString = DBServerProvider.GetConnectionString();
        }

        /// <summary>
        /// Creates a helper that uses the connection string registered under
        /// <paramref name="connKeyName"/>.
        /// </summary>
        /// <param name="connKeyName">Key passed to <c>DBServerProvider.GetConnectionString</c>.</param>
        public SqlDapper(string connKeyName)
        {
            _connectionString = DBServerProvider.GetConnectionString(connKeyName);
        }

        /// <summary>
        /// Runs <paramref name="action"/> inside one transaction (added 2020-06-15).
        /// The transaction is committed when the callback returns true and
        /// rolled back when it returns false or throws.
        /// </summary>
        /// <param name="action">Work to run; receives this instance.</param>
        /// <param name="error">Invoked with the exception when the work throws.</param>
        public void BeginTransaction(Func<ISqlDapper, bool> action, Action<Exception> error)
        {
            _transaction = true;
            try
            {
                // The getter may hand back a connection that is already open.
                if (Connection.State != ConnectionState.Open)
                {
                    Connection.Open();
                }
                dbTransaction = Connection.BeginTransaction();
                if (action(this))
                {
                    dbTransaction?.Commit();
                }
                else
                {
                    dbTransaction?.Rollback();
                }
            }
            catch (Exception ex)
            {
                dbTransaction?.Rollback();
                error(ex);
            }
            finally
            {
                dbTransaction?.Dispose();
                // Clear the field so later calls never see a disposed transaction.
                dbTransaction = null;
                // Use the backing field: the Connection getter would create a new
                // connection just to dispose it when the current one is closed.
                _connection?.Dispose();
                _transaction = false;
            }
        }

        /// <summary>
        /// Executes a query and returns all rows mapped to <typeparamref name="T"/>.
        /// </summary>
        /// <param name="cmd">SQL text or stored procedure name.</param>
        /// <param name="param">
        /// Parameter object. Per the original author's note, input/output
        /// parameters can be supplied with a Dapper <c>DynamicParameters</c>
        /// instance (<c>p.Add("@a", 11)</c>,
        /// <c>p.Add("@b", dbType: DbType.Int32, direction: ParameterDirection.Output)</c>).
        /// </param>
        /// <param name="commandType">Command type; defaults to <see cref="CommandType.Text"/>.</param>
        /// <param name="beginTransaction">Whether to wrap the call in its own transaction.</param>
        /// <returns>The mapped rows.</returns>
        public List<T> QueryList<T>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false) where T : class
        {
            return Execute(
                (conn, tran) => conn.Query<T>(cmd, param, tran, commandType: commandType ?? CommandType.Text).ToList(),
                beginTransaction);
        }

        /// <summary>
        /// Executes a query and returns the first row, or null when the query
        /// yields no rows.
        /// </summary>
        public T QueryFirst<T>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false) where T : class
        {
            return QueryList<T>(cmd, param, commandType ?? CommandType.Text, beginTransaction).FirstOrDefault();
        }

        /// <summary>
        /// Executes a command and returns the first column of the first row.
        /// </summary>
        public object ExecuteScalar(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute(
                (conn, tran) => conn.ExecuteScalar(cmd, param, tran, commandType: commandType ?? CommandType.Text),
                beginTransaction);
        }

        /// <summary>
        /// Executes a command and returns the number of affected rows.
        /// </summary>
        public int ExcuteNonQuery(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute(
                (conn, tran) => conn.Execute(cmd, param, tran, commandType: commandType ?? CommandType.Text),
                beginTransaction);
        }

        /// <summary>
        /// Executes a command and returns a data reader. The connection is
        /// left undisposed so the caller can consume the reader; the caller
        /// is responsible for disposing the reader.
        /// </summary>
        public IDataReader ExecuteReader(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute(
                (conn, tran) => conn.ExecuteReader(cmd, param, tran, commandType: commandType ?? CommandType.Text),
                beginTransaction,
                false);
        }

        /// <summary>
        /// Executes a command that returns several result sets. The connection
        /// is left undisposed so the caller can consume the grid reader.
        /// </summary>
        public SqlMapper.GridReader QueryMultiple(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute(
                (conn, tran) => conn.QueryMultiple(cmd, param, tran, commandType: commandType ?? CommandType.Text),
                beginTransaction,
                false);
        }

        /// <summary>
        /// Executes a command that returns two result sets and maps them to lists.
        /// Per the original author's note, output values can be read back
        /// afterwards with <c>param.Get("@b")</c> when <paramref name="param"/>
        /// is a Dapper <c>DynamicParameters</c>.
        /// </summary>
        public (List<T1>, List<T2>) QueryMultiple<T1, T2>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            using (SqlMapper.GridReader reader = QueryMultiple(cmd, param, commandType, beginTransaction))
            {
                return (reader.Read<T1>().ToList(), reader.Read<T2>().ToList());
            }
        }

        /// <summary>
        /// Executes a command that returns three result sets and maps them to lists.
        /// </summary>
        public (List<T1>, List<T2>, List<T3>) QueryMultiple<T1, T2, T3>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            using (SqlMapper.GridReader reader = QueryMultiple(cmd, param, commandType, beginTransaction))
            {
                return (reader.Read<T1>().ToList(), reader.Read<T2>().ToList(), reader.Read<T3>().ToList());
            }
        }

        /// <summary>
        /// Runs <paramref name="func"/> against the current connection,
        /// optionally inside a transaction owned by this call.
        /// </summary>
        /// <param name="func">Work to run; receives the connection and the active transaction (may be null).</param>
        /// <param name="beginTransaction">Start a transaction for this call. Ignored while an ambient transaction is running.</param>
        /// <param name="disposeConn">Dispose the connection afterwards (false for readers the caller still has to consume).</param>
        private T Execute<T>(Func<IDbConnection, IDbTransaction, T> func, bool beginTransaction = false, bool disposeConn = true)
        {
            // Inside BeginTransaction the ambient transaction is reused; opening the
            // connection again or replacing the transaction would break that scope.
            if (beginTransaction && !_transaction)
            {
                if (Connection.State != ConnectionState.Open)
                {
                    Connection.Open();
                }
                dbTransaction = Connection.BeginTransaction();
            }
            try
            {
                T result = func(Connection, dbTransaction);
                if (!_transaction && dbTransaction != null)
                {
                    dbTransaction.Commit();
                }
                return result;
            }
            catch
            {
                if (!_transaction && dbTransaction != null)
                {
                    dbTransaction.Rollback();
                }
                // Rethrow without resetting the stack trace.
                throw;
            }
            finally
            {
                if (!_transaction)
                {
                    dbTransaction?.Dispose();
                    // A disposed transaction must not leak into the next call.
                    dbTransaction = null;
                    if (disposeConn)
                    {
                        _connection?.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Inserts a single entity.
        /// </summary>
        /// <param name="entity">Entity to insert.</param>
        /// <param name="updateFileds">Columns to insert; all columns when null.</param>
        /// <param name="beginTransaction">Whether to run inside a transaction.</param>
        /// <returns>Number of affected rows.</returns>
        public int Add<T>(T entity, Expression<Func<T, object>> updateFileds = null, bool beginTransaction = false)
        {
            return AddRange(new T[] { entity }, updateFileds, beginTransaction);
        }

        /// <summary>
        /// Inserts several entities. MySQL and PostgreSQL run one parameterized
        /// insert per entity; other providers insert through a temporary table.
        /// </summary>
        /// <param name="entities">Entities to insert.</param>
        /// <param name="addFileds">Columns to insert; all columns when null.</param>
        /// <param name="beginTransaction">Whether to run inside a transaction.</param>
        /// <returns>Number of affected rows.</returns>
        /// <exception cref="Exception">The entity type has no key property.</exception>
        public int AddRange<T>(IEnumerable<T> entities, Expression<Func<T, object>> addFileds = null, bool beginTransaction = true)
        {
            Type entityType = typeof(T);
            var key = entityType.GetKeyProperty();
            if (key == null)
            {
                throw new Exception("实体必须包括主键才能批量更新");
            }

            // The key column is only written when it is a Guid.
            bool keyIsGuid = key.PropertyType == typeof(Guid);

            string[] columns;
            if (addFileds != null)
            {
                // Explicit column list.
                columns = addFileds.GetExpressionToArray();
            }
            else
            {
                var properties = entityType.GetGenericProperties();
                if (!keyIsGuid)
                {
                    properties = properties.Where(x => x.Name != key.Name).ToArray();
                }
                columns = properties.Select(x => x.Name).ToArray();
            }

            bool isMySql = DBType.Name == DbCurrentType.MySql.ToString();
            bool isPgSql = DBType.Name == DbCurrentType.PgSql.ToString();
            string tableName = entityType.GetEntityTableName();
            string parameterList = "@" + string.Join(",@", columns);

            string sql;
            if (isMySql)
            {
                // MySQL batch insert still runs row by row (to be optimized).
                sql = $"insert into {tableName}({string.Join(",", columns)})" + $"values({parameterList});";
            }
            else if (isPgSql)
            {
                string quotedColumns = "\"" + string.Join("\",\"", columns) + "\"";
                sql = $"insert into {tableName}({quotedColumns})" + $"values({parameterList});";
            }
            else
            {
                // SQL Server: batch insert through a temporary table.
                sql = $"insert into {tableName}({string.Join(",", columns)})" + $"select * from {EntityToSqlTempName.TempInsert};";
                // NOTE(review): presumably the first argument tells GetEntitySql whether the
                // key column is part of the temp table; it now follows the same Guid-key rule
                // as `columns` above (it used to compare the entity type itself to Guid,
                // which is never true) - confirm against GetEntitySql.
                sql = entities.GetEntitySql(keyIsGuid, sql, null, addFileds, null);
            }

            return Execute(
                // The transaction must be handed to Dapper, otherwise the command does not
                // take part in the transaction opened for this call.
                (conn, tran) => conn.Execute(sql, (isMySql || isPgSql) ? entities.ToList() : null, tran),
                beginTransaction);
        }

        /// <summary>
        /// Updates a single entity by its key. SQL Server uses a temporary-table
        /// update; MySQL uses a parameterized UPDATE.
        /// </summary>
        /// <param name="entity">Entity to update; must have a key.</param>
        /// <param name="updateFileds">Columns to update, e.g. <c>x =&gt; new { x.a, x.b }</c>; all columns when null.</param>
        /// <param name="beginTransaction">Whether to run inside a transaction.</param>
        /// <returns>Number of affected rows.</returns>
        public int Update<T>(T entity, Expression<Func<T, object>> updateFileds = null, bool beginTransaction = true)
        {
            return UpdateRange(new T[] { entity }, updateFileds, beginTransaction);
        }

        /// <summary>
        /// Updates several entities by their key. SQL Server uses a
        /// temporary-table update; MySQL runs one parameterized UPDATE per entity.
        /// </summary>
        /// <param name="entities">Entities to update; the type must have a key.</param>
        /// <param name="updateFileds">Columns to update; all non-key columns when null.</param>
        /// <param name="beginTransaction">Whether to run inside a transaction.</param>
        /// <returns>Number of affected rows.</returns>
        /// <exception cref="Exception">The entity type has no key property.</exception>
        public int UpdateRange<T>(IEnumerable<T> entities, Expression<Func<T, object>> updateFileds = null, bool beginTransaction = true)
        {
            Type entityType = typeof(T);
            var key = entityType.GetKeyProperty();
            if (key == null)
            {
                throw new Exception("实体必须包括主键才能批量更新");
            }

            var properties = entityType.GetGenericProperties().Where(x => x.Name != key.Name);
            if (updateFileds != null)
            {
                // Resolve the expression once instead of once per property.
                string[] updateColumns = updateFileds.GetExpressionToArray();
                properties = properties.Where(x => updateColumns.Contains(x.Name));
            }

            if (DBType.Name == DbCurrentType.MySql.ToString())
            {
                List<string> assignments = properties.Select(x => x.Name + "=@" + x.Name).ToList();
                string keyName = entityType.GetKeyName();
                string sqltext = $"UPDATE {entityType.GetEntityTableName()} SET {string.Join(",", assignments)} WHERE {keyName} = @{keyName} ;";
                return ExcuteNonQuery(sqltext, entities, CommandType.Text, beginTransaction);
            }

            string fileds = string.Join(",", properties.Select(x => $" a.{x.Name}=b.{x.Name}").ToArray());
            string sql = $"update a set {fileds} from {entityType.GetEntityTableName()} as a inner join {EntityToSqlTempName.TempInsert.ToString()} as b on a.{key.Name}=b.{key.Name}";
            sql = entities.ToList().GetEntitySql(true, sql, null, updateFileds, null);
            return ExcuteNonQuery(sql, null, CommandType.Text, beginTransaction);
        }

        /// <summary>
        /// Deletes the rows whose key is in <paramref name="keys"/>
        /// (PostgreSQL support added 2020-08-06).
        /// </summary>
        /// <param name="beginTransaction">Whether to run inside a transaction.</param>
        /// <param name="keys">Key values of the rows to delete.</param>
        /// <returns>Number of affected rows; 0 when there is no key property or no keys.</returns>
        /// <exception cref="Exception">A key value fails the key type validation.</exception>
        public int DelWithKey<T>(bool beginTransaction = false, params object[] keys)
        {
            Type entityType = typeof(T);
            var keyProperty = entityType.GetKeyProperty();
            if (keyProperty == null || keys == null || keys.Length == 0)
            {
                return 0;
            }

            // Materialize once; the result is inspected more than once below.
            List<(bool, string, object)> validation = keyProperty.ValidationValueForDbType(keys).ToList();
            if (validation.Any(x => !x.Item1))
            {
                throw new Exception($"主键类型【{validation.Where(x => !x.Item1).Select(s => s.Item3).FirstOrDefault()}】不正确");
            }

            string tKey = keyProperty.Name;
            FieldType fieldType = entityType.GetFieldType();
            bool numericKey = fieldType == FieldType.Int || fieldType == FieldType.BigInt;

            // NOTE(review): the key list is still embedded in the SQL text. Quotes inside
            // string keys are doubled so such keys no longer break the statement, but a
            // parameterized IN list would be the safer long-term form.
            string joinKeys = numericKey
                ? string.Join(",", keys)
                : $"'{string.Join("','", keys.Select(k => k?.ToString().Replace("'", "''")))}'";

            string sql;
            if (DBType.Name == DbCurrentType.PgSql.ToString())
            {
                sql = $"DELETE FROM \"public\".\"{entityType.GetEntityTableName()}\" where \"{tKey}\" in ({joinKeys});";
            }
            else
            {
                sql = $"DELETE FROM {entityType.GetEntityTableName()} where {tKey} in ({joinKeys});";
            }
            return ExcuteNonQuery(sql, null, CommandType.Text, beginTransaction);
        }

        /// <summary>
        /// Deletes the rows whose key is in <paramref name="keys"/> without a transaction.
        /// </summary>
        /// <param name="keys">Key values of the rows to delete.</param>
        /// <returns>Number of affected rows.</returns>
        public int DelWithKey<T>(params object[] keys)
        {
            return DelWithKey<T>(false, keys);
        }

        /// <summary>
        /// Bulk-inserts a table with <see cref="SqlBulkCopy"/>.
        /// </summary>
        /// <param name="table">Rows to insert; column names must match the destination columns.</param>
        /// <param name="tableName">Destination table.</param>
        /// <param name="sqlBulkCopyOptions">Bulk copy options.</param>
        /// <param name="dbKeyName">Optional connection key; the current connection string is used when empty.</param>
        /// <returns>Number of rows in <paramref name="table"/>.</returns>
        private int MSSqlBulkInsert(DataTable table, string tableName, SqlBulkCopyOptions sqlBulkCopyOptions = SqlBulkCopyOptions.UseInternalTransaction, string dbKeyName = null)
        {
            // Resolve the target into a local instead of rewriting the shared connection.
            string connectionString = string.IsNullOrEmpty(dbKeyName)
                ? Connection.ConnectionString
                : DBServerProvider.GetConnectionString(dbKeyName);

            using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(connectionString, sqlBulkCopyOptions))
            {
                sqlBulkCopy.DestinationTableName = tableName;
                sqlBulkCopy.BatchSize = table.Rows.Count;
                foreach (DataColumn column in table.Columns)
                {
                    sqlBulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }
                sqlBulkCopy.WriteToServer(table);
                return table.Rows.Count;
            }
        }

        /// <summary>
        /// Bulk-inserts entities by converting them to a <see cref="DataTable"/>.
        /// </summary>
        /// <param name="entities">Entities to insert.</param>
        /// <param name="tableName">Destination table; the entity's table name when null.</param>
        /// <param name="columns">Column selection forwarded to <c>ToDataTable</c>.</param>
        /// <param name="sqlBulkCopyOptions">Options used on the SQL Server path.</param>
        /// <returns>Number of inserted rows.</returns>
        public int BulkInsert<T>(List<T> entities, string tableName = null, Expression<Func<T, object>> columns = null, SqlBulkCopyOptions? sqlBulkCopyOptions = null)
        {
            DataTable table = entities.ToDataTable(columns, false);
            return BulkInsert(table, tableName ?? typeof(T).GetEntityTableName(), sqlBulkCopyOptions);
        }

        /// <summary>
        /// Bulk-inserts a table using the mechanism of the current provider:
        /// CSV bulk load for MySQL, binary COPY for PostgreSQL (added 2020-08-07),
        /// <see cref="SqlBulkCopy"/> otherwise.
        /// </summary>
        /// <param name="table">Rows to insert.</param>
        /// <param name="tableName">Destination table.</param>
        /// <param name="sqlBulkCopyOptions">SQL Server only; defaults to <see cref="SqlBulkCopyOptions.KeepIdentity"/>.</param>
        /// <param name="fileName">MySQL only: name of the temporary CSV file.</param>
        /// <param name="tmpPath">MySQL only: directory of the temporary CSV file.</param>
        /// <returns>Number of inserted rows.</returns>
        public int BulkInsert(DataTable table, string tableName, SqlBulkCopyOptions? sqlBulkCopyOptions = null, string fileName = null, string tmpPath = null)
        {
            if (!string.IsNullOrEmpty(tmpPath))
            {
                tmpPath = tmpPath.ReplacePath();
            }

            string connectionTypeName = Connection.GetType().Name;
            if (connectionTypeName == "MySqlConnection")
            {
                return MySqlBulkInsert(table, tableName, fileName, tmpPath);
            }
            if (connectionTypeName == "NpgsqlConnection")
            {
                PGSqlBulkInsert(table, tableName);
                // Report the written rows like the other providers do (this path used to return 0).
                return table.Rows.Count;
            }
            return MSSqlBulkInsert(table, tableName, sqlBulkCopyOptions ?? SqlBulkCopyOptions.KeepIdentity);
        }

        /// <summary>
        /// Bulk-inserts a table into MySQL by writing it to a CSV file and
        /// loading that file with <see cref="MySqlBulkLoader"/>.
        /// </summary>
        /// <param name="table">Rows to insert.</param>
        /// <param name="tableName">Destination table.</param>
        /// <param name="fileName">CSV file name; a timestamp-based name when null.</param>
        /// <param name="tmpPath">CSV directory; <c>FileHelper.GetCurrentDownLoadPath()</c> when null.</param>
        /// <returns>Number of rows reported by the loader.</returns>
        private int MySqlBulkInsert(DataTable table, string tableName, string fileName = null, string tmpPath = null)
        {
            if (table.Rows.Count == 0)
            {
                return 0;
            }

            tmpPath = tmpPath ?? FileHelper.GetCurrentDownLoadPath();
            fileName = fileName ?? $"{DateTime.Now.ToString("yyyyMMddHHmmss")}.csv";
            FileHelper.WriteFile(tmpPath, fileName, DataTableToCsv(table));
            string path = tmpPath + fileName;

            int insertCount = 0;
            try
            {
                if (Connection.State == ConnectionState.Closed)
                {
                    Connection.Open();
                }
                using (IDbTransaction tran = Connection.BeginTransaction())
                {
                    MySqlBulkLoader bulk = new MySqlBulkLoader(Connection as MySqlConnection)
                    {
                        FieldTerminator = ",",
                        FieldQuotationCharacter = '"',
                        EscapeCharacter = '"',
                        // Must match the row terminator written by DataTableToCsv.
                        LineTerminator = "\r\n",
                        FileName = path.ReplacePath(),
                        NumberOfLinesToSkip = 0,
                        TableName = tableName,
                    };
                    bulk.Columns.AddRange(table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList());
                    insertCount = bulk.Load();
                    tran.Commit();
                }
            }
            finally
            {
                // Dispose the connection actually used; going through the getter after a
                // dispose would create a second connection only to close it.
                _connection?.Dispose();
            }
            // The temporary CSV file is intentionally left on disk, as before.
            return insertCount;
        }

        /// <summary>
        /// Converts a <see cref="DataTable"/> to CSV text without a header row.
        /// Fields are separated by commas and rows end with CRLF. String fields
        /// containing a comma, a double quote or a line break are wrapped in
        /// double quotes, with embedded quotes doubled. DateTime columns are
        /// written as <c>yyyy-MM-dd HH:mm:ss</c>; non-date values in such
        /// columns become empty fields.
        /// </summary>
        /// <param name="table">Table to convert.</param>
        /// <returns>The CSV text.</returns>
        private string DataTableToCsv(DataTable table)
        {
            StringBuilder sb = new StringBuilder();
            Type typeString = typeof(string);
            Type typeDate = typeof(DateTime);

            foreach (DataRow row in table.Rows)
            {
                for (int i = 0; i < table.Columns.Count; i++)
                {
                    DataColumn column = table.Columns[i];
                    if (i != 0)
                    {
                        sb.Append(',');
                    }

                    object value = row[column];
                    if (column.DataType == typeString)
                    {
                        string text = value.ToString();
                        if (text.IndexOfAny(CsvSpecialChars) >= 0)
                        {
                            sb.Append('"').Append(text.Replace("\"", "\"\"")).Append('"');
                        }
                        else
                        {
                            sb.Append(text);
                        }
                    }
                    else if (column.DataType == typeDate)
                    {
                        // Format the value directly: a ToString/TryParse round trip depends on
                        // the current culture (the original note mentions CentOS rendering
                        // dates as "10/18/18 3:26:15 PM").
                        if (value is DateTime date)
                        {
                            sb.Append(date.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture));
                        }
                    }
                    else
                    {
                        sb.Append(value.ToString());
                    }
                }
                // Fixed CRLF instead of AppendLine(): the MySQL loader is configured with
                // LineTerminator "\r\n", while Environment.NewLine is "\n" on Linux.
                sb.Append("\r\n");
            }
            return sb.ToString();
        }

        /// <summary>
        /// Bulk-inserts a table into PostgreSQL with a binary COPY
        /// (added 2020-08-07). Uses its own connection built from the
        /// configured connection string.
        /// </summary>
        /// <param name="table">Rows to insert; column names must match the destination columns.</param>
        /// <param name="tableName">Destination table in the <c>public</c> schema.</param>
        private void PGSqlBulkInsert(DataTable table, string tableName)
        {
            List<string> columns = new List<string>();
            foreach (DataColumn column in table.Columns)
            {
                columns.Add("\"" + column.ColumnName + "\"");
            }

            string copySql = $"copy \"public\".\"{tableName}\"({string.Join(",", columns)}) FROM STDIN (FORMAT BINARY)";
            using (var conn = new Npgsql.NpgsqlConnection(_connectionString))
            {
                conn.Open();
                using (var writer = conn.BeginBinaryImport(copySql))
                {
                    foreach (DataRow row in table.Rows)
                    {
                        writer.StartRow();
                        for (int i = 0; i < table.Columns.Count; i++)
                        {
                            writer.Write(row[i]);
                        }
                    }
                    writer.Complete();
                }
            }
        }
    }
}