From 5f3aca393e3806f3f4ce6515bff14586be3be20f Mon Sep 17 00:00:00 2001 From: Vidar Hokstad Date: Thu, 11 Jan 2024 19:22:43 +0000 Subject: [PATCH] Switch to threaded model to make the async model easier to work with Also keeping request objects around and matching them to error objects to ease debugging, and more debug output when PUREX_DEBUG env var is set --- lib/X11/display.rb | 151 +++++++++++++++++++++++++++++++-------------- lib/X11/form.rb | 14 ++--- lib/X11/type.rb | 29 +++++---- 3 files changed, 128 insertions(+), 66 deletions(-) diff --git a/lib/X11/display.rb b/lib/X11/display.rb index 8eef828..880b449 100644 --- a/lib/X11/display.rb +++ b/lib/X11/display.rb @@ -31,18 +31,24 @@ module X11 authorize(host, family, display_id) @requestseq = 1 - @queue = [] + @rqueue = Queue.new # Read but not returned events + @wqueue = Queue.new + @extensions = {} # Known extensions + @atoms = {} # Interned atoms - @extensions = {} - - # Interned atoms - @atoms = {} + start_io end def event_handler= block @event_handler= block end + def flush + while !@wqueue.empty? + sleep(0.01) + end + end + def display_info @internal end @@ -68,8 +74,15 @@ module X11 def read_error data error = Form::Error.from_packet(StringIO.new(data)) + # FIXME: Maybe make this configurable, as it means potentially + # keeping along really heavy requests. or alternative purge them + # more aggressively also when there are no errors, as otherwise + # the growth might be unbounded + error.request = @requests[error.sequence_number] + @requests.keys.find_all{|s| s <= error.sequence_number}.each do |s| + @requests.delete(s) + end STDERR.puts "ERROR: #{error.inspect}" - raise error.inspect error end @@ -127,22 +140,24 @@ module X11 end def read_full_packet(len = 32) - data = @socket.read_nonblock(32) + data = @socket.read(32) return nil if data.nil? while data.length < 32 IO.select([@socket],nil,nil,0.001) data.concat(@socket.read_nonblock(32 - data.length)) end return data - rescue IO::WaitReadable - return nil end - def read_packet timeout=5.0 - IO.select([@socket],nil,nil, timeout) + def read_packet data = read_full_packet(32) return nil if data.nil? + # FIXME: Make it configurable. + @requests.keys.find_all{|s| s <= @requestseq - 50}.each do |s| + @requests.delete(s) + end + # FIXME: What is bit 8 for? Synthentic? type = data.unpack("C").first & 0x7f case type @@ -154,57 +169,100 @@ module X11 end end - def write_raw_packet(pkt) - @requestseq += 1 - @socket.write(pkt) - end - def write_packet(*args) pkt = args.join pkt[2..3] = u16(pkt.length/4) - write_raw_packet(pkt) + @wqueue << [nil,nil,pkt] end def write_request ob - data = ob.to_packet if ob.respond_to?(:to_packet) + data = ob.to_packet(self) if ob.respond_to?(:to_packet) raise "BAD LENGTH for #{ob.inspect} (#{ob.request_length.to_i*4} ! #{data.size} " if ob.request_length && ob.request_length.to_i*4 != data.size - write_raw_packet(data) + STDERR.puts "write_req: #{ob.inspect}" if @debug + @wqueue << [ob,nil,data] end - def write_sync(data, reply=nil) - seq = @requestseq - write_request(data) - pkt = next_reply(seq) - return nil if !pkt - return pkt if pkt.is_a?(X11::Form::Error) - pp reply + def write_sync(ob, reply=nil) + data = ob.to_packet(self) if ob.respond_to?(:to_packet) + q = Queue.new + @wqueue << [ob,q,data] + STDERR.puts "write_sync_req: #{ob.inspect}" if @debug + pkt = q.shift + STDERR.puts "write_sync_rep: #{pkt.inspect}" if @debug + raise(pkt) if pkt.is_a?(X11::Form::Error) + return pkt if !pkt.is_a?(String) reply ? reply.from_packet(StringIO.new(pkt)) : pkt end - def peek_packet - !@queue.empty? - end + def peek_packet = !@rqueue.empty? + def next_packet = @rqueue.shift - def next_packet - @queue.shift || read_packet - end + def close = @rqueue.close + + def start_io + @replies ||= {} + @requests ||= {} + # Read thread. + # FIXME: Drop the select. + rt = Thread.new do + while pkt = read_packet + #STDERR.puts "read: #{pkt.inspect}" + if !pkt + sleep 0.1 + elsif pkt.is_a?(String) + # This is a reply. We need the sequence number. + # + seq = pkt.unpack1("@2S") + STDERR.puts " - seq= #{seq}" if @debug + STDERR.puts @replies.inspect if @debug + if @replies[seq] + q = @replies.delete(seq) + STDERR.puts " - reply to #{q}" if @debug + q << pkt + end + elsif pkt.is_a?(X11::Form::Error) + if @replies[pkt.sequence_number] + q = @replies.delete(pkt.sequence_number) + q << pkt + else + @rqueue << pkt + end + else + @rqueue << pkt + end + end + @rqueue.close + @replies.values.each(&:close) + end - def next_reply(errseq) - # FIXME: This is totally broken - while pkt = read_packet - if pkt.is_a?(String) - return pkt - elsif pkt.is_a?(X11::Form::Error) && pkt.sequence_number == errseq - return pkt - else - @queue.push(pkt) + # Write thread + wt = Thread.new do + while msg = @wqueue.shift + ob, q, data = *msg + @requests[@requestseq] = ob + @replies[@requestseq] = q if q + @requestseq = (@requestseq + 1) % 65536 + @socket.write(data) end end + + at_exit do + flush + @rqueue.close + @wqueue.close + # We kill this because it may be stuck in a read + # we'll never care about + Thread.kill(rt) + + # We wait for this to finish because otherwise we may + # lose side-effects + wt.join + end end - + def run loop do - pkt = read_packet + pkt = next_packet return if !pkt yield(pkt) end @@ -215,13 +273,15 @@ module X11 d.depth == depth }.visuals.find{|v| v.qlass = qlass } end + def default_root = screens.first.root + # Requests def create_window(x,y,w,h, values: {}, depth: 32, parent: nil, border_width: 0, wclass: X11::Form::InputOutput, visual: nil ) wid = new_id - parent ||= screens.first.root + parent ||= default_root if visual.nil? visual = find_visual(0, depth).visual_id @@ -571,7 +631,6 @@ module X11 auth_name = "" auth_data = "" end - p [auth_name, auth_data] handshake = Form::ClientHandshake.new( Protocol::BYTE_ORDER, @@ -581,7 +640,7 @@ module X11 auth_data ) - @socket.write(handshake.to_packet) + @socket.write(handshake.to_packet(self)) data = @socket.read(1) raise AuthorizationError, "Failed to read response from server" if !data diff --git a/lib/X11/form.rb b/lib/X11/form.rb index 572d338..18fe805 100644 --- a/lib/X11/form.rb +++ b/lib/X11/form.rb @@ -44,7 +44,7 @@ module X11 end end - def to_packet + def to_packet(dpy) # fetch class level instance variable holding defined fields structs = self.class.structs @@ -64,10 +64,10 @@ module X11 #p [s,value] if value.is_a?(BaseForm) - v = value.to_packet + v = value.to_packet(dpy) else #p [s,value] - v = s.type_klass.pack(value) + v = s.type_klass.pack(value, dpy) end #p v v @@ -77,15 +77,15 @@ module X11 when :length, :format_length #p [s,value] #p [value.size] - s.type_klass.pack(value.size) + s.type_klass.pack(value.size, dpy) when :string - s.type_klass.pack(value) + s.type_klass.pack(value, dpy) when :list Array(value).collect do |obj| if obj.is_a?(BaseForm) - obj.to_packet + obj.to_packet(dpy) else - s.type_klass.pack(obj) + s.type_klass.pack(obj, dpy) end end end diff --git a/lib/X11/type.rb b/lib/X11/type.rb index c58cc02..3a0ddf0 100644 --- a/lib/X11/type.rb +++ b/lib/X11/type.rb @@ -11,7 +11,7 @@ module X11 def self.config(d,b) = (@directive, @bytesize = d,b) - def self.pack(x) + def self.pack(x, dpy) if x.is_a?(Symbol) if (t = X11::Form.const_get(x)) && t.is_a?(Numeric) x = t @@ -27,7 +27,6 @@ module X11 def self.from_packet(sock) = unpack(sock.read(size)) end - # Primitive Types class Int8 < BaseType; config("c",1); end class Int16 < BaseType; config("s",2); end class Int32 < BaseType; config("l",4); end @@ -36,16 +35,14 @@ module X11 class Uint32 < BaseType; config("L",4); end class Message - def self.pack(x) = x.b - def self.unpack(x) = x.b - def self.size = 20 + def self.pack(x,dpy) = x.b + def self.unpack(x) = x.b + def self.size = 20 def self.from_packet(sock) = sock.read(2).b end class String8 - def self.pack(x) - x.b + "\x00"*(-x.length & 3) - end + def self.pack(x, dpy) = (x.b + "\x00"*(-x.length & 3)) def self.unpack(socket, size) raise "Expected size for String8" if size.nil? @@ -57,7 +54,7 @@ module X11 end class String16 - def self.pack(x) + def self.pack(x, dpy) x.encode("UTF-16BE").b + "\x00\x00"*(-x.length & 1) end @@ -71,13 +68,13 @@ module X11 class String8Unpadded - def self.pack(x) = x + def self.pack(x,dpy) = x def self.unpack(socket, size) = socket.read(size) end class Bool - def self.pack(x) = (x ? "\x01" : "\x00") - def self.unpack(str) = (str[0] == "\x01") + def self.pack(x, dpy) = (x ? "\x01" : "\x00") + def self.unpack(str) = (str[0] == "\x01") def self.size = 1 end @@ -96,10 +93,16 @@ module X11 Colormap = Uint32 Drawable = Uint32 Fontable = Uint32 - Atom = Uint32 VisualID = Uint32 Mask = Uint32 Timestamp = Uint32 Keysym = Uint32 + + class Atom + def self.pack(x,dpy) = [dpy.atom(x)].pack("L") + def self.unpack(x) = x.nil? ? nil : x.unpack1("L") + def self.size = 4 + def self.from_packet(sock) = unpack(sock.read(size)) + end end end