tsunami
log in
email
password
links
newest items
tag list
syntax reference
tag:time
history
item name
tags
{{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Mono.GetOptions;
using System.IO;
using System.Threading;
using System.Diagnostics;
using System.Data.SqlClient;

namespace ThreadedSqlExecutor
{
    /// <summary>
    /// Command-line tool that reads SQL statements from a pipe-delimited file and
    /// executes them against a SQL Server connection, using one worker thread per
    /// distinct thread id found in the file.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Command-line options, populated by Mono.GetOptions from the process arguments.
        /// </summary>
        class Opts : Options
        {
            [Option("input file", 'f')]
            public string FileName { get; protected set; }

            [Option("max threads", 't')]
            public int MaxThreads { get; protected set; }

            [Option("minimum period in ms", 'p')]
            public int MinimumPeriod { get; protected set; }

            [Option("connection string", 'c')]
            public string ConnectionString { get; protected set; }

            [Option("staggered start", "staggered")]
            public bool StaggeredStart { get; protected set; }
        }

        /// <summary>
        /// Entry point. Parses the options, loads and groups the input file, then
        /// runs one thread per group and waits for all of them to finish.
        /// </summary>
        /// <param name="args">Raw command-line arguments.</param>
        /// <remarks>
        /// Each input line is split on '|' into at most three fields:
        /// sort value, thread id, SQL text. Lines that do not yield exactly three
        /// fields, or whose first field is the literal "sort", are skipped.
        /// Statements are ordered by sort value and grouped by thread id; groups are
        /// ordered by thread id and only the first <c>threadCount</c> groups are run.
        /// </remarks>
        static void Main(string[] args)
        {
            Opts opts = new Opts();
            opts.ProcessArgs(args);

            // Writes the message to stderr and terminates the process with exit code -1.
            Action<string> errorAndExit = msg =>
            {
                Console.Error.WriteLine(msg);
                Environment.Exit(-1);
            };

            if (!File.Exists(opts.FileName))
                errorAndExit("valid file not specified");

            const int FieldsPerLine = 3;
            const int SortFieldIndex = 0;
            const int ThreadIdIndex = 1;
            const int SqlTextIndex = 2;
            const char Delimiter = '|';

            // Materialize the query once: it is consumed twice below (count + take),
            // and a deferred query would re-read and re-parse the file each time.
            IGrouping<int, Tuple<int, string>>[] data;
            try
            {
                data = (from line in File.ReadAllLines(opts.FileName)
                        let fields = line.Split(new[] { Delimiter }, FieldsPerLine)
                        where fields.Length == FieldsPerLine && fields[SortFieldIndex] != "sort"
                        let sort = int.Parse(fields[SortFieldIndex])
                        orderby sort
                        group new Tuple<int, string>(sort, fields[SqlTextIndex])
                            by int.Parse(fields[ThreadIdIndex]) into g
                        orderby g.Key
                        select g).ToArray();
            }
            catch (FormatException)
            {
                errorAndExit("input file contains a non-numeric sort value or thread id");
                return;
            }
            catch (OverflowException)
            {
                errorAndExit("input file contains a sort value or thread id that is out of range");
                return;
            }

            // A non-positive MaxThreads means "no limit".
            int threadCount = Math.Min(opts.MaxThreads > 0 ? opts.MaxThreads : int.MaxValue, data.Length);
            if (threadCount <= 0)
                errorAndExit("no threads specified");

            // A negative period would make Thread.Sleep throw on the worker thread.
            int period = Math.Max(0, opts.MinimumPeriod);

            // Serializes console output so the lines of one report are not interleaved.
            object syncRoot = new object();

            // Runs every statement of one group on a single connection.
            // 'slot' is the group's start position used for the staggered delay.
            Action<IGrouping<int, Tuple<int, string>>, int> process = (work, slot) =>
            {
                if (slot > 0)
                    Thread.Sleep((int)((double)slot / threadCount * period));

                Stopwatch sw = Stopwatch.StartNew();
                int count = 0;
                try
                {
                    using (SqlConnection cn = new SqlConnection(opts.ConnectionString))
                    {
                        cn.Open();
                        foreach (var tuple in work)
                        {
                            // Statement number 'count' may not start before count * period ms
                            // have elapsed on this worker's stopwatch. Computed as long so the
                            // product cannot overflow.
                            long dueAt = (long)count * period;
                            while (sw.ElapsedMilliseconds < dueAt)
                                Thread.Sleep(1);
                            count++;

                            // The sort value is sent as a leading SQL comment ahead of the statement.
                            string sql = string.Format("--{0}\r\n{1}", tuple.First, tuple.Second);
                            using (SqlCommand cmd = new SqlCommand(sql, cn))
                            {
                                try
                                {
                                    cmd.ExecuteNonQuery();
                                    lock (syncRoot)
                                    {
                                        Console.WriteLine(string.Format(" thread {0}, sort {1}", work.Key, tuple.First));
                                    }
                                }
                                catch (SqlException ex)
                                {
                                    // A failed statement is reported and the worker moves on to the next one.
                                    lock (syncRoot)
                                    {
                                        Console.WriteLine(string.Format("FAILED thread {0}, sort {1}", work.Key, tuple.First));
                                        Console.Error.WriteLine("thread {0}, sort {1}\r\n{2}", work.Key, tuple.First, ex);
                                    }
                                }
                            }
                        }
                    }
                }
                catch (ArgumentException)
                {
                    Console.Error.WriteLine("Thread {0} could not connect to server; verify that the connection string is correct.", work.Key);
                }
                catch (Exception ex)
                {
                    // The {1} placeholder is required for the exception details to be printed.
                    Console.Error.WriteLine("Thread {0} could not connect to server:\r\n{1}", work.Key, ex);
                }
            };

            Thread[] threads = data
                .Take(threadCount)
                .Select((grp, idx) => new Thread(() => process(grp, opts.StaggeredStart ? idx : 0)))
                .ToArray();

            // Start every worker before waiting on any of them.
            foreach (Thread t in threads)
                t.Start();
            foreach (Thread t in threads)
                t.Join();
        }
    }

    /// <summary>
    /// Immutable pair of values with value-based equality.
    /// </summary>
    /// <typeparam name="T1">Type of the first component.</typeparam>
    /// <typeparam name="T2">Type of the second component.</typeparam>
    public class Tuple<T1, T2>
    {
        public T1 First { get; private set; }
        public T2 Second { get; private set; }

        public Tuple(T1 first, T2 second)
        {
            First = first;
            Second = second;
        }

        /// <summary>
        /// XOR of the component hash codes; a null component contributes 0.
        /// </summary>
        public override int GetHashCode()
        {
            int firstHash = First == null ? 0 : First.GetHashCode();
            int secondHash = Second == null ? 0 : Second.GetHashCode();
            return firstHash ^ secondHash;
        }

        public static bool operator ==(Tuple<T1, T2> a, Tuple<T1, T2> b)
        {
            return object.ReferenceEquals(a, b) || (object)a != null && a.Equals(b);
        }

        public static bool operator !=(Tuple<T1, T2> a, Tuple<T1, T2> b)
        {
            return !(a == b);
        }

        /// <summary>
        /// True when <paramref name="obj"/> is a tuple of the same type whose
        /// components both equal this tuple's components. Null components are
        /// equal only to null components.
        /// </summary>
        public override bool Equals(object obj)
        {
            var t = obj as Tuple<T1, T2>;

            // Cast to object so the null test does not go through the overloaded operator.
            return (object)t != null
                && object.Equals(t.First, this.First)
                && object.Equals(t.Second, this.Second);
        }

        public override string ToString()
        {
            return string.Format("First: {0} Second: {1}", First, Second);
        }
    }

    /// <summary>
    /// Sequence helpers.
    /// </summary>
    static class Functional
    {
        /// <summary>
        /// Returns a sequence that invokes <paramref name="action"/> on each element
        /// as it is yielded. This is an iterator: nothing runs until the result is
        /// enumerated, and the action runs again on every enumeration.
        /// </summary>
        /// <param name="list">Source sequence.</param>
        /// <param name="action">Callback invoked once per element, before it is yielded.</param>
        /// <returns>The elements of <paramref name="list"/>, in order.</returns>
        public static IEnumerable<T> ForEach<T>(this IEnumerable<T> list, Action<T> action)
        {
            foreach (T el in list)
            {
                action(el);
                yield return el;
            }
        }
    }
}
}}
some permissive license goes here
contact