We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ACSControllerなるものを作成してください。
class Service(object): def __init__(self, environ, start_response, user=None): self.environ = environ logger.debug("ENVIRON: %s", environ) self.start_response = start_response self.user = user self.sp = None def unpack_redirect(self): if "QUERY_STRING" in self.environ: _qs = self.environ["QUERY_STRING"] return dict([(k, v[0]) for k, v in parse_qs(_qs).items()]) else: return None def unpack_post(self): _dict = parse_qs(get_post(self.environ).decode("utf8")) logger.debug("unpack_post:: %s", _dict) return dict([(k, v[0]) for k, v in _dict.items()]) ... def unpack_either(self): if self.environ["REQUEST_METHOD"] == "GET": _dict = self.unpack_redirect() elif self.environ["REQUEST_METHOD"] == "POST": _dict = self.unpack_post() else: _dict = None logger.debug("_dict: %s", _dict) return _dict def operation(self, _dict, binding): logger.debug("_operation: %s", _dict) if not _dict: resp = BadRequest("Error parsing request or no request") return resp(self.environ, self.start_response) else: try: _relay_state = _dict["RelayState"] except KeyError: _relay_state = "" if "SAMLResponse" in _dict: return self.do( _dict["SAMLResponse"], binding, _relay_state, mtype="response" ) elif "SAMLRequest" in _dict: return self.do( _dict["SAMLRequest"], binding, _relay_state, mtype="request" ) def artifact_operation(self, _dict): if not _dict: resp = BadRequest("Missing query") return resp(self.environ, self.start_response) else: # exchange artifact for response request = self.sp.artifact2message(_dict["SAMLart"], "spsso") return self.do(request, BINDING_HTTP_ARTIFACT, _dict["RelayState"]) def response(self, binding, http_args): if binding == BINDING_HTTP_ARTIFACT: resp = Redirect() else: resp = Response(http_args["data"], headers=http_args["headers"]) return resp(self.environ, self.start_response) def do(self, query, binding, relay_state="", mtype="response"): pass def redirect(self): """ Expects a HTTP-redirect response """ _dict = self.unpack_redirect() return self.operation(_dict, BINDING_HTTP_REDIRECT) def post(self): """ Expects a HTTP-POST response """ _dict = self.unpack_post() return self.operation(_dict, BINDING_HTTP_POST) ... class ACS(Service): def __init__(self, sp, environ, start_response, cache=None, **kwargs): Service.__init__(self, environ, start_response) self.sp = sp self.outstanding_queries = cache.outstanding_queries self.cache = cache self.response = None self.kwargs = kwargs def do(self, response, binding, relay_state="", mtype="response"): """ :param response: The SAML response, transport encoded :param binding: Which binding the query came in over """ # tmp_outstanding_queries = dict(self.outstanding_queries) if not response: logger.info("Missing Response") resp = Unauthorized("Unknown user") return resp(self.environ, self.start_response) try: conv_info = { "remote_addr": self.environ["REMOTE_ADDR"], "request_uri": self.environ["REQUEST_URI"], "entity_id": self.sp.config.entityid, "endpoints": self.sp.config.getattr("endpoints", "sp"), } self.response = self.sp.parse_authn_request_response( response, binding, self.outstanding_queries, self.cache.outstanding_certs, conv_info=conv_info, ) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) resp = ServiceError("UnknownPrincipal: %s" % (excp,)) return resp(self.environ, self.start_response) except UnsupportedBinding as excp: logger.error("UnsupportedBinding: %s", excp) resp = ServiceError("UnsupportedBinding: %s" % (excp,)) return resp(self.environ, self.start_response) except VerificationError as err: resp = ServiceError("Verification error: %s" % (err,)) return resp(self.environ, self.start_response) except SignatureError as err: resp = ServiceError("Signature error: %s" % (err,)) return resp(self.environ, self.start_response) except Exception as err: resp = ServiceError("Other error: %s" % (err,)) return resp(self.environ, self.start_response) logger.info("AVA: %s", self.response.ava) user = User(self.response.name_id, self.response.ava, self.response) cookie = self.cache.set_cookie(user) resp = Redirect("/", headers=[cookie]) return resp(self.environ, self.start_response) def verify_attributes(self, ava): logger.info("SP: %s", self.sp.config.entityid) rest = POLICY.get_entity_categories(self.sp.config.entityid, self.sp.metadata) akeys = [k.lower() for k in ava.keys()] res = {"less": [], "more": []} for key, attr in rest.items(): if key not in ava: if key not in akeys: res["less"].append(key) for key, attr in ava.items(): _key = key.lower() if _key not in rest: res["more"].append(key) return res
pysaml2のサンプルではSPという変数がありましたが、この正体はSaml2Cilentというクラスインスタンスだそうです。
具体クラスと抽象クラスを行ったり来たりしていて処理が見づらいですが、やりたいことはpostのエンドポイントもredirectのエンドポイントも同じで、Saml2Client.parse_authn_request_responseにパラメータを与えて叩きたいということみたいです。
とりあえず、pysamlに付属しているidpのサンプルを稼働させて、それに対応できるように作り込んでみましょう。
The text was updated successfully, but these errors were encountered:
takuan517
No branches or pull requests
ACSControllerなるものを作成してください。
pysaml2のサンプルではSPという変数がありましたが、この正体はSaml2Cilentというクラスインスタンスだそうです。
具体クラスと抽象クラスを行ったり来たりしていて処理が見づらいですが、やりたいことはpostのエンドポイントもredirectのエンドポイントも同じで、Saml2Client.parse_authn_request_responseにパラメータを与えて叩きたいということみたいです。
とりあえず、pysamlに付属しているidpのサンプルを稼働させて、それに対応できるように作り込んでみましょう。
The text was updated successfully, but these errors were encountered: